先附上可用于学习的开源代码:Base库
喜欢可以帮忙Star一下
编译:参考Base库即可
环境:Visual Studio 2022 - 17.8.3 + v143 + 10.0.22621.0 + C++17
首先需要说明的是,既然有了base::Thread,为什么还要有base::SimpleThread?
官方解释:你应该使用 Thread(thread.h)替代。Thread 是 Chrome 基于消息循环的线程抽象,如果你是在浏览器中运行的线程,那么很可能有假设你的线程将具有关联的消息循环。
这是一个简单的线程接口,它与本地操作系统线程相关联。只有在不需要关联消息循环的线程时才应使用该接口,最好的例子是单元测试。
使用最简单的接口是 DelegateSimpleThread,它将创建一个新线程,并在该新线程中执行 Delegate 的虚拟 Run() 方法,直到完成并退出线程。例如:
//?在这里面实现自定义的线程运行逻辑
class?MyThreadRunner?:?public?DelegateSimpleThread::Delegate?{?...?};
MyThreadRunner?runner;
DelegateSimpleThread?thread(&runner,?"good\_name\_here");
//?Start?方法将在成功启动和初始化线程后返回。
//?新创建的线程将调用?runner->Run(),并一直运行直到返回。
thread.Start();
//?thread.Join()?方法将等待线程退出。必须?调用?Join!
thread.Join();
//?SimpleThread?对象仍然有效,但是你不能再次调用?Join?或?Start?方法。
可以理解为base::Thread的一个简化版、轻量版
SimpleThread和base::Thread相同,也具有线程选项、线程管控基本功能,不同的是
SimpleThread新增了两个模块:DelegateSimpleThread和DelegateSimpleThreadPool
由于SimpleThread不包含线程循环,所以必须要定义其线程做的事情,也就是源码所给出的DelegateSimpleThread这个类,向我们展示了如何正确使用SimpleThread来操作线程,如果没有扩展的需要,可以直接使用DelegateSimpleThread以及包含的DelegateSimpleThread::Delegate来分别定义线程以及对应的逻辑,Delegate实际就是线程将自己的执行逻辑委托出去,抽象了一下。
SimpleThread的Option简单很多,仅提供了优先级、堆栈大小、是否可join这些设置
struct BASE_EXPORT Options {
? public:
? Options() = default;
? explicit Options(ThreadPriority priority_in) : priority(priority_in) {}
? ~Options() = default;
? // Allow copies.
? Options(const Options& other) = default;
? Options& operator=(const Options& other) = default;
? // A custom stack size, or 0 for the system default.
? size_t stack_size = 0;
? ThreadPriority priority = ThreadPriority::NORMAL;
? // If false, the underlying thread's PlatformThreadHandle will not be kept
? // around and as such the SimpleThread instance will not be Join()able and
? // must not be deleted before Run() is invoked. After that, it's up to
? // the subclass to determine when it is safe to delete itself.
? bool joinable = true;
};
其中优先级定义如下:
// Valid values for priority of Thread::Options and SimpleThread::Options, and
// SetCurrentThreadPriority(), listed in increasing order of importance.
enum class ThreadPriority : int {
??// 适用于不应中断高优先级工作的线程。 ??
? BACKGROUND, ???
? // 默认优先级级别。 ???
? NORMAL, ???
? // 适用于为显示生成数据的线程(大约 60Hz)。
? DISPLAY, ???
? // 适用于低延迟、抗干扰的音频。
? REALTIME_AUDIO,
};
接下来对SimpleThread源码进行详细解读
对外接口提供
// 创建一个 SimpleThread。|options| 应该用于管理涉及线程创建和管理的特定配置。
// 每个线程都有一个名称,它是一个显示字符串,用于标识线程。
// 直到调用 Start() 方法之前,线程不会被创建。
explicit SimpleThread(const std::string& name);
SimpleThread(const std::string& name, const Options& options);
~SimpleThread() override;
// 启动线程,并在线程启动和初始化后(即调用 ThreadMain() 后)才返回。
void Start();
// 加入线程。如果使用 StartAsync() 启动线程,则首先等待线程干净地启动,然后加入线程。
void Join();
// 启动线程,但立即返回,而不等待线程首先进行初始化(即不等待 ThreadMain() 被运行)。
void StartAsync();
// 子类应该重写 Run 方法。
virtual void Run() = 0;
// 返回线程 ID,在线程启动后才有效。如果使用 Start() 启动线程,则在调用 Start() 后它将有效。
// 如果使用 StartAsync() 启动线程,则在 HasBeenStarted() 返回 True 之前不能调用此方法。
PlatformThreadId tid();
// 如果线程已启动并初始化(即 ThreadMain() 已运行),则返回 True。
// 如果使用 StartAsync() 启动线程,但尚未初始化(即 ThreadMain() 尚未运行),则返回 False。
bool HasBeenStarted();
// 如果曾经调用过 Join(),则返回 True。
bool HasBeenJoined() const { return joined_; }
// 如果调用过 Start() 或 StartAsync(),则返回 true。
bool HasStartBeenAttempted() { return start_called_; }
// 从 PlatformThread::Delegate 中重写:
void ThreadMain() override;
const std::string name_;
const Options options_;
PlatformThreadHandle thread_;? // PlatformThread handle, reset after Join.
WaitableEvent event_;????????? // Signaled if Start() was ever called.
PlatformThreadId tid_ = kInvalidThreadId;? // The backing thread's id.
bool joined_ = false;????????????????????? // True if Join has been called.
// Set to true when the platform-thread creation has started.
bool start_called_ = false;
同步启动会进行等待启动线程成功后才会返回,异步则会直接返回,线程是否启动成功由调用方决策
void SimpleThread::Start() {
? StartAsync();
? ScopedAllowBaseSyncPrimitives allow_wait;
? event_.Wait();? // Wait for the thread to complete initialization.
}
void SimpleThread::StartAsync() {
? DCHECK(!HasStartBeenAttempted()) << "Tried to Start a thread multiple times.";
? start_called_ = true;
? BeforeStart();
? bool success =
? ? ? options_.joinable
? ? ? ? ? ? PlatformThread::CreateWithPriority(options_.stack_size, this,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? &thread_, options_.priority)
? ? ? ? ? : PlatformThread::CreateNonJoinableWithPriority(
? ? ? ? ? ? ? ? options_.stack_size, this, options_.priority);
? CHECK(success);
}
线程实际执行的主要函数,可以看到委托出去的Delegate定义的Run会最后执行,前面加上了BeforeRun的hook以及Start函数同步等待事件的触发
void SimpleThread::ThreadMain() {
? tid_ = PlatformThread::CurrentId();
? PlatformThread::SetName(name_);
? // We've initialized our new thread, signal that we're done to Start().
? event_.Signal();
? BeforeRun();
? Run();
}
join函数也加入了BeforeJoin的hook,方便继承子类自定义的需求使用
void SimpleThread::Join() {
? DCHECK(options_.joinable) << "A non-joinable thread can't be joined.";
? DCHECK(HasStartBeenAttempted()) << "Tried to Join a never-started thread.";
? DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
? BeforeJoin();
? PlatformThread::Join(thread_);
? thread_ = PlatformThreadHandle();
? joined_ = true;
}
SimpleThread提供了Run这个纯虚函数,这也是为什么不能直接使用SimpleThread的原因,需要子类继承后定义Run函数再使用
? // Subclasses should override the Run method.
? virtual void Run() = 0;
DelegateSimpleThread 是一个简单的线程,它将 Run() 委托给其 Delegate。非可加入(non-joinable)的 DelegateSimpleThread 在调用了 Run() 后可以安全地删除,从这个类的角度来看,它们的 Delegate 也可以在此时安全地删除(尽管实现必须确保在删除后,Run() 不会再使用 Delegate 的成员状态)。
委托Run函数给DelegateSimpleThread::Delegate
class BASE_EXPORT DelegateSimpleThread : public SimpleThread {
?public:
? class BASE_EXPORT Delegate {
?? public:
??? virtual ~Delegate() = default;
??? virtual void Run() = 0;
? };
? DelegateSimpleThread(Delegate* delegate,
?????????????????????? const std::string& name_prefix);
? DelegateSimpleThread(Delegate* delegate,
?????????????????????? const std::string& name_prefix,
?????????????????????? const Options& options);
? ~DelegateSimpleThread() override;
? void Run() override;
?private:
? Delegate* delegate_;
? DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThread);
};
可以看到delegate是一次性使用,使用完后会置为空指针
void DelegateSimpleThread::Run() {
? DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
? // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run().
? // Member state must not be accessed after invoking Run().
? Delegate* delegate = delegate_;
? delegate_ = nullptr;
? delegate->Run();
}
DelegateSimpleThreadPool 允许您启动固定数量的线程,然后将作业分派给这些线程。当有大量需要以多线程方式完成的小任务,但又不想为每个小任务启动一个线程时,这非常方便。
只需调用 AddWork() 将委托添加到待处理的作业列表中。JoinAll() 将确保处理所有未完成的作业,并等待所有任务完成。您可以重用线程池,因此在调用 JoinAll() 后可以再次调用 Start()。
这和线程池PostTask有啥区别呢? DelegateSimpleThreadPool是启动固定数目一系列DelegateSimpleThread线程,然后将一系列DelegateSimpleThread::Delegate线程的委托作业丢给这些线程进行处理。(统一执行的都是里面的Run函数)
而线程池则是一系列工作线程,执行的是各种各样自定义的函数逻辑。
Public继承自DelegateSimpleThread::Delegate,表明每个里面的线程都会执行同样的Run函数
class BASE_EXPORT DelegateSimpleThreadPool
??? : public DelegateSimpleThread::Delegate {
?public:
? typedef DelegateSimpleThread::Delegate Delegate;
? DelegateSimpleThreadPool(const std::string& name_prefix, int num_threads);
? ~DelegateSimpleThreadPool() override;
? // Start up all of the underlying threads, and start processing work if we
? // have any.
? void Start();
? // Make sure all outstanding work is finished, and wait for and destroy all
? // of the underlying threads in the pool.
? void JoinAll();
? // It is safe to AddWork() any time, before or after Start().
? // Delegate* should always be a valid pointer, NULL is reserved internally.
? void AddWork(Delegate* work, int repeat_count);
? void AddWork(Delegate* work) {
??? AddWork(work, 1);
? }
? // We implement the Delegate interface, for running our internal threads.
? void Run() override;
?private:
? const std::string name_prefix_;
? int num_threads_;
? std::vector<DelegateSimpleThread*> threads_;
? base::queue<Delegate*> delegates_;
? base::Lock lock_;??????????? // Locks delegates_
? WaitableEvent dry_;??? // Not signaled when there is no work to do.
? DISALLOW_COPY_AND_ASSIGN(DelegateSimpleThreadPool);
};
池子线程数目不可变更
DelegateSimpleThreadPool::DelegateSimpleThreadPool(
??? const std::string& name_prefix,
??? int num_threads)
??? : name_prefix_(name_prefix),
????? num_threads_(num_threads),
????? dry_(WaitableEvent::ResetPolicy::MANUAL,
?????????? WaitableEvent::InitialState::NOT_SIGNALED) {}
启动一系列DelegateSimpleThread,然后运行
void DelegateSimpleThreadPool::Start() {
? DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
? for (int i = 0; i < num_threads_; ++i) {
??? std::string name(name_prefix_);
??? name.push_back('/');
??? name.append(NumberToString(i));
??? DelegateSimpleThread* thread = new DelegateSimpleThread(this, name);
??? thread->Start();
??? threads_.push_back(thread);
? }
}
无休止的执行队列中的任务,直到队列传入空指针任务进来
注意这里我们初始化的是WaitableEvent::ResetPolicy::MANUAL,表明这个信号不会自动重置,每个线程都会不停的被激活去清空任务队列,直到某个线程拿到了空任务,才会调用Reset叫大家停下来
void DelegateSimpleThreadPool::Run() {
? Delegate* work = nullptr;
? while (true) {
??? dry_.Wait();
??? {
????? AutoLock locked(lock_);
????? if (!dry_.IsSignaled())
??????? continue;
????? DCHECK(!delegates_.empty());
????? work = delegates_.front();
????? delegates_.pop();
????? // Signal to any other threads that we're currently out of work.
????? if (delegates_.empty())
??????? dry_.Reset();
??? }
??? // A NULL delegate pointer signals us to quit.
??? if (!work)
????? break;
??? work->Run();
? }
}
给任务队列塞线程数目的空指针任务,然后等待每个线程的退出,最后反初始化所有线程
void DelegateSimpleThreadPool::JoinAll() {
? DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
? // Tell all our threads to quit their worker loop.
? AddWork(nullptr, num_threads_);
? // Join and destroy all the worker threads.
? for (int i = 0; i < num_threads_; ++i) {
? ? threads_[i]->Join();
? ? delete threads_[i];
? }
? threads_.clear();
? DCHECK(delegates_.empty());
}
新建一个任务,可以指定这个任务重复执行多少次
void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
? AutoLock locked(lock_);
? for (int i = 0; i < repeat_count; ++i)
??? delegates_.push(delegate);
? // If we were empty, signal that we have work now.
? if (!dry_.IsSignaled())
??? dry_.Signal();
}
赋值stack_int为7,创建线程进行,Start之前不会进行赋值,Join后判断是否赋值成功,这也是常规线程使用逻辑,最好不要在Start和Join之间做一些逻辑
class SetIntRunner : public DelegateSimpleThread::Delegate {
public:
??? SetIntRunner(int* ptr, int val) : ptr_(ptr), val_(val) {}
??? ~SetIntRunner() override = default;
private:
??? void Run() override { *ptr_ = val_; }
??? int* ptr_;
??? int val_;
??? DISALLOW_COPY_AND_ASSIGN(SetIntRunner);
};
int stack_int = 0;
SetIntRunner runner(&stack_int, 7);
EXPECT_EQ(0, stack_int);
DelegateSimpleThread thread(&runner, "int_setter");
EXPECT_FALSE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());
EXPECT_EQ(0, stack_int);
thread.Start();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_FALSE(thread.HasBeenJoined());
thread.Join();
EXPECT_TRUE(thread.HasBeenStarted());
EXPECT_TRUE(thread.HasBeenJoined());
EXPECT_EQ(7, stack_int);
创建线程处理一些耗时任务,当前线程阻塞等待执行结果,常适用于一些IO、文件等耗时任务
class WaitEventRunner : public DelegateSimpleThread::Delegate {
public:
??? explicit WaitEventRunner(WaitableEvent* event) : event_(event) {}
??? ~WaitEventRunner() override = default;
private:
??? void Run() override {
??????? EXPECT_FALSE(event_->IsSignaled());
? ? ? // 做一些异步耗时操作,做完后触发信号告诉任务已经完成
??????? event_->Signal();
??????? EXPECT_TRUE(event_->IsSignaled());
??? }
??? WaitableEvent* event_;
??? DISALLOW_COPY_AND_ASSIGN(WaitEventRunner);
};
// Create a thread, and wait for it to signal us.
WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
??? WaitableEvent::InitialState::NOT_SIGNALED);
WaitEventRunner runner(&event);
DelegateSimpleThread thread(&runner, "event_waiter");
EXPECT_FALSE(event.IsSignaled());
thread.Start();
event.Wait();
EXPECT_TRUE(event.IsSignaled());
thread.Join();
定义一些顺序执行原子任务放入到线程池中执行,Start线程池会启动执行,并且Start前也可以AddWork。
// AtomicSequenceNumber is a thread safe increasing sequence number generator.
// Its constructor doesn't emit a static initializer, so it's safe to use as a
// global variable or static member.
class AtomicSequenceNumber {
?public:
? constexpr AtomicSequenceNumber() = default;
? // Returns an increasing sequence number starts from 0 for each call.
? // This function can be called from any thread without data race.
? inline int GetNext() { return seq_.fetch_add(1, std::memory_order_relaxed); }
?private:
? std::atomic_int seq_{0};
? DISALLOW_COPY_AND_ASSIGN(AtomicSequenceNumber);
};
class SeqRunner : public DelegateSimpleThread::Delegate {
public:
??? explicit SeqRunner(AtomicSequenceNumber* seq) : seq_(seq) {}
private:
??? void Run() override { seq_->GetNext(); }
??? AtomicSequenceNumber* seq_;
??? DISALLOW_COPY_AND_ASSIGN(SeqRunner);
};
AtomicSequenceNumber seq;
SeqRunner runner(&seq);
DelegateSimpleThreadPool pool("seq_runner", 10);
// Add work before we're running.
pool.AddWork(&runner, 300);
EXPECT_EQ(seq.GetNext(), 0);
pool.Start();
// Add work while we're running.
pool.AddWork(&runner, 300);
pool.JoinAll();
EXPECT_EQ(seq.GetNext(), 601);