在之前的 缓存击穿和缓存雪崩 中提到了 singleflight
的概念,singleflight
是 Go
官方扩展库 x
中提供的扩展并发原语,能够将多个并发请求合并为一个,降低服务端压力。本文记录深入理解 singleflight
源码的过程,帮助自己更好地掌握其用法和实现原理。
singleflight 的核心结构体
singleflight
的核心结构体只有两个,分别是 call
和 Group
。其中 call
代表一个正在进行或已完成的函数调用,而 Group
则表示一类工作,形成一个命名空间,在该命名空间中可以执行具有重复抑制的工作单元。call
和 Group
的定义如下:
1 | // call is an in-flight or completed singleflight.Do call |
其中 Group
代表一个 singleflight
的命名空间,它有两个字段,mu
和 m
。mu
是一个互斥锁,用于保护 m
的并发读写;m
是一个懒初始化的 map[string]*call
,用于存储正在进行或已完成的函数调用(懒加载会在后面 singleflight.Do
方法的源码中看到)。
而 call
作为一个仅包内可见的结构体,包含了 sync.WaitGroup
、val
、err
、dups
和 chans
等字段。它的作用是表示一个正在进行或已完成的 fn
函数调用。而 chans
表示一个 Result
类型的通道切片,这里通道是单向的,只能写入不能读区(send-only)。至于 Result
则是 singleflight
中的一个结构体,包含的 val
和 err
字段,分别表示函数调用的返回值和错误信息:
1 | // Result holds the results of Do, so they can be passed |
singleflight.Do 方法
Group类型有三个方法,分别是 Do
、DoChan
和 Forget
。其中 Do
方法是 singleflight
的核心方法,用于执行一个函数并返回结果。它的定义如下:
1 | // Do executes and returns the results of the given function, making |
Do
方法的参数包括一个 key
字符串和一个函数 fn
,它会执行 fn
函数并返回结果。方法的返回值包括 v
、err
和 shared
,其中 v
是函数调用的返回值,err
是错误信息,shared
表示是否有多个调用者共享同一个结果。
进入方法后,首先获取 mu
锁,然后检查 m
是否为 nil
,如果是,则初始化 m
。这就是前面说的 map
懒加载的地方。
接着检查 m
中是否已经存在 key
,如果存在,则将 dups
加一,表示有一个新的调用者来请求同一个 key
的结果。然后释放锁,等待原始调用完成(c.wg.Wait()
),并返回原始调用的结果。这里的返回调用结果,是从 call
结构体中获取的,c.val
和 c.err
分别表示函数调用的返回值和错误信息。注意这里判断了 err
的错误类型,如果是 panicError
类型的错误,则会抛出异常;如果是 errGoexit
,则会调用 runtime.Goexit()
直接退出当前协程。其中 panicError
和 errGoexit
是 singleflight
中定义的两个错误类型,分别表示函数调用中的异常和协程退出的错误:
1 | // errGoexit indicates the runtime.Goexit was called in |
这些错误对象的生成和处理会在后续 singleflight.doCall
方法中看到。而 singleflight.doCall
方法的执行则是在之前检查 m
中不存在 key
的情况下进行的。如果不存在,则创建一个新的 call
对象 c
,并将其添加到 m
中。然后释放锁,调用 g.doCall(c, key, fn)
方法执行函数调用。
那么现在可以整理出 Do
方法的执行流程:
- 获取
mu
锁,检查m
是否为nil
,如果是,则初始化m
。 - 检查
m
中是否存在key
,如果存在,则将dups
加一,释放锁,等待原始调用完成,并返回原始调用的结果。 - 如果不存在,则创建一个新的
call
对象c
,并将其添加到m
中。然后释放锁,调用g.doCall(c, key, fn)
方法执行fn
函数调用。 fn
函数调用完成后,将结果存储到c.val
和c.err
中,并返回fn
函数的执行结果。同时如果有其他调用者在等待fn
函数的结果(c.wg.Wait()
),则会通知所有等待的调用者。
至此,Do
方法的执行流程就完成了,而对于如何捕获异常和处理错误以及如何通知等待的调用者,则是在 singleflight.doCall
方法里实现的。
singleflight.doCall 方法
doCall
方法是 singleflight
中的一个私有方法,用于执行函数调用并处理结果。它的定义如下:
1 | // doCall handles the single call for a key. |
第一眼看上去可能会觉得 doCall
方法的实现有些复杂,实际上逐步分析这个方法的实现后,会发现它的核心逻辑是通过 defer
语句来处理函数调用的异常和结果通知。
首先看到 doCall
方法的参数包括一个 call
对象 c
、一个 key
字符串和一个函数 fn
。方法的返回值没有,所有的结果都通过 call
对象 c
来传递。
根据 go
语言的 defer
语句的特性是先入后出、延迟执行,所以先看到下面一个匿名函数:
1 | func() { |
首先运行 c.val, c.err = fn()
来执行函数 fn
,并将返回值存储到 c.val
和 c.err
中。然后将 normalReturn
设置为 true
,表示函数调用正常返回。并且此时 defer
语句中也因为 normalReturn
为 true
,所以不会执行其他代码。但是当 fn()
函数执行过程中发生了 panic
,则此时就不会执行 normalReturn = true
,而是会停止运行该匿名函数的执行直接进入 defer
语句中,而由于此时 normalReturn
为 false
,所以会执行 defer
语句中的 if r := recover(); r != nil
语句来捕获异常,并将异常信息存储到 c.err
中。这里的 newPanicError(r)
是一个函数,用于创建一个新的 panicError
对象,包含异常信息和堆栈信息。它的实现如下:
1 | func newPanicError(v interface{}) error { |
debug.Stack()
函数用于获取当前协程的堆栈信息,返回一个字节切片。然后通过 bytes.IndexByte
函数查找第一个换行符的位置,并将其无效部分去掉,最后返回一个新的 panicError
对象。之前在看 singleflight.Do
函数中有一个错误类型断言 c.err.(*panicError)
,错误信息就是在这里通过调用 newPanicError
创建并赋值给 c.err
的。
匿名函数执行完成后,代码会开始执行这里:
1 | if !normalReturn { |
normalReturn
和 recovered
都是函数一开始定义的变量,normalReturn
用于标记函数是否正常返回,在前面的匿名函数中已经被标记了,至于这里根据的 if !normalReturn
判断是否需要将 recovered
设置为 true
的原因,则需要看源码,现在看到最开始的 defer
语句,也就是 doCall
函数最后执行的代码:
1 | // use double-defer to distinguish panic from runtime.Goexit, |
首先来分析:为什么需要两个 defer
语句?为什么不可以只用 normalReturn
来判断是否是 panic
,而需要用 recovered
来判断?
上面的问题其实可以通过分析源码来理解。首先根据注释(use double-defer to distinguish panic from runtime.Goexit
)可以知道两个 defer
语句是用来区分 panic
和 runtime.Goexit
。而在 Go
语言中,panic
和 runtime.Goexit
都会导致协程退出。
根据注释的这个思路具体分析上面的代码,第一个 defer
语句执行时,有几种情况: singleflight.doCall
函数正常返回和 runtime.Goexit
正常退出。
- 如果是
runtime.Goexit
导致singleflight.doCall
函数退出,则不会执行if !normalReturn
的判断,自然也不会对变量recovered
进行赋值,那么这种情况就需要把c.err
设置为errGoexit
,表示函数协程退出的原因是因为程序正常退出。这里的errGoexit
是一个错误对象,表示函数是因为整个程序退出而被停止运行协程,而不是因为panic
导致的异常。 - 如果
singleflight.doCall
函数正常返回,则会根据normalReturn
的值来判断是否需要将recovered
设置为true
。如果normalReturn
为false
,则表示函数调用过程中发生了panic
,此时会将recovered
设置为true
,表示已经捕获到异常。然后在第二个defer
语句中会判断c.err
是否是panicError
类型,如果是,则会将其抛出;如果不是,则表示函数调用正常返回。
然后再看这个函数中其余的细节,代码基于读写锁的 mu
锁来保护 m
的并发读写,使用 defer g.mu.Unlock()
来释放锁。通过调用 c.wg.Done()
来通知等待的调用者,表示函数调用已经完成。接着判断 g.m[key] == c
,如果是,则删除 m
中的 key
,表示函数调用已经完成。这里删除 m
中的 key
是因为该函数已经完成了,后续如果有其他调用者请求同一个 key
的结果,则会重新创建一个新的 call
对象 c
,并重新运行 fn
函数。这里如果不删除 m
中的 key
,则会导致后续的调用者无法获取到新的结果。
后面的代码则是判断 c.err
的类型,如果是 panicError
类型,则会重新使用 panic
抛出异常;如果是 errGoexit
,则表示函数调用正常退出,不需要再次调用 runtime.Goexit()
。而需要注意的地方在于无论是 panic
还是正常运行得到结果,都要判断 c.chans
的长度,如果大于 0
,则需要将错误信息或者结果通过 chans
通道发送给所有等待的调用者。这是因为前面使用的不是 singleflight
的 Do
方法,而是 DoChan
方法。
DoChan
方法的定义和分析暂且按下不表,这里最后来总结一下 doCall
方法的执行流程:
- 定义两个变量
normalReturn
和recovered
,分别用于标记函数是否正常返回和是否捕获到异常。 - 执行函数
fn
,并将返回值存储到c.val
和c.err
中。 - 处理函数
fn
执行的结果或者异常:- 如果函数正常返回,则将
normalReturn
设置为true
。 - 如果函数执行过程中发生了
panic
,则捕获异常并将其存储到c.err
中。
- 如果函数正常返回,则将
- 通过
defer
语句来区分fn
函数的正常返回、panic
异常和runtime.Goexit
正常退出,并设置相应的错误信息。 - 根据错误信息的类型,分发
fn
函数的结果或者异常给所有等待的调用者。
这段代码中双层 defer
的设计非常值得学习,第一层 defer
用于处理函数调用的异常和结果,第二层 defer
用于处理 runtime.Goexit
的异常。
singleflight.DoChan 方法
了解完 singleflight.Do
和 singleflight.doCall
方法后,接下来就可以看 DoChan
方法了。DoChan
方法的定义如下:
1 | // DoChan is like Do but returns a channel that will receive the |
DoChan
方法的参数和返回值与 Do
方法类似,只不过返回值是一个通道 ch
,而不是函数调用的结果。方法的实现逻辑与 Do
方法类似,主要区别在于 DoChan
方法会将通道 ch
添加到 call
对象 c
的 chans
切片中,而不是直接返回函数调用的结果。
这样做的好处是可以将函数调用的结果通过通道发送给所有等待的调用者,而不是直接返回结果。这样可以避免在函数调用过程中阻塞主线程,提高并发性能。
singleflight.Forget 方法
还剩下一个方法 Forget
,它的定义如下:
1 | // Forget tells the singleflight to forget about a key. Future calls |
代码很简单,Forget
方法用于删除 m
中的 key
,表示忘记该 key
的结果。这样后续的调用者请求同一个 key
的结果时,会重新执行函数 fn
。这个方法可以用于清除缓存或者重置状态。
这里很显然该方法并不应该被频繁调用,因为它会导致 singleflight
的缓存失效,因为如果频繁使其失效,那么 singleflight
的意义就不大了。一般情况下,Forget
方法应该在以下情况下使用:
- 当调用超时时,可以调用
Forget
方法来清除缓存。因为如果某个函数执行时间过长,业务已经不需要该结果或者已经需要返回结果为超时了,那么就可以调用Forget
方法来清除缓存。 - 当调用结果有问题时,可以调用
Forget
方法来清除缓存。因为如果某个函数执行的结果有问题,那么就可以调用Forget
方法来清除缓存,防止后续的调用者获取到错误的结果。 - 当调用的函数需要重新执行时,可以调用
Forget
方法来清除缓存。因为如果某个函数有一个重试机制,那么就需要对同一个key
的结果进行清除,来方式其他调用复用之前的结果。
结语
根据 singleflight
的实现原理和源码分析,可以看出它的设计非常巧妙,充分利用了 Go
语言的并发特性和 defer
语句的特性。通过 singleflight
,可以有效地减少重复请求的开销,提高系统的性能和稳定性。
其本质是合并重复的并发调用,从而避免重复的计算和资源浪费。它的实现原理和设计思路可以借鉴到其他并发场景中,比如缓存、数据库连接池等。所以据此,不仅仅在解决缓存击穿和缓存雪崩的问题上有用,在其他场景诸如:
远程过程调用、定时任务去重、消费者处理消息去重等场景中也可以使用 singleflight
来实现类似的功能。