当前位置:主页 > 查看内容

golang 系列:waitgroup 解析

发布时间:2021-08-16 00:00| 位朋友查看

简介:摘要 Golang 提供了简洁的 go 关键字来让开发者更容易的进行并发编程,同时也提供了 WaitGroup 对象来辅助并发控制。今天我们就来分析下 WaitGroup 的使用方法,顺便瞧一瞧它的底层源码。 WaitGroup 的使用场景和方法 当我们有很多任务要同时进行时,如果并……
摘要

Golang 提供了简洁的 go 关键字来让开发者更容易的进行并发编程,同时也提供了 WaitGroup 对象来辅助并发控制。今天我们就来分析下 WaitGroup 的使用方法,顺便瞧一瞧它的底层源码。

WaitGroup 的使用场景和方法

当我们有很多任务要同时进行时,如果并不需要关心各个任务的执行进度,那直接使用 go 关键字即可。

如果我们需要关心所有任务完成后才能往下运行时,则需要 WaitGroup 来阻塞等待这些并发任务了。

WaitGroup 如同它的字面意思,就是等待一组 goroutine 运行完成,主要有三个方法组成:

Add(delta int) :添加任务数Wait():阻塞等待所有任务的完成Done():完成任务

下面是它们的具体用法,具体的作用都在注释上:

package main
import (
 "fmt"
 "sync"
 "time"
func worker(wg *sync.WaitGroup) {
 doSomething()
 wg.Done() // 2.1、完成任务
func main() {
 var wg sync.WaitGroup
 wg.Add(5) // 1、添加 5 个任务
 for i := 1; i i++ {
 go worker( wg) // 2、每个任务并发执行
 wg.Wait() // 3、阻塞等待所有任务完成
}
WaitGroup 源码分析

上面 WaitGroup 的使用很简单,接下来我们到 src/sync/waitgroup.go 里分析下它的源码。首先,是 WaitGroup 的结构体:

type WaitGroup struct {
 noCopy noCopy
 state1 [3]uint32
}
noCopy

其中,noCopy 表示 WaitGroup 是不可复制的。那么什么叫不可复制呢?

举个例子,当我们对函数参数定义了这个不可复制的类型时,开发者只能通过指针来传递函数参数。而规定使用指针传递又有什么好处呢?

好处在于如果有多个函数都定义了这个不可复制的参数时,那么这多个函数参数就可以共用同一个指针变量,来同步执行结果。而 WaitGroup 就是需要这样的约束规定。

state1 字段

接下来我们来看看 WaitGroup 的 state1 字段。state1 是一个包含了 counter 总数、 waiter 等待数、sema 信号量的 uint32 数组。

每当有 goroutine 调用了 Wait() 方法阻塞等待时,就会对 waiter 数量 + 1,然后等待信号量的唤起通知。

当我们调用 Add() 方法时,就会对 state1 的 counter 数量 + 1。

当调用 Done() 方法时就会对 counter 数量 -1。

直到 counter == 0 时就可以通过信号量唤起对应 waiter 数量的 goroutine 了,也就是唤起刚刚阻塞等待的 goroutine 们。

关于信号量的解释,可以参考下 golang 重要知识:mutex 里的相关介绍:

PV 原语解释:
通过操作信号量 S 来处理进程间的同步与互斥的问题。
S 0:表示有 S 个资源可用;S=0 表示无资源可用;S 0 绝对值表示等待队列或链表中的进程个数。信号量 S 的初值应大于等于 0。
P 原语:表示申请一个资源,对 S 原子性的减 1,若 减 1 后仍 S =0,则该进程继续执行;若 减 1 后 S 0,表示已无资源可用,需要将自己阻塞起来,放到等待队列上。
V 原语:表示释放一个资源,对 S 原子性的加 1;若 加 1 后 S 0,则该进程继续执行;若 加 1 后 S =0,表示等待队列上有等待进程,需要将第一个等待的进程唤醒。

此处操作系统可以理解为 Go 的运行时 runtime,进程可以理解为协程。

源码解释

最后,我们来深入 WaitGroup 的三个方法,进行源码分析。大家感兴趣的可以继续往下看,主要是对源码的分析注释。

Add(delta int) 方法
func (wg *WaitGroup) Add(delta int) {
 statep, semap := wg.state()
 if race.Enabled { // 此处是 go 的竞争检测,可以不用关心
 _ = *statep
 if delta 0 {
 race.ReleaseMerge(unsafe.Pointer(wg))
 race.Disable()
 defer race.Enable()
 state := atomic.AddUint64(statep, uint64(delta) 32)
 v := int32(state 32) // 获取 counter
 w := uint32(state) // 获取 waiter
 if race.Enabled delta 0 v == int32(delta) { // go 的竞争检测,可以不用关心
 race.Read(unsafe.Pointer(semap))
 if v 0 {
 panic("sync: negative WaitGroup counter")
 if w != 0 delta 0 v == int32(delta) {
 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
 if v 0 || w == 0 { // counter 0:还有任务在执行;waiter == 0 表示没有在阻塞等待的 goroutine
 return
 if *statep != state {
 panic("sync: WaitGroup misuse: Add called concurrently with Wait")
 // 执行到此处相当于 countr = 0,即所有的任务都已执行完,需要唤起等待的 goroutine了
 *statep = 0
 for ; w != 0; w-- {
 runtime_Semrelease(semap, false, 0)
}
Done 方法
func (wg *WaitGroup) Done() {
 wg.Add(-1) // 直接调用 Add 方法 对 counter -1
}
Wait 方法
func (wg *WaitGroup) Wait() {
 statep, semap := wg.state()
 if race.Enabled { // go 的竞争检测,可以不用关心
 _ = *statep
 race.Disable()
 for {
 state := atomic.LoadUint64(statep)
 v := int32(state 32)
 w := uint32(state)
 if v == 0 {
 // counter 为 0, 不需要再等待了。
 if race.Enabled {
 race.Enable()
 race.Acquire(unsafe.Pointer(wg))
 return
 // waiters 数目 +1.
 if atomic.CompareAndSwapUint64(statep, state, state+1) {
 if race.Enabled w == 0 {
 race.Write(unsafe.Pointer(semap)) // go 的竞争检测,可以不用关心
 runtime_Semacquire(semap) // 阻塞等待唤起
 if *statep != 0 {
 panic("sync: WaitGroup is reused before previous Wait has returned")
 if race.Enabled {
 race.Enable()
 race.Acquire(unsafe.Pointer(wg))
 return
}

从这几个方法的源码,我们可以看出,Go 并没有使用 mutex 等锁去做字段值修改,而是采用了 atomic 原子操作来进行修改的。这是在底层硬件上支持的,所以性能更好。

总结

WaitGroup 比较简单,就是一些计数值的维护和 goroutine 的阻塞唤起。它的运用也简单,Add、Done、Wait 这三个方法经常是同时出现的。相信大伙深入到源码也能瞧出个大概,这里就献丑了 ?。


本文转自网络,原文链接:https://developer.aliyun.com/article/787101
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
上一篇:试用esc服务器 下一篇:没有了

推荐图文


随机推荐