深入理解 go sync.Once
在很多情况下,我们可能需要控制某一段代码只执行一次,比如做某些初始化操作,如初始化数据库连接等。 对于这种场景,go 为我们提供了 sync.Once 对象,它保证了某个动作只被执行一次。 当然我们也是可以自己通过 Mutex 实现 sync.Once 的功能,但是相比来说繁琐了那么一点, 因为我们不仅要自己去控制锁,还要通过一个标识来标志是否已经执行过。
# Once 的实现
Once 的实现非常简单,如下,就只有 20 来行代码,但里面包含了 go 并发、同步的一些常见处理方法。
package sync
import (
"sync/atomic"
)
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
简要说明:
done字段指示了操作是否已执行,也就是我们传递给Do的函数是否已经被执行。Do方法接收一个函数参数,这个函数参数只会被执行一次。Once内部是通过Mutex来实现不同协程之间的同步的。
# 使用示例
在下面的例子中,once.Do(test) 被执行了 3 次,但是最终 test 只被执行了一次。
package sync
import (
"fmt"
"sync"
"testing"
)
var once sync.Once
var a = 0
func test() {
a++
}
func TestOnce(t *testing.T) {
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func() {
// once.Do 会调用 3 次,但最终只会执行一次
once.Do(test)
wg.Done()
}()
}
wg.Wait()
fmt.Println(a) // 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Once 的一些工作机制
Once的Do方法可以保证,在多个 goroutine 同时执行Do方法的时候, 在第一个抢占到Do执行权的 goroutine 执行返回之前,其他 goroutine 都会阻塞在Once.Do的调用上, 只有第一个Do调用返回的时候,其他 goroutine 才可以继续执行下去,并且其他所有的 goroutine 不会再执行传递给Do的函数。(如果是初始化的场景,这可以避免尚未初始化完成就执行其他的操作)- 如果
Once.Do发生panic的时候,传递给Do的函数依然被标记为已完成。后续对Do的调用也不会再执行传给Do的函数参数。 - 我们不能简单地通过
atomic.CompareAndSwapUint32来决定是否执行f(),因为在多个 goroutine 同时执行的时候,它无法保证f()只被执行一次。所以Once里面用了Mutex,这样就可以有效地保护临界区。
// 错误实现,这不能保证 f 只被执行一次
if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
f()
}
2
3
4
Once.Do的函数参数是没有参数的,如果我们需要传递一些参数,可以再对f做一层包裹。
config.once.Do(func() { config.init(filename) })
# Once 详解
# hotpath
这里说的 hotpath 指的是 Once 里的第一个字段 done:
type Once struct {
// hotpath
done uint32
m Mutex
}
2
3
4
5
Once 结构体的第一个字段是 done,这是因为 done 的访问是远远大于 Once 中另外一个字段 m 的, 放在第一个字段中,编译器就可以做一些优化,因为结构体的地址其实就是结构体第一个字段的地址, 这样一来,在访问 done 字段的时候,就不需要通过结构体地址 + 偏移量的方式来访问, 这在一定程度上提高了性能。
结构体地址计算示例:
type person struct {
name string
age int
}
func TestStruct(t *testing.T) {
var p = person{
name: "foo",
age: 10,
}
// p 和 p.name 的地址相同
// 0xc0000100a8, 0xc0000100a8
fmt.Printf("%p, %p\n", &p, &p.name)
// p.age 的地址
// 0xc0000100b8
fmt.Printf("%p\n", &p.age)
// p.age 的地址也可以通过:结构体地址 + age 字段偏移量 计算得出。
// 0xc0000100b8
fmt.Println(unsafe.Add(unsafe.Pointer(&p), unsafe.Offsetof(p.age)))
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# atomic.LoadUint32
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
2
3
4
5
在 Do 方法中,是通过 atomic.LoadUint32 的方式来判断 done 是否等于 0 的, 这是因为,如果直接使用 done == 0 的方式的话,就有可能导致在 doSlow 里面对 done 设置为 1 之后, 在 Do 方法里面无法正常观测到。因此用了 atomic.LoadUint32。
而在 doSlow 里面是可以通过 done == 0 来判断的,这是因为 doSlow 里面已经通过 Mutex 保护起来了。 唯一设置 done = 1 的地方就在临界区里面,所以 doSlow 里面通过 done == 0 来判断是完全没有问题的。
# atomic.StoreUint32
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
2
3
4
5
6
7
8
在 doSlow 方法中,设置 done 为 1 也是通过 atomic.StoreUint32 来设置的。 这样就可以保证在设置了 done 为 1 之后,可以及时被其他 goroutine 看到。
# Mutex
doSlow 的实现里面,最终还是要通过 Mutex 来保护临界区, 通过 Mutex 可以实现 f 只被执行一次,并且其他的 goroutine 都可以使用这一次 f 的执行结果。 因为其他 goroutine 在第一次 f 调用未返回之前,都阻塞在获取 Mutex 锁的地方, 当它们获取到 Mutex 锁的时候,得以继续往下执行,但这个时候 f 已经执行完毕了, 所以当它们获取到 Mutex 锁之后其实什么也没有干。
但是它们的阻塞状态被解除了,可以继续往下执行。
# 总结
Once保证了传入的函数只会执行一次,这常常用在一些初始化的场景、或者单例模式。Once可以保证所有对Do的并发调用都是安全的,所有对Once.Do调用之后的操作,一定会在第一次对f调用之后执行。(没有获取到f执行权的 goroutine 会阻塞)- 即使
Once.Do里面的f出现了panic,后续也不会再次调用f。