在之前的 缓存击穿和缓存雪崩 中提到了 singleflight 的概念,singleflightGo 官方扩展库 x 中提供的扩展并发原语,能够将多个并发请求合并为一个,降低服务端压力。本文记录深入理解 singleflight 源码的过程,帮助自己更好地掌握其用法和实现原理。

singleflight 的核心结构体

singleflight 的核心结构体只有两个,分别是 callGroup。其中 call 代表一个正在进行或已完成的函数调用,而 Group 则表示一类工作,形成一个命名空间,在该命名空间中可以执行具有重复抑制的工作单元。
callGroup 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup

// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val interface{}
err error

// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}

其中 Group 代表一个 singleflight 的命名空间,它有两个字段,mummu 是一个互斥锁,用于保护 m 的并发读写;m 是一个懒初始化的 map[string]*call,用于存储正在进行或已完成的函数调用(懒加载会在后面 singleflight.Do 方法的源码中看到)。

call 作为一个仅包内可见的结构体,包含了 sync.WaitGroupvalerrdupschans 等字段。它的作用是表示一个正在进行或已完成的 fn 函数调用。而 chans 表示一个 Result 类型的通道切片,这里通道是单向的,只能写入不能读区(send-only)。至于 Result 则是 singleflight 中的一个结构体,包含的 valerr 字段,分别表示函数调用的返回值和错误信息:

1
2
3
4
5
6
7
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}

singleflight.Do 方法

Group类型有三个方法,分别是 DoDoChanForget。其中 Do 方法是 singleflight 的核心方法,用于执行一个函数并返回结果。它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()

if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}

Do 方法的参数包括一个 key 字符串和一个函数 fn,它会执行 fn 函数并返回结果。方法的返回值包括 verrshared,其中 v 是函数调用的返回值,err 是错误信息,shared 表示是否有多个调用者共享同一个结果。

进入方法后,首先获取 mu 锁,然后检查 m 是否为 nil,如果是,则初始化 m。这就是前面说的 map 懒加载的地方。

接着检查 m 中是否已经存在 key,如果存在,则将 dups 加一,表示有一个新的调用者来请求同一个 key 的结果。然后释放锁,等待原始调用完成(c.wg.Wait()),并返回原始调用的结果。这里的返回调用结果,是从 call 结构体中获取的,c.valc.err 分别表示函数调用的返回值和错误信息。注意这里判断了 err 的错误类型,如果是 panicError 类型的错误,则会抛出异常;如果是 errGoexit,则会调用 runtime.Goexit() 直接退出当前协程。其中 panicErrorerrGoexitsingleflight 中定义的两个错误类型,分别表示函数调用中的异常和协程退出的错误:

1
2
3
4
5
6
7
8
9
10
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")

// A panicError is an arbitrary value recovered from a panic
// with the stack trace during the execution of given function.
type panicError struct {
value interface{}
stack []byte
}

这些错误对象的生成和处理会在后续 singleflight.doCall 方法中看到。而 singleflight.doCall 方法的执行则是在之前检查 m 中不存在 key 的情况下进行的。如果不存在,则创建一个新的 call 对象 c,并将其添加到 m 中。然后释放锁,调用 g.doCall(c, key, fn) 方法执行函数调用。

那么现在可以整理出 Do 方法的执行流程:

  1. 获取 mu 锁,检查 m 是否为 nil,如果是,则初始化 m
  2. 检查 m 中是否存在 key,如果存在,则将 dups 加一,释放锁,等待原始调用完成,并返回原始调用的结果。
  3. 如果不存在,则创建一个新的 call 对象 c,并将其添加到 m 中。然后释放锁,调用 g.doCall(c, key, fn) 方法执行 fn 函数调用。
  4. fn 函数调用完成后,将结果存储到 c.valc.err 中,并返回 fn 函数的执行结果。同时如果有其他调用者在等待 fn 函数的结果(c.wg.Wait()),则会通知所有等待的调用者。

至此,Do 方法的执行流程就完成了,而对于如何捕获异常和处理错误以及如何通知等待的调用者,则是在 singleflight.doCall 方法里实现的。

singleflight.doCall 方法

doCall 方法是 singleflight 中的一个私有方法,用于执行函数调用并处理结果。它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false

// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}

g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}

if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn()
normalReturn = true
}()

if !normalReturn {
recovered = true
}
}

第一眼看上去可能会觉得 doCall 方法的实现有些复杂,实际上逐步分析这个方法的实现后,会发现它的核心逻辑是通过 defer 语句来处理函数调用的异常和结果通知。

首先看到 doCall 方法的参数包括一个 call 对象 c、一个 key 字符串和一个函数 fn。方法的返回值没有,所有的结果都通过 call 对象 c 来传递。

根据 go 语言的 defer 语句的特性是先入后出、延迟执行,所以先看到下面一个匿名函数:

1
2
3
4
5
6
7
8
9
10
11
12
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn()
normalReturn = true
}()

首先运行 c.val, c.err = fn() 来执行函数 fn,并将返回值存储到 c.valc.err 中。然后将 normalReturn 设置为 true,表示函数调用正常返回。并且此时 defer 语句中也因为 normalReturntrue,所以不会执行其他代码。但是当 fn() 函数执行过程中发生了 panic,则此时就不会执行 normalReturn = true,而是会停止运行该匿名函数的执行直接进入 defer 语句中,而由于此时 normalReturnfalse,所以会执行 defer 语句中的 if r := recover(); r != nil 语句来捕获异常,并将异常信息存储到 c.err 中。这里的 newPanicError(r) 是一个函数,用于创建一个新的 panicError 对象,包含异常信息和堆栈信息。它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
func newPanicError(v interface{}) error {
stack := debug.Stack()

// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}

debug.Stack() 函数用于获取当前协程的堆栈信息,返回一个字节切片。然后通过 bytes.IndexByte 函数查找第一个换行符的位置,并将其无效部分去掉,最后返回一个新的 panicError 对象。之前在看 singleflight.Do 函数中有一个错误类型断言 c.err.(*panicError),错误信息就是在这里通过调用 newPanicError 创建并赋值给 c.err 的。

匿名函数执行完成后,代码会开始执行这里:

1
2
3
if !normalReturn {
recovered = true
}

normalReturnrecovered 都是函数一开始定义的变量,normalReturn 用于标记函数是否正常返回,在前面的匿名函数中已经被标记了,至于这里根据的 if !normalReturn 判断是否需要将 recovered 设置为 true 的原因,则需要看源码,现在看到最开始的 defer 语句,也就是 doCall 函数最后执行的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}

g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}

if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

首先来分析:为什么需要两个 defer 语句?为什么不可以只用 normalReturn 来判断是否是 panic,而需要用 recovered 来判断?

上面的问题其实可以通过分析源码来理解。首先根据注释(use double-defer to distinguish panic from runtime.Goexit)可以知道两个 defer 语句是用来区分 panicruntime.Goexit。而在 Go 语言中,panicruntime.Goexit 都会导致协程退出。

根据注释的这个思路具体分析上面的代码,第一个 defer 语句执行时,有几种情况: singleflight.doCall 函数正常返回和 runtime.Goexit 正常退出。

  • 如果是 runtime.Goexit 导致 singleflight.doCall 函数退出,则不会执行 if !normalReturn 的判断,自然也不会对变量 recovered 进行赋值,那么这种情况就需要把 c.err 设置为 errGoexit,表示函数协程退出的原因是因为程序正常退出。这里的 errGoexit 是一个错误对象,表示函数是因为整个程序退出而被停止运行协程,而不是因为 panic 导致的异常。
  • 如果 singleflight.doCall 函数正常返回,则会根据 normalReturn 的值来判断是否需要将 recovered 设置为 true。如果 normalReturnfalse,则表示函数调用过程中发生了 panic,此时会将 recovered 设置为 true,表示已经捕获到异常。然后在第二个 defer 语句中会判断 c.err 是否是 panicError 类型,如果是,则会将其抛出;如果不是,则表示函数调用正常返回。

然后再看这个函数中其余的细节,代码基于读写锁的 mu 锁来保护 m 的并发读写,使用 defer g.mu.Unlock() 来释放锁。通过调用 c.wg.Done() 来通知等待的调用者,表示函数调用已经完成。接着判断 g.m[key] == c,如果是,则删除 m 中的 key,表示函数调用已经完成。这里删除 m 中的 key 是因为该函数已经完成了,后续如果有其他调用者请求同一个 key 的结果,则会重新创建一个新的 call 对象 c,并重新运行 fn 函数。这里如果不删除 m 中的 key,则会导致后续的调用者无法获取到新的结果。

后面的代码则是判断 c.err 的类型,如果是 panicError 类型,则会重新使用 panic 抛出异常;如果是 errGoexit,则表示函数调用正常退出,不需要再次调用 runtime.Goexit()。而需要注意的地方在于无论是 panic 还是正常运行得到结果,都要判断 c.chans 的长度,如果大于 0,则需要将错误信息或者结果通过 chans 通道发送给所有等待的调用者。这是因为前面使用的不是 singleflightDo 方法,而是 DoChan 方法。

DoChan 方法的定义和分析暂且按下不表,这里最后来总结一下 doCall 方法的执行流程:

  1. 定义两个变量 normalReturnrecovered,分别用于标记函数是否正常返回和是否捕获到异常。
  2. 执行函数 fn,并将返回值存储到 c.valc.err 中。
  3. 处理函数 fn 执行的结果或者异常:
    • 如果函数正常返回,则将 normalReturn 设置为 true
    • 如果函数执行过程中发生了 panic,则捕获异常并将其存储到 c.err 中。
  4. 通过 defer 语句来区分 fn 函数的正常返回、 panic 异常和 runtime.Goexit 正常退出,并设置相应的错误信息。
  5. 根据错误信息的类型,分发 fn 函数的结果或者异常给所有等待的调用者。

这段代码中双层 defer 的设计非常值得学习,第一层 defer 用于处理函数调用的异常和结果,第二层 defer 用于处理 runtime.Goexit 的异常。

singleflight.DoChan 方法

了解完 singleflight.Dosingleflight.doCall 方法后,接下来就可以看 DoChan 方法了。DoChan 方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

go g.doCall(c, key, fn)

return ch
}

DoChan 方法的参数和返回值与 Do 方法类似,只不过返回值是一个通道 ch,而不是函数调用的结果。方法的实现逻辑与 Do 方法类似,主要区别在于 DoChan 方法会将通道 ch 添加到 call 对象 cchans 切片中,而不是直接返回函数调用的结果。
这样做的好处是可以将函数调用的结果通过通道发送给所有等待的调用者,而不是直接返回结果。这样可以避免在函数调用过程中阻塞主线程,提高并发性能。

singleflight.Forget 方法

还剩下一个方法 Forget,它的定义如下:

1
2
3
4
5
6
7
8
// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

代码很简单,Forget 方法用于删除 m 中的 key,表示忘记该 key 的结果。这样后续的调用者请求同一个 key 的结果时,会重新执行函数 fn。这个方法可以用于清除缓存或者重置状态。

这里很显然该方法并不应该被频繁调用,因为它会导致 singleflight 的缓存失效,因为如果频繁使其失效,那么 singleflight 的意义就不大了。一般情况下,Forget 方法应该在以下情况下使用:

  • 当调用超时时,可以调用 Forget 方法来清除缓存。因为如果某个函数执行时间过长,业务已经不需要该结果或者已经需要返回结果为超时了,那么就可以调用 Forget 方法来清除缓存。
  • 当调用结果有问题时,可以调用 Forget 方法来清除缓存。因为如果某个函数执行的结果有问题,那么就可以调用 Forget 方法来清除缓存,防止后续的调用者获取到错误的结果。
  • 当调用的函数需要重新执行时,可以调用 Forget 方法来清除缓存。因为如果某个函数有一个重试机制,那么就需要对同一个 key 的结果进行清除,来方式其他调用复用之前的结果。

结语

根据 singleflight 的实现原理和源码分析,可以看出它的设计非常巧妙,充分利用了 Go 语言的并发特性和 defer 语句的特性。通过 singleflight,可以有效地减少重复请求的开销,提高系统的性能和稳定性。

其本质是合并重复的并发调用,从而避免重复的计算和资源浪费。它的实现原理和设计思路可以借鉴到其他并发场景中,比如缓存、数据库连接池等。所以据此,不仅仅在解决缓存击穿和缓存雪崩的问题上有用,在其他场景诸如:
远程过程调用、定时任务去重、消费者处理消息去重等场景中也可以使用 singleflight 来实现类似的功能。