之前在协调多个goroutine的时候,使用了通道。基本都是按下面这样来使用的:
package main
import "fmt"
func main() {
done := make(chan struct{})
count := 5
for i := 0; i < count; i++ {
go func(i int) {
defer func() {
done <- struct{}{}
}()
fmt.Println(i)
}(i)
}
for j := 0; j < count; j++ {
<- done
}
fmt.Println("Over")
}
这里有一个问题,要保证主goroutine最后从通道接收元素的的次数需要与之前其他goroutine发送元素的次数相同。
其实,在这种应用场景下,可以选用另外一个同步工具,就是这里要讲的sync包的WaitGroup类型。
sync.WaitGroup类型,它比通道更加适合实现这种一对多的goroutine协作流程。WaitGroup是开箱即用的,也是并发安全的。同时,与之前提到的同步工具一样,它一旦被真正的使用就不能被复制了。
WaitGroup拥有三个指针方法,可以想象该类型中有一个计数器,默认值是0,下面的方法就是操作或判断计数器:
Add(-1)
,可以在defer语句中调用它现在就用WaitGroup来改造开篇的程序:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup // 开箱即用,所以直接声明就好了,没必要用短变量声明
// wg := sync.WaitGroup{} // 短变量声明可以这么写
count := 5
for i := 0; i < count; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
fmt.Println("Over")
}
改造后,在主goroutine最后等待退出的部分现在看着要美观多了。这个就是WaitGroup典型的应用场景了。
计数器不能小于0
在sync.WaitGroup类型值中计数器的值是不可以小于0的。一旦小于0会引发panic,不适当的调用Done方法和Add方法就有可能使它小于0而引发panic。
尽早增加计数器的值
如果在对它的Add方法的首次调用,与对它的Wait方法的调用是同时发起的。比如,在同时启动的两个goroutine中,分别调用这两个方法,那就就有可能会让这里的Add方法抛出一个panic。并且这种情况不太容易,应该予以重视。所以虽然WaitGroup值本身并不需要初始化,但是尽早的增加其计数器的值是非要必要的。
复用的情况
WaitGroup的值是可以被复用的,但需要保证其计数周期的完整性。这里的计数周期指的是这样一个过程:该值中的计数器值由0变为了某个正整数,而后又经过一系列的变化,最终由某个正整数又变回了0。这个过程可以被视为一个计数周期。在一个此类的生命周期中,它可以经历任意多个计数周期。但是,只有在它走完当前的计数周期后,才能够开始下一个计数周期。
也就是说,如果一个此类值的Wait方法在它的某个计数周期中被调用,那么就会立即阻塞当前的goroutine,直至这个计数周期完成。在这种情况下,该值的下一个计数周期必须要等到这个Wait方法执行结束之后,才能够开始。
Wait方法是有一个执行的过程的,如果在这个方法执行期间,跨越了两个计数周期,就会引发一个panic。比如,当前的goroutine调用了Wait方法而阻塞了。另一个goroutine调用了Done方法使计数器变成了0。此时会唤醒之前阻塞的goroutine,并且去执行Wait方法中其余的代码(这里还在这行Wait方法,执行的是源码sync.Wait方法里的代码,不是我们自己写的程序的Wait之后的代码)。在这个时候,又有一个goroutine调用了Add方法,使计数器的值又从0变为了某个正整数。此时正在执行的Wait方法就会立即抛出一个panic。
上面给了3种会引发panic的情况。关于后两种情况,建议如下:
不要把增加计数器值的操作和调用Wait方法的代码,放在不同的goroutine中执行。
就是要杜绝对同一个WatiGroup值的两种操作的并发执行。
后面提到的两种情况,不是每次都会发生,通常需要反复的实验才能够引发panic的情况。虽然不是每次都发生,但是在长期运行的过程中,这种情况是必然会出现的,应该予以重视并且避免。
如果对复现这些异常情况感兴趣,可以看一下sync代码包中的waitgroup_test.go文件。其中的名称以TestWaitGroupMisuse为前缀的测试函数,很好的展示了这些异常情况发生的条件。
与sync.WaitGroup类型一样,Sync.Once类型也属于结构体类型,同样也是开箱即用和并发安全的。由于这个类型中包含了一个sync.Mutex类型的字段,所以复制改类型的值也会导致功能失效。
Do方法
Once类型的Do方法只接收一个参数,参数的类型必须是func(),即无参数无返回的函数。该方法的功能并不是对每一种参数函数都只执行一次,而是只执行首次被调用时传入的那个函数,并且之后不会再执行任何参数函数。所以,如果有多个需要执行一次的函数,应该为它们每一个都分配一个sync.Once类型的值。
基本用法如下:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter uint32
var once sync.Once
once.Do(func() {
atomic.AddUint32(&counter, 1)
})
fmt.Println("counter:", counter)
// 这次调用不会被执行
once.Do(func() {
atomic.AddUint32(&counter, 2)
})
fmt.Println("counter:", counter)
}
done字段
Once类型中还要一个名为done的uint32类型的字段。它的作用是记录所属值的Do方法被调用的次数。不过改字段的值只可能是0或1.一旦Do方法的首次调用完成,它的值就会从0变为1。
关于done的类型,其实用布尔类型就够了,这里只所以用uint32类型的原因是它的操作必须是原子操作,只能使用原子操作支持的数据类型。
Do方法的实现方式
Do方法在一开始就会通过atomic.LoadUint32来获取done字段的值,并且如果发现值为1就直接返回。这步只是初步保证了Do方法只会执行首次调用是传入的函数。
不过单凭上面的判断是不够的。如果两个goroutine都调用了同一个新的Once值的Do方法,并且几乎同时执行到了其中的这个条件判断代码,那么它们就都会因判断结果为false而继续执行Do方法中剩余的代码。
基于上面的可能,在初步保证的判断之后,Do方法会立即锁定其所属值中的那个sync.Mutex类型的m字段。然后,它会在临界区中再次检查done字段的值。此时done的值应该仍然是0,并且已经加锁。此时才认为是条件满足,才会去调用参数函数。并且用原子操作把done的值变为1。
单例模式
如果熟悉设计模式中的单例模式的话,这个Do方法的实现方式,与单例模式有很多相似之处。都会先在临界区之外判断一次关键条件,若条件不满足则立即返回。这通常被称为快路径,或者叫做快速失败路径。
如果条件满足,那么到了临界区中还要再对关键条件进行一次判断,这主要是为了更加严谨。这两次条件判断常被统称为(跨临界区的)双重检查。由于进入临界区前要加锁,显然会降低代码的执行速度,所以其中的第二次条件判断,以及后续的操作就被称为慢路径或者常规路径。
Do方法中的代码不多,但它却应用了一个很经典的编程范式。
一、由于Do方法只会在参数函数执行结束之后把done字段的值变为1,因此,如果参数函数的执行需要很长的时间或者根本就不会结束,那么就有可能会导致相关goroutine的同时阻塞。
比如,有多个goroutine并发的调用了同一个Once值的Do方法,并且传入的函数都会一直执行而不结束。那么,这些goroutine就都会因调用了这个Do方法而阻塞。此时,那个抢先执行了参数函数的goroutine之外,其他的goroutine都会被阻塞在该Once值的互斥锁m的那行代码上。
效果演示的示例代码:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
once := sync.Once{} // 这里换短变量声明
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
// 这个函数会被执行
once.Do(func() {
for i := 0; i < 10; i++ {
fmt.Printf("\r任务[1-%d]执行中...", i)
time.Sleep(time.Millisecond * 400)
}
})
fmt.Printf("\n任务[1]执行完毕\n")
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 300)
// 这句Do方法的调用会一直阻塞,知道上面的函数执行完毕
// 然后Do方法里的函数不会执行
once.Do(func() {
fmt.Println("任务[2]执行中...")
})
// 上面Do方法阻塞结束后,直接会执行下面的代码
fmt.Println("任务[2]执行完毕")
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 300)
once.Do(func() {
fmt.Println("任务[3]执行中...")
})
fmt.Println("任务[3]执行完毕")
}()
wg.Wait()
fmt.Println("Over")
}
二、Do方法在参数函数执行结束后,对done字段的赋值用的是原子操作,并且这一操作是被挂载defer语句中的。因此,不论参数函数的执行会以怎样的方式结束,done字段的值都会变为1。
这样就是说即时参数函数没有执行成功,比如引发了panic。也是无法使用同一个Once值重新执行别的函数了。所以,如果需要为参数函数的执行设定重试机制,就要考虑在适当的时候替换Once值。
参考下面的示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
once := sync.Once{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if p := recover(); p != nil {
fmt.Printf("PANIC: %v\n", p)
// 下面的语句会给once变量替换一个新的Once值,这样下面的第二个任务还能被执行
// once = sync.Once{}
}
}()
once.Do(func() {
fmt.Println("开始执行参数函数,紧接着会引发panic")
panic(fmt.Errorf("主动引发了一个panic")) // panic之后就去调用defer了
fmt.Println("参数函数执行完毕") // 这行不会执行,后面的都不会执行
})
fmt.Println("Do方法调用完毕") // 这行也不会执行
}()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 500)
once.Do(func() {
fmt.Println("第二个任务执行中...")
time.Sleep(time.Millisecond * 800)
fmt.Println("第二个任务执行结束")
})
fmt.Println("第二个任务结束")
}()
wg.Wait()
fmt.Println("Over")
}
延迟一个昂贵的初始化步骤到有实际需求的时刻是一个很好的实践。这也是sync.Once的一个使用场景。
下面是从书上改的示例代码:
package main
import (
"fmt"
"sync"
)
var once sync.Once
var testmap map[string] int32
// 对testmap进行初始化的函数
func loadTestmap() {
testmap = map[string] int32{
"k1": 1,
"k2": 2,
"k3": 3,
}
}
// 获取testmap对应key的值,如果没有初始化,会先执行初始化
// 书上说这个函数是并发安全的,这里的map初始化之后,内容不会再变
func getKey(key string) int32 {
once.Do(loadTestmap)
// 最后的return这句可能不是并发安全的,不过线程安全的map不是这里的重点
// 假定这里的map在初始化之后只会被多个goroutine读取,其内容不会再改变
return testmap[key]
}
func main() {
fmt.Println(getKey("k1"))
}
这里不考虑map线程安全的问题,而且书上的例子这里的map只用来存放数据,初始化之后不会对其内容进行修改。
这里主要是保证在变量初始化过程中的并发安全。以这种方式来使用sync.Once,可以避免变量在正确构造之前就被其它goroutine分享。否则,在别的goroutine中可能会获取到一个内容不完整的变量。
sync代码包的WaitGroup类型和Once类型都是非常易用的同步工具。它们都是开箱即用和并发安全的。
Once类型使用互斥锁和原子操作实现了功能,而WatiGroup类型中只用到了原子操作。所以可以说,它们都是更高层次的同步工具。它们都基于基本的同步工具,实现了某种特定的功能。sync包中的其他高级同步工具,其实也都是这样的。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。