前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Chromium】ThreadPool的ThreadGroup

【Chromium】ThreadPool的ThreadGroup

原创
作者头像
lealc
发布2024-03-25 10:33:10
1190
发布2024-03-25 10:33:10
举报
文章被收录于专栏:Chromium学习Chromium学习

源码

先附上可用于学习的开源代码:Base库

喜欢可以帮忙Star一下

前言

编译:参考Base库即可

环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17

:本章节需要提前了解ThreadPool

  • ThreadGroup:线程组,线程组的接口和基本实现。线程组是线程池中一部分线程的子集(有关在发布任务和创建任务运行器时选择线程组的逻辑,请参阅 GetThreadGroupForTraits())
  • ThreadGroupImpl:一个运行任务的工作线程组。在调用Start()之前,线程组不会创建线程。任务可以随时发布,但在调用Start()之后才会运行。这个类是线程安全的。

ThreadGroup

这个类是管理线程池中线程的核心类,包含线程的创建、观察、阻塞、回收等事件,ThreadGroup类提供接口,ThreadGroupImpl类提供具体实现。

线程组的接口和基本实现。线程组是线程池中的一部分线程的子集(有关在发布任务和创建任务运行器时选择线程组的逻辑,请参阅 GetThreadGroupForTraits())。

ThreadGroup::Delegate

这个Delegate类对于ThreadGroup意味着它提供了一种机制来确定任务应该被重新入队的线程组。当ThreadGroup运行完一个任务后,如果任务源(TaskSource)仍然非空,就会调用Delegate的GetThreadGroupForTraits方法来确定应该将任务源重新入队到哪个线程组中。

Delegate类是一个接口类,它定义了一个纯虚函数GetThreadGroupForTraits,需要由具体的实现类来实现。通过实现这个方法,可以根据任务的特性(TaskTraits)来决定任务源应该重新入队到哪个线程组中。

这种设计可以让ThreadGroup在处理任务时,根据任务的特性选择合适的线程组来处理任务,从而实现任务的分配和调度。通过Delegate类的实现,可以根据具体的需求和策略来决定任务应该被重新入队到哪个线程组中,以实现更灵活和高效的任务处理。

代码语言:C++
复制
class BASE_EXPORT Delegate {
? public:
? virtual ~Delegate() = default;

? // 当 ThreadGroup 在从非空的 TaskSource 中运行任务后,调用此方法。实现必须返回应该重新入队的 TaskSource 所在的线程组。
? virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
};

ThreadGroup::WorkerEnvironment

这个枚举类WorkerEnvironment对于ThreadGroup意味着它定义了不同的工作环境选项,用于指定在工作线程中初始化的特殊环境。

具体来说,WorkerEnvironment枚举类包含以下选项:

  • NONE:表示不需要特殊的工作环境。
  • COM_MTA:表示在工作线程中需要初始化一个多线程单元(COM MTA)。这通常在Windows操作系统上使用,用于支持多线程的COM组件。
  • COM_STA:表示在工作线程中需要初始化一个单线程单元(COM STA)。这同样在Windows操作系统上使用,用于支持单线程的COM组件。

通过使用WorkerEnvironment枚举类,可以在创建ThreadGroup时指定工作线程的特殊环境要求。这样可以确保工作线程在运行任务时具备所需的环境,以满足特定的需求和使用场景。例如,在使用COM组件的情况下,可以选择适当的工作环境来确保COM组件的正确初始化和使用。

代码语言:C++
复制
enum class WorkerEnvironment {
??? // No special worker environment required.
??? NONE,
#if defined(OS_WIN)
??? // Initialize a COM MTA on the worker.
??? COM_MTA,
??? // Initialize a COM STA on the worker.
??? COM_STA,
#endif? // defined(OS_WIN)
? };

线程绑定

将ThreadGroup与当前线程绑定有以下好处:

  1. 任务分配:通过将ThreadGroup与当前线程绑定,可以确保任务被分配给正确的线程组。这对于任务调度和负载均衡非常重要,可以确保任务在适当的线程组中执行,以提高系统的性能和效率。
  2. 上下文共享:通过将ThreadGroup与当前线程绑定,可以在当前线程的上下文中共享ThreadGroup的状态和数据。这对于多线程编程中的协作和通信非常有用,可以方便地共享线程组级别的信息,而不需要显式传递参数或使用全局变量。
  3. 线程安全性:通过将ThreadGroup与当前线程绑定,可以确保在当前线程中访问ThreadGroup的操作是线程安全的。这可以避免多线程环境下的竞态条件和数据竞争问题,提高代码的可靠性和稳定性。
  4. 简化代码:通过将ThreadGroup与当前线程绑定,可以简化代码逻辑和减少参数传递。在当前线程的上下文中,可以直接访问ThreadGroup,而不需要显式传递ThreadGroup作为参数。这可以使代码更加清晰和简洁。

总的来说,将ThreadGroup与当前线程绑定可以提供更好的任务分配、上下文共享、线程安全性和代码简化。这对于多线程编程中的任务调度和协作非常有用,可以提高系统的性能、可维护性和可扩展性。

代码语言:C++
复制
 // Registers the thread group in TLS.
? void BindToCurrentThread();

? // Resets the thread group in TLS.
? void UnbindFromCurrentThread();

? // Returns true if the thread group is registered in TLS.
? bool IsBoundToCurrentThread() const;

在ThreadGroup类中,这些方法的含义如下:

  • BindToCurrentThread(): 将当前线程绑定到ThreadGroup。这将在线程本地存储(TLS)中注册ThreadGroup,以便在当前线程中标识和使用该ThreadGroup。通过调用这个方法,可以将ThreadGroup与当前线程相关联,以便在当前线程中执行任务或进行其他操作。
  • UnbindFromCurrentThread(): 解除当前线程与ThreadGroup的绑定。这将从线程本地存储(TLS)中移除ThreadGroup的注册,不再将ThreadGroup与当前线程关联。通过调用这个方法,可以解除ThreadGroup与当前线程的关联,以便在其他线程或上下文中使用ThreadGroup。
  • IsBoundToCurrentThread() const: 检查ThreadGroup是否已经在当前线程中注册。如果ThreadGroup已经在当前线程的线程本地存储(TLS)中注册,则返回true;否则返回false。通过调用这个方法,可以确定当前线程是否与ThreadGroup相关联,以便在需要时执行相应的操作。

这些方法提供了一种管理ThreadGroup与线程之间关联的机制。通过绑定和解绑ThreadGroup与线程,可以确保在正确的线程上执行任务,并在需要时进行相应的操作和处理。

Public接口

这些方法在ThreadGroup类中具有以下含义:

  • RemoveTaskSource(scopedrefptr<TaskSource> task_source): 从ThreadGroup的优先级队列(priority_queue)中移除指定的任务源(task_source)。如果成功移除,则返回一个RegisteredTaskSource对象,其值为true;如果任务源当前不在优先级队列中(例如,当工作线程正在执行该任务时),则返回false。
  • UpdateSortKey(TransactionWithOwnedTaskSource transaction_with_task_source): 根据任务源的当前特性,更新ThreadGroup的优先级队列中与任务源相关的位置。具体实现应该实例化一个具体的ScopedWorkersExecutor对象,并调用UpdateSortKeyImpl()方法。
  • PushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source): 将TransactionWithRegisteredTaskSource中的任务源推入ThreadGroup的优先级队列,并根据需要唤醒工作线程。具体实现应该实例化一个具体的* ScopedWorkersExecutor对象,并调用PushTaskSourceAndWakeUpWorkersImpl()方法。
  • InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup* destination_thread_group): 从ThreadGroup的优先级队列中移除所有任务源,并将它们转移到另一个目标ThreadGroup中。调用此方法后,任何发布到此ThreadGroup的任务源将被转发到目标ThreadGroup。这个方法在UseNativeThreadPool实验完成后应该被移除。
  • ShouldYield(TaskPriority priority) const: 如果在该线程组中运行的具有给定优先级的任务应尽快返回,则返回true。这可能是因为该优先级不允许运行,或者有更高优先级的任务正在等待执行。该方法是线程安全的,但可能返回过时的结果(如果任务由于此原因不必要地让出,它将被重新调度)。
  • JoinForTesting(): 阻止新任务开始运行,并等待当前正在运行的任务完成执行。在此方法返回后,保证没有线程会代表此ThreadGroup执行工作。在调用此方法后发布任务是无效的。可以在调用JoinForTesting()之前调用TaskTracker::Flush()来完成现有任务,否则在JoinForTesting()期间可能会发布任务。这个方法只能调用一次。
  • GetMaxConcurrentNonBlockedTasksDeprecated() const: 返回在该ThreadGroup中可以同时运行的最大非阻塞任务数。这个方法应该被移除。
  • ReportHeartbeatMetrics() const: 根据具体实现报告相关的指标。
  • DidUpdateCanRunPolicy(): 根据新的CanRunPolicy策略唤醒工作线程。在TaskTracker更新CanRunPolicy后必须调用此方法。

附上源码:

代码语言:C++
复制
// Removes |task_source| from |priority_queue_|. Returns a
// RegisteredTaskSource that evaluats to true if successful, or false if
// |task_source| is not currently in |priority_queue_|, such as when a worker
// is running a task from it.
RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source);

// Updates the position of the TaskSource in |transaction_with_task_source| in
// this ThreadGroup's PriorityQueue based on the TaskSource's current traits.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke UpdateSortKeyImpl().
virtual void UpdateSortKey(TransactionWithOwnedTaskSource transaction_with_task_source) = 0;

// Pushes the TaskSource in |transaction_with_task_source| into this
// ThreadGroup's PriorityQueue and wakes up workers as appropriate.
//
// Implementations should instantiate a concrete ScopedWorkersExecutor and
// invoke PushTaskSourceAndWakeUpWorkersImpl().
virtual void PushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source) = 0;

// Removes all task sources from this ThreadGroup's PriorityQueue and enqueues
// them in another |destination_thread_group|. After this method is called,
// any task sources posted to this ThreadGroup will be forwarded to
// |destination_thread_group|.
//
// TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool
// experiment is complete.
void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup* destination_thread_group);

// Returns true if a task with |priority| running in this thread group should
// return ASAP, either because this priority is not allowed to run or because
// work of higher priority is pending. Thread-safe but may return an outdated
// result (if a task unnecessarily yields due to this, it will simply be
// re-scheduled).
bool ShouldYield(TaskPriority priority) const;

// Prevents new tasks from starting to run and waits for currently running
// tasks to complete their execution. It is guaranteed that no thread will do
// work on behalf of this ThreadGroup after this returns. It is
// invalid to post a task once this is called. TaskTracker::Flush() can be
// called before this to complete existing tasks, which might otherwise post a
// task during JoinForTesting(). This can only be called once.
virtual void JoinForTesting() = 0;

// Returns the maximum number of non-blocked tasks that can run concurrently
// in this ThreadGroup.
//
// TODO(fdoray): Remove this method. https://crbug.com/687264
virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0;

// Reports relevant metrics per implementation.
virtual void ReportHeartbeatMetrics() const = 0;

// Wakes up workers as appropriate for the new CanRunPolicy policy. Must be
// called after an update to CanRunPolicy in TaskTracker.
virtual void DidUpdateCanRunPolicy() = 0;

ThreadGroup::BaseScopedWorkersExecutor

这是一个基类BaseScopedWorkersExecutor的定义,派生类必须从这个基类派生出一个ScopedWorkersExecutor类来执行在作用域结束时对工作线程的操作,当所有锁都已释放。

BaseScopedWorkersExecutor类具有以下成员函数和成员变量:

  • void ScheduleReleaseTaskSource(RegisteredTaskSource task_source): 将一个已注册的任务源(RegisteredTaskSource)添加到待释放的任务源列表中。这个函数用于在作用域结束时调度释放任务源的操作。
  • BaseScopedWorkersExecutor(): 构造函数,用于初始化BaseScopedWorkersExecutor对象。
  • ~BaseScopedWorkersExecutor(): 析构函数,用于释放BaseScopedWorkersExecutor对象。
  • std::vector<RegisteredTaskSource> tasksources_to_release: 存储待释放的任务源的列表。
  • DISALLOW_COPY_AND_ASSIGN(BaseScopedWorkersExecutor): 宏用于禁止拷贝和赋值操作,确保BaseScopedWorkersExecutor对象不会被拷贝或赋值。

派生类应该从BaseScopedWorkersExecutor派生出一个具体的ScopedWorkersExecutor类,并实现特定的操作,以在作用域结束时对工作线程执行相应的操作。这个基类提供了一个框架和基础设施,用于管理待释放的任务源,并在适当的时候执行相应的操作。

代码语言:C++
复制
class BaseScopedWorkersExecutor {
?public:
? void ScheduleReleaseTaskSource(RegisteredTaskSource task_source);

?protected:
? BaseScopedWorkersExecutor();
? ~BaseScopedWorkersExecutor();

?private:
? std::vector<RegisteredTaskSource> task_sources_to_release_;

? DISALLOW_COPY_AND_ASSIGN(BaseScopedWorkersExecutor);
};

ThreadGroup::ScopedReenqueueExecutor

这是一个ScopedReenqueueExecutor类的定义,它允许在作用域结束时,当所有锁都已释放时,将任务源推送到ThreadGroup的优先级队列中。

ScopedReenqueueExecutor类具有以下成员函数和成员变量:

  • ScopedReenqueueExecutor(): 构造函数,用于初始化ScopedReenqueueExecutor对象。
  • ~ScopedReenqueueExecutor(): 析构函数,用于释放ScopedReenqueueExecutor对象。
  • void SchedulePushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source, ThreadGroup destination_thread_group): 在作用域结束时调度将任务源推送到ThreadGroup的优先级队列中,并唤醒相应的工作线程。这个函数接受一个 TransactionWithRegisteredTaskSource对象和一个目标ThreadGroup作为参数,用于指定要推送的任务源和目标线程组。
  • Optional<TransactionWithRegisteredTaskSource> transactionwith_task_source: 存储要推送的任务源的TransactionWithRegisteredTaskSource对象。这个对象是可选的,因为在作用域结束时可能没有要推送的任务源。
  • ThreadGroup* destinationthread_group: 指向目标ThreadGroup的指针,用于指定要推送任务源的目标线程组。
代码语言:C++
复制
class ScopedReenqueueExecutor {
?public:
? ScopedReenqueueExecutor();
? ~ScopedReenqueueExecutor();

? // A TransactionWithRegisteredTaskSource and the ThreadGroup in which it
? // should be enqueued.
? void SchedulePushTaskSourceAndWakeUpWorkers(
????? TransactionWithRegisteredTaskSource transaction_with_task_source,
????? ThreadGroup* destination_thread_group);

?private:
? // A TransactionWithRegisteredTaskSource and the thread group in which it
? // should be enqueued.
? Optional<TransactionWithRegisteredTaskSource> transaction_with_task_source_;
? ThreadGroup* destination_thread_group_ = nullptr;

? DISALLOW_COPY_AND_ASSIGN(ScopedReenqueueExecutor);
};

源码解析

了解了以上ThreadGroup相关类后,再来了解一下ThreadGroup本身的源码实现

构造
代码语言:C++
复制
ThreadGroup(TrackedRef<TaskTracker> task_tracker,
? ? ? ? ? ? TrackedRef<Delegate> delegate,
? ? ? ? ? ? ThreadGroup* predecessor_thread_group = nullptr);

const TrackedRef<TaskTracker> task_tracker_;
const TrackedRef<Delegate> delegate_;

这是ThreadGroup类的构造函数的定义。它接受一个TaskTracker对象的TrackedRef,一个Delegate对象的TrackedRef,以及一个可选的predecessor_thread_group参数。

构造函数的参数含义如下:

  • TrackedRef<TaskTracker> task_tracker: 一个TaskTracker对象的TrackedRef,用于跟踪和管理与ThreadGroup相关的任务。
  • TrackedRef<Delegate> delegate: 一个Delegate对象的TrackedRef,用于处理ThreadGroup的委托操作和事件。
  • ThreadGroup* predecessor_thread_group = nullptr: 一个可选的ThreadGroup指针,表示一个可以在构建的ThreadGroup的锁之前获取的前任ThreadGroup。这是为了将所有任务源从predecessor_thread_group移动到构建的ThreadGroup,并支持UseNativeThreadPool实验。

这个构造函数用于创建一个ThreadGroup对象,并初始化相关的成员变量和状态。通过传递TaskTracker和Delegate对象,可以将ThreadGroup与任务跟踪器和委托操作关联起来。predecessor_thread_group参数用于在构建的ThreadGroup的锁之前获取前任ThreadGroup的锁,以便在任务源转移过程中保持同步。

需要注意的是,predecessor_thread_group参数是一个可选参数,并且在UseNativeThreadPool实验完成后可能会被移除。

代码语言:C++
复制
ThreadGroup* replacement_thread_group_ = nullptr;

这是ThreadGroup类的一个成员变量replacementthread_group的定义。如果replacementthread_group非空,则表示当前ThreadGroup无效,所有的任务源应该被调度到replacementthread_group上。这个成员变量用于支持UseNativeThreadPool实验。

当replacementthread_group非空时,表示当前ThreadGroup已经被替换,不再有效。所有的任务源应该被重新调度到replacementthread_group上,以确保任务的正确执行。这个机制可以用于实现线程池的替换和迁移,以支持系统的动态调整和优化。

其他
代码语言:C++
复制
size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);

size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const
EXCLUSIVE_LOCKS_REQUIRED(lock_);

virtual void EnsureEnoughWorkersLockRequired(
??? BaseScopedWorkersExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;

void ReEnqueueTaskSourceLockRequired(
??? BaseScopedWorkersExecutor* workers_executor,
??? ScopedReenqueueExecutor* reenqueue_executor,
??? TransactionWithRegisteredTaskSource transaction_with_task_source)
??? EXCLUSIVE_LOCKS_REQUIRED(lock_);

RunIntentWithRegisteredTaskSource TakeRunIntentWithRegisteredTaskSource(
??? BaseScopedWorkersExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

// Must be invoked by implementations of the corresponding non-Impl() methods.
void UpdateSortKeyImpl(
??? BaseScopedWorkersExecutor* executor,
??? TransactionWithOwnedTaskSource transaction_with_task_source);
void PushTaskSourceAndWakeUpWorkersImpl(
??? BaseScopedWorkersExecutor* executor,
??? TransactionWithRegisteredTaskSource transaction_with_task_source);

mutable CheckedLock lock_;

PriorityQueue priority_queue_ GUARDED_BY(lock_);

std::atomic<TaskPriority> min_allowed_priority_ GUARDED_BY(lock_) {
??? TaskPriority::BEST_EFFORT
};

这段代码展示了ThreadGroup类中一些与线程调度和任务源管理相关的函数和成员变量。

  • GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired(): 返回根据当前的* CanRunPolicy允许运行的BEST_EFFORT任务源所需的额外工作线程数量。这个函数在获取锁的情况下被调用。
  • GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired(): 返回根据当前的* CanRunPolicy允许运行的USER_VISIBLE/USER_BLOCKING任务源所需的额外工作线程数量。这个函数在获取锁的情况下被调用。
  • EnsureEnoughWorkersLockRequired(): 确保有足够的工作线程来运行排队的任务源。这个函数在获取锁的情况下被调用,并且需要一个BaseScopedWorkersExecutor对象作为参数。
  • ReEnqueueTaskSourceLockRequired(): 将当前ThreadGroup中刚刚运行的任务重新排队到适当的ThreadGroup中。这个函数在获取锁的情况下被调用,并且需要一个BaseScopedWorkersExecutor对象和一个ScopedReenqueueExecutor对象作为参数。
  • TakeRunIntentWithRegisteredTaskSource(): 如果允许运行,从priorityqueue中获取下一个任务源,并将其从priorityqueue中弹出。如果任务源不再需要排队(达到了最大并发数),则返回nullptr。这个函数在获取锁的情况下被调用,并且需要一个BaseScopedWorkersExecutor对象作为参数。
  • UpdateSortKeyImpl(): 由对应的非Impl()方法的实现调用。用于更新任务源的排序键。
  • PushTaskSourceAndWakeUpWorkersImpl(): 由对应的非Impl()方法的实现调用。用于将任务源推送到priorityqueue中,并唤醒工作线程。
  • mutable CheckedLock lock_: 用于同步访问ThreadGroup类的所有非const、非atomic和非不可变成员的锁。
  • PriorityQueue priorityqueue: 用于存储所有线程的任务源的优先级队列。
  • std::atomic<TaskPriority> minallowed_priority: 允许运行的最低优先级。这个成员变量是原子的,并且在获取锁的情况下更新。它的读取不需要获取锁,因为它是原子的。

ThreadGroupImpl

ThreadGroup类是一组运行任务的工作线程。在调用Start()之前,线程组不会创建线程。任务可以在任何时候发布,但直到调用Start()之后才会运行。

这个类是线程安全的,这意味着它可以在多个线程中同时使用而不会导致竞争条件或数据损坏。多个线程可以同时调用ThreadGroup的成员函数,而不需要额外的同步机制。这是通过在关键的代码段上使用锁来实现的,以确保对共享数据的访问是互斥的。

线程安全性对于多线程环境中的任务调度和执行非常重要。ThreadGroup类的线程安全性确保了任务的正确执行和数据的一致性,同时避免了竞争条件和数据损坏的问题。这使得开发人员可以在多线程环境中使用ThreadGroup类,而不必担心并发问题。

源码解析

构造
代码语言:C++
复制
ThreadGroupImpl(StringPiece histogram_label,
??????????????? StringPiece thread_group_label,
??????????????? ThreadPriority priority_hint,
??????????????? TrackedRef<TaskTracker> task_tracker,
??????????????? TrackedRef<Delegate> delegate);

这是ThreadGroupImpl类的构造函数的定义。它接受以下参数:

  • StringPiece histogram_label: 用于标记线程组的标签。直方图的名称将是 "ThreadPool." + histogram_name + "." + histogram_label + 额外的后缀。这个参数不能为空。
  • StringPiece thread_group_label: 用于标记线程组的线程的标签。这个参数不能为空。
  • ThreadPriority priority_hint: 优先级的提示值,表示线程的首选优先级。实际的线程优先级取决于关闭状态和平台能力。
  • TrackedRef<TaskTracker> task_tracker: 一个TaskTracker对象的TrackedRef,用于跟踪和管理与ThreadGroup相关的任务。
  • TrackedRef<Delegate> delegate: 一个Delegate对象的TrackedRef,用于处理ThreadGroup的委托操作和事件。

这个构造函数用于创建一个ThreadGroupImpl对象,并初始化相关的成员变量和状态。通过传递histogram_label、thread_group_label、priority_hint、task_tracker和delegate,可以将ThreadGroupImpl与直方图、线程标签、优先级提示、任务跟踪器和委托操作关联起来。这些参数提供了构建和管理ThreadGroupImpl对象所需的基本信息和功能。

启动
代码语言:C++
复制
void Start(int max_tasks,
? ? ? ? ?  int max_best_effort_tasks,
? ? ? ? ?  TimeDelta suggested_reclaim_time,
? ? ? ? ?  scoped_refptr<TaskRunner> service_thread_task_runner,
? ? ? ? ?  WorkerThreadObserver* worker_thread_observer,
? ? ? ? ?  WorkerEnvironment worker_environment,
? ? ? ? ?  Optional<TimeDelta> may_block_threshold = Optional<TimeDelta>());

这是ThreadGroupImpl类的Start函数的定义。它接受以下参数:

  • int max_tasks: 允许同时运行的最大任务数。
  • int max_best_effort_tasks: 允许同时运行的最大BEST_EFFORT优先级任务数。
  • TimeDelta suggested_reclaim_time: 在空闲线程回收之前建议的时间间隔。
  • scoped_refptr<TaskRunner> service_thread_task_runner: 用于监视阻塞任务的TaskRunner。
  • WorkerThreadObserver* worker_thread_observer: 当工作线程进入和退出其主函数时,通知的WorkerThreadObserver。
  • WorkerEnvironment worker_environment: 指定任务执行的环境。
  • Optional<TimeDelta> may_block_threshold: MAY_BLOCK ScopedBlockingCall中的任务被视为阻塞的超时时间。如果未指定,则线程组将选择适当的值。

这个函数用于创建线程,并允许现有和未来的任务运行。线程组最多同时运行max_tasks / max_best_effort_tasks个任意/BEST_EFFORT优先级的未阻塞任务。在suggested_reclaim_time之后,它会回收未使用的线程。它使用service_thread_task_runner来监视阻塞任务。如果指定了worker_thread_observer,它会在工作线程进入和退出其主函数时通知观察者。worker_environment参数指定任务执行的环境。may_block_threshold参数是一个可选的超时时间,用于指定MAY_BLOCK ScopedBlockingCall中的任务被视为阻塞。这个函数只能调用一次。

函数内部会根据传入的参数进行一系列的初始化和配置操作,包括:

  • 设置最大任务数和最大BEST_EFFORT任务数。
  • 设置建议的回收时间间隔。
  • 设置用于监视阻塞任务的TaskRunner。
  • 设置用于通知工作线程进入和退出的WorkerThreadObserver。
  • 设置任务执行的环境。
  • 设置MAY_BLOCK ScopedBlockingCall中任务被视为阻塞的超时时间。
  • 创建并启动工作线程。

通过这些配置和初始化操作,ThreadGroupImpl开始运行,并允许任务在工作线程中执行。工作线程的数量和行为受到max_tasks、max_best_effort_tasks和其他配置参数的限制和控制。

需要注意的是,这个函数只能被调用一次。如果调用多次,会触发CHECK失败。这是为了确保ThreadGroupImpl只能被启动一次,避免重复创建和启动工作线程。

析构
代码语言:C++
复制
~ThreadGroupImpl() override;

这是ThreadGroupImpl类的析构函数的定义。它是一个虚析构函数,并且有一个override修饰符。

这段注释说明了ThreadGroupImpl对象的销毁规则:

  • 在生产环境中,不允许销毁通过Create()创建的ThreadGroupImpl对象。它会被永久泄漏,即不会被显式地销毁。这是因为ThreadGroupImpl对象通常在整个应用程序的生命周期内持续存在,并且由于线程的运行和任务的执行,无法安全地销毁。
  • 在测试中,ThreadGroupImpl对象只能在JoinForTesting()返回后才能销毁。JoinForTesting()是一个用于测试的函数,用于等待所有任务完成并停止工作线程。在调用JoinForTesting()之前销毁ThreadGroupImpl对象可能导致未定义的行为或资源泄漏。 这个注释提醒开发人员在使用ThreadGroupImpl对象时要注意销毁的时机和方式,以避免潜在的问题和资源泄漏。
Public函数
代码语言:C++
复制
 void WaitForWorkersIdleForTesting(size_t n);

? // Waits until at least |n| workers are idle.
? void WaitForWorkersIdleLockRequiredForTesting(size_t n)
????? EXCLUSIVE_LOCKS_REQUIRED(lock_);

? // Waits until all workers are idle.
? void WaitForAllWorkersIdleForTesting();

? // Waits until |n| workers have cleaned up (since the last call to
? // WaitForWorkersCleanedUpForTesting() or Start() if it wasn't called yet).
? void WaitForWorkersCleanedUpForTesting(size_t n);

? // Returns the number of workers in this thread group.
? size_t NumberOfWorkersForTesting() const;

? // Returns |max_tasks_|.
? size_t GetMaxTasksForTesting() const;

? // Returns the number of workers that are idle (i.e. not running tasks).
? size_t NumberOfIdleWorkersForTesting() const;

这些函数是ThreadGroupImpl类中用于测试的函数,用于等待和获取关于工作线程状态的信息。

  • WaitForWorkersIdleForTesting(sizet n): 等待至少n个工作线程处于空闲状态。在这个调用期间,工作线程被禁止进行清理操作。注意,在使用自定义的suggested_reclaim_time的测试中,需要在解除对工作线程的等待后迅速调用此函数,因为如果在调用此函数时工作线程已经被分离,它将永远不会进入空闲状态,从而导致此调用挂起。
  • WaitForWorkersIdleLockRequiredForTesting(size_t n): 在获取锁的情况下,等待至少n个工作线程处于空闲状态。
  • WaitForAllWorkersIdleForTesting(): 等待所有工作线程处于空闲状态。
  • WaitForWorkersCleanedUpForTesting(size_t n): 等待n个工作线程完成清理操作(自上次调用* WaitForWorkersCleanedUpForTesting()或Start()以来的清理操作)。
  • NumberOfWorkersForTesting(): 返回线程组中的工作线程数量。
  • GetMaxTasksForTesting(): 返回线程组的最大任务数。
  • NumberOfIdleWorkersForTesting(): 返回当前处于空闲状态(即没有运行任务)的工作线程数量。

这些函数提供了一些用于测试目的的功能,可以用于等待工作线程的特定状态或获取线程组的相关信息。这对于编写测试用例和验证线程组的行为非常有用。例如,可以使用WaitForWorkersIdleForTesting()等待工作线程空闲,然后执行某些操作来验证任务的执行情况。或者使用WaitForWorkersCleanedUpForTesting()等待工作线程完成清理操作,以确保资源的正确释放。

ThreadGroupImpl::InitializedInStart
代码语言:C++
复制
 // Values set at Start() and never modified afterwards.
? struct InitializedInStart {
? ? InitializedInStart();
? ? ~InitializedInStart();

#if DCHECK_IS_ON()
? ? // Set after all members of this struct are set.
? ? bool initialized = false;
#endif

? ? // Initial value of |max_tasks_|.
? ? size_t initial_max_tasks = 0;

? ? // Suggested reclaim time for workers.
? ? TimeDelta suggested_reclaim_time;

? ? // Environment to be initialized per worker.
? ? WorkerEnvironment worker_environment = WorkerEnvironment::NONE;

? ? scoped_refptr<TaskRunner> service_thread_task_runner;

? ? // Optional observer notified when a worker enters and exits its main.
? ? WorkerThreadObserver* worker_thread_observer = nullptr;

? ? bool may_block_without_delay;

? ? // Threshold after which the max tasks is increased to compensate for a
? ? // worker that is within a MAY_BLOCK ScopedBlockingCall.
? ? TimeDelta may_block_threshold;

? ? // The period between calls to AdjustMaxTasks() when the thread group is at
? ? // capacity.
? ? TimeDelta blocked_workers_poll_period;
? } initialized_in_start_;

这是ThreadGroupImpl类中的一个嵌套结构体InitializedInStart的定义。这个结构体包含了在Start()函数中设置并在之后不再修改的一些值。

结构体成员包括:

  • bool initialized: 一个布尔值,用于在结构体的所有成员设置完成后进行标记。在DCHECK开启的情况下,用于确保结构体的成员都已经正确初始化。
  • size_t initial_max_tasks: 初始的最大任务数。
  • TimeDelta suggested_reclaim_time: 建议的回收时间间隔,用于工作线程的回收。
  • WorkerEnvironment worker_environment: 每个工作线程要初始化的环境。
  • scoped_refptr<TaskRunner> service_thread_task_runner: 用于监视阻塞任务的TaskRunner。
  • WorkerThreadObserver* worker_thread_observer: 可选的观察者,用于在工作线程进入和退出主函数时进行通知。
  • bool may_block_without_delay: 一个布尔值,表示是否允许没有延迟的MAY_BLOCK ScopedBlockingCall。
  • TimeDelta may_block_threshold: 在MAY_BLOCK ScopedBlockingCall中,超过此阈值后,最大任务数会增加以补偿阻塞的工作线程。
  • TimeDelta blocked_workers_poll_period: 当线程组达到容量时,调用AdjustMaxTasks()的时间间隔。

这些成员变量在Start()函数中被设置,并在之后的运行中保持不变。它们存储了一些线程组的初始配置和参数,用于控制线程组的行为和性能。

配套函数如下:

代码语言:C++
复制
InitializedInStart& in_start() {
? #if DCHECK_IS_ON()
? ? DCHECK(!initialized_in_start_.initialized);
? #endif
? ? return initialized_in_start_;
}
const InitializedInStart& after_start() const {
? #if DCHECK_IS_ON()
? ? DCHECK(initialized_in_start_.initialized);
? #endif
? ? return initialized_in_start_;
}
成员变量
代码语言:C++
复制
? const std::string thread_group_label_;
? const ThreadPriority priority_hint_;

? // All workers owned by this thread group.
? std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);

? // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
? // concurrently in this thread group.
? size_t max_tasks_ GUARDED_BY(lock_) = 0;
? size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;

? // Number of tasks of any priority / BEST_EFFORT priority that are currently
? // running in this thread group.
? size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
? size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;

? // Number of workers running a task of any priority / BEST_EFFORT priority
? // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
? // caused a max tasks increase yet.
? int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
? int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;

? // Stack of idle workers. Initially, all workers are on this stack. A worker
? // is removed from the stack before its WakeUp() function is called and when
? // it receives work from GetWork() (a worker calls GetWork() when its sleep
? // timeout expires, even if its WakeUp() method hasn't been called). A worker
? // is pushed on this stack when it receives nullptr from GetWork().
? WorkerThreadStack idle_workers_stack_ GUARDED_BY(lock_);

? // Signaled when a worker is added to the idle workers stack.
? std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_
????? GUARDED_BY(lock_);

? // Stack that contains the timestamps of when workers get cleaned up.
? // Timestamps get popped off the stack as new workers are added.
? base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_
????? GUARDED_BY(lock_);

? // Whether an AdjustMaxTasks() task was posted to the service thread.
? bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;

? // Indicates to the delegates that workers are not permitted to cleanup.
? bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;

? // Counts the number of workers cleaned up since the last call to
? // WaitForWorkersCleanedUpForTesting() (or Start() if it wasn't called yet).
? // |some_workers_cleaned_up_for_testing_| is true if this was ever
? // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
? // specific number of workers being cleaned up via
? // WaitForWorkersCleanedUpForTesting().
? size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
#if DCHECK_IS_ON()
? bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
#endif

? // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
? // incremented.
? std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
????? GUARDED_BY(lock_);

#if DCHECK_IS_ON()
? // Set at the start of JoinForTesting().
? AtomicFlag join_for_testing_started_;
#endif

这些成员变量是ThreadGroupImpl类中的一些状态变量和数据结构,用于管理和跟踪线程组的状态和工作线程的情况。

  • const std::string threadgroup_label: 线程组的标签,用于标识线程组。
  • const ThreadPriority priorityhint: 线程组的优先级提示。
  • std::vector<scopedrefptr<WorkerThread>> workers: 线程组拥有的所有工作线程的容器。
  • sizet max_tasks和sizet max_best_effort_tasks: 线程组中可以同时运行的任意优先级和BEST_EFFORT优先级任务的最大数量。
  • sizet num_running_tasks和sizet num_running_best_effort_tasks: 当前正在运行的任意优先级和BEST_EFFORT优先级任务的数量。
  • int numunresolved_may_block和int numunresolved_best_effort_may_block: 在MAY_BLOCK ScopedBlockingCall范围内运行的任意优先级和BEST_EFFORT优先级任务的工作线程数量,但尚未导致最大任务数的增加。
  • WorkerThreadStack idleworkers_stack: 空闲工作线程的栈。初始情况下,所有工作线程都在这个栈上。当工作线程从GetWork()函数中接收到nullptr时,它会被推入这个栈上。
  • std::uniqueptr<ConditionVariable> idle_workers_stack_cv_for_testing: 用于通知工作线程被添加到空闲工作线程栈的条件变量。
代码语言:C++
复制
// ThreadPool.DetachDuration.[thread group name] histogram. Intentionally
// leaked.
HistogramBase* const detach_duration_histogram_;

// ThreadPool.NumTasksBeforeDetach.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_tasks_before_detach_histogram_;

// ThreadPool.NumTasksBetweenWaits.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_tasks_between_waits_histogram_;

// ThreadPool.NumWorkers.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_workers_histogram_;

// ThreadPool.NumActiveWorkers.[thread group name] histogram.
// Intentionally leaked.
HistogramBase* const num_active_workers_histogram_;

// Ensures recently cleaned up workers (ref.
// WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
// they have a raw reference to |this| (and to TaskTracker) which can
// otherwise result in racy use-after-frees per no longer being part of
// |workers_| and hence not being explicitly joined in JoinForTesting():
// https://crbug.com/810464. Uses AtomicRefCount to make its only public
// method thread-safe.
TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;

这些成员变量是ThreadGroupImpl类中的一些标识和引用计数相关的变量。

  • HistogramBase* const detachduration_histogram: 用于记录线程组中工作线程的分离持续时间的标识。这个名称是有意泄漏的,即在线程组销毁时不会被显式地销毁。
  • HistogramBase* const numtasks_before_detach_histogram: 用于记录线程组中工作线程在分离之前执行的任务数量的标识。同样地,这个标识也是有意泄漏的。
  • HistogramBase* const numtasks_between_waits_histogram: 用于记录线程组中工作线程在等待之间执行的任务数量的标识。同样地,这个标识也是有意泄漏的。
  • HistogramBase* const numworkers_histogram: 用于记录线程组中工作线程数量的标识。同样地,这个标识也是有意泄漏的。
  • HistogramBase* const numactive_workers_histogram: 用于记录线程组中活动工作线程数量的标识。同样地,这个标识也是有意泄漏的。
  • TrackedRefFactory<ThreadGroupImpl> trackedref_factory: 用于确保最近清理的工作线程有足够的时间退出。这是为了避免在JoinForTesting()中使用已经不再属于workers的工作线程时发生竞争条件和使用-after-free的情况。tracked_ref_factory使用AtomicRefCount来确保线程安全。

谢谢各位看到这里,如果有感兴趣的模块或者代码需要攻略,也可以留言,会不定时更新。喜欢可以去github点点赞,再次感谢?

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码
  • 前言
  • ThreadGroup
    • ThreadGroup::Delegate
      • ThreadGroup::WorkerEnvironment
        • 线程绑定
          • Public接口
            • ThreadGroup::BaseScopedWorkersExecutor
              • ThreadGroup::ScopedReenqueueExecutor
                • 源码解析
                  • 构造
                  • 其他
              • ThreadGroupImpl
                • 源码解析
                  • 构造
                  • 启动
                  • 析构
                  • Public函数
                  • ThreadGroupImpl::InitializedInStart
                  • 成员变量
              相关产品与服务
              负载均衡
              负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
              http://www.vxiaotou.com