這篇文章是回答交流時一個老哥的問題,跟go的context相關內容,上一篇(https://www.cnblogs.com/dojo-...)講了一些基礎知識,這一篇繼續在併發處理上進行研究。主要是Go Context的使用、原理。因為時間和精力有限,所以文章中大量引用相關資料中的內容以及圖片,再此致敬。
Go Context
React中Context主要用來跨組件傳遞一些數據,Go中Context其中一個作用也跟傳遞數據有關,不過是在goroutine中相互傳遞數據;Context的另一個作用在於可以便捷關閉被創建出來的goroutine。
在實際中當服務器端收到一個請求時,很可能需要發送幾個請求去請求其他服務的數據,由於Go 語法上的同步阻塞寫法,我們一般會創建幾個goroutine併發去做一些事情;那麼這時候很可能幾個goroutine之間需要共享數據,還有當request被取消時,創建的幾個goroutine也應該被取消掉。那麼這就是Go Context的用武之地。
關於協程泄露:
一般main函數是主協程,主協程執行完畢後子協程也會被銷燬;但是對於服務來説,主協程不會執行完畢就退出。
所以如果每個請求都自己創建協程,而協程有沒有受到完畢信息結束信息,可能處於阻塞狀態,這種情況下才會產生協程泄露
context包中核心是Context接口:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
- Deadline 方法返回當前Context被取消的時間,也就是完成工作的截止時間(deadline);
- Done方法需要返回一個channel,這個Channel會在當前工作完成或者上下文被取消之後關閉,可以在子goroutine中利用select進行監控,來回收子goroutine;多次調用Done方法會返回同一個Channel;
// Done is provided for use in select statements:
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
// See https://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancellation.
- Err方法會返回當前Context結束的原因,它只會在Done返回的Channel被關閉時才會返回空值:
- 如果當前Context被取消就會返回Canceled錯誤;
- 如果當前Context超時就會返回DeadlineExceeded錯誤;
- Value 方法會從Context中返回鍵對應的值,對於同一個上下文來説,多次調用Value並傳入相同的Key會返回相同的結果,該方法僅用於傳遞跨API和進程間跟請求域的數據。
// // Package user defines a User type that's stored in Contexts.
// package user
// import "context"
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
ctx.Value(userKey).(*User)這裏是Go語言中的類型斷言(http://c.biancheng.net/view/4...)
value, ok := x.(T)
x 表示一個接口的類型,T 表示一個具體的類型(也可為接口類型)
該斷言表達式會返回 x 的值(也就是 value)和一個布爾值(也就是 ok),可根據該布爾值判斷 x 是否為 T 類型:
如果 T 是具體某個類型,類型斷言會檢查 x 的動態類型是否等於具體類型 T。如果檢查成功,類型斷言返回的結果是 x 的動態值,其類型是 T。
如果 T 是接口類型,類型斷言會檢查 x 的動態類型是否滿足 T。如果檢查成功,x 的動態值不會被提取,返回值是一個類型為 T 的接口值。
無論 T 是什麼類型,如果 x 是 nil 接口值,類型斷言都會失敗。
在context包中Context一個接口有四個具體實現和六個函數:
emptyCtx
emptyCtx本質是一個整型類型,他對Context接口的實現,非常簡單,其實是什麼也沒做,都是一堆空方法:
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key any) any {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
這裏的String方法挺有意思,因為下面中可以看到background和todo都是一個emptyContext所以,這裏直接case進行對比background和todo;
Background和TODO這兩個公共方法是返回background和todo;官方建議Background用做頂層的context,todo看起來用來佔位使用,不過實現來説兩個沒區別
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}
cancelCtx
通過WithCancel來創建的就是cancelCtx,WithCancel返回一個ctx和cancel方法,通過調用cancel方法,可以將Context取消,來控制協程,具體看下面例子:
在這個例子中,通過defer調用cancel,在FixLeakingByContext函數結束時去掉context,在CancelByContext中配合select和context的done方式來使用,可以避免協程資源沒有被回收引起的內存泄露。
func FixLeakingByContex() {
//創建上下文用於管理子協程
ctx, cancel := context.WithCancel(context.Background())
//結束前清理未結束協程
defer cancel()
ch := make(chan int)
go CancelByContext(ctx, ch)
go CancelByContext(ctx, ch)
go CancelByContext(ctx, ch)
// 隨機觸發某個子協程退出
ch <- 1
}
func CancelByContext(ctx context.Context, ch chan (int)) int {
select {
case <-ctx.Done():
//fmt.Println("cancel by ctx.")
return 0
case n := <-ch :
return n
}
}
看下WithCancel的源碼:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
// WithCancel通過一個父級Context來創建出一個cancelCtx
c := newCancelCtx(parent)
// 調用propagateCancel根據父級context的狀態來關聯cancelCtx的cancel行為
propagateCancel(parent, &c)
// 返回c和一個方法,方法中調用c.cancel並傳遞Canceled變量
return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
var Canceled = errors.New("context canceled")
WithCancel通過一個父級Context來創建出一個cancelCtx,然後調用propagateCancel根據父級context的狀態來關聯cancelCtx的cancel行為(感覺這裏不應該叫propagate,冒泡一般理解是自下向上,這個函數明顯是自下向上,應該叫cascade更為合理一些)。隨後返回c和一個方法,方法中調用c.cancel並傳遞Canceled變量(其實是一個error實例);
cancelCtx是WidthDeadline和WidthTimeout的基石,所以cancelCtx的實現相對複雜,我們重點講解。
newCancelCtx方法可以看到是創建了一個cancelCtx實例
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
我們也看下cancelCtx的定義:
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context // 內嵌結構體
mu sync.Mutex // protects following fields
done atomic.Value // of chan struct{}, created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
cancelCtx有一個內嵌的Context類型,實際存儲的都是父級上下文對象,還有四個獨立的字段:
- mu:一個互斥量,用來加鎖保證某些操作的線程安全性
- done:atomic.Value一個可以對任意類型進行原子型操作的結構;提供Load和Store方法;看Go源碼這裏存的是一個struct{}類型的channel
- children:一個key為canceler值為struct{}的map類型;
- err:存放error的字段
這裏的cancelder是一個接口,代表可以直接被cancel的Context類型,基本指的是 cancelCtx和 timerCtx兩種context,也被他倆實現
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
下面看下propagateCancel,據父級context的狀態來關聯cancelCtx的cancel行為
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
// 如果父元素的Done方法返回為空,也就是説父context是emptyCtx
// 直接返回,因為父上下文不會做任何處理
done := parent.Done()
if done == nil {
return // parent is never canceled
}
// 如果父上下文不是emptyCtx類型,使用select來判斷一下父上下文的done channel是不是已經被關閉掉了
// 關閉則調用child的cancel方法
// select其實會阻塞,但這裏給了一個default方法,所以如果父上下文的done channel沒有被關閉則繼續之心後續代碼
// 這裏相當於利用了select的阻塞性來做if-else判斷
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
// parentCancelCtx目的在於尋找父上下文中最底層的cancelCtx,因為像timerCtx等會內嵌cancelCtx
if p, ok := parentCancelCtx(parent); ok {
// 如果找的到,就把最內層的cancelCtx跟child的設置好關聯關係
// 這裏要考慮到多線程環境,所以是加鎖處理
p.mu.Lock()
if p.err != nil {
// 如果祖先cancelCtx已經被取消了,那麼也調用child的cancel方法
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 這裏設置內層cancelCtx與child的父子層級關係
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 這裏代表沒有找到祖先cancelCtx,單啓了一個協程來進行監聽(因為select是阻塞的),如果父上下文的done 關閉了,則子上下文取消
// goroutines在別的地方代碼中沒有使用,不知道為什麼要做增加操作,看源碼英文解釋也是為了測試使用
// 單獨的協程會在阻塞完畢後被GC回收,不會有泄露風險
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
裏面調用了一個parentCancelCtx函數,這個函數比較晦澀,市面上資料也還沒有人去仔細研究,這裏我來講解一下;
這個函數中最重要的就是12行,通過cancelCtxKey獲取最近的內嵌cancelCtx;然後讓在propagateCancel中設置內嵌cancelCtx與child的關聯關係;
同時這個函數也考慮了幾種情況,如果parent的done已經是closedchan或者是nil那麼沒必要去拿內層的cancelCtx來建立層級關係,直接用parent本身與child做好關聯cancel即可。這是9-11行代碼乾的事。
16行-19行,看源碼解釋是如果這個內嵌cancelCtx可能加了一些自定義方法,比如複寫了Done或者cancel,那麼它就不是這裏的timerCtx、cancelCtx或者valueCtx,這種情況下用户自己負責處理;放到propagateCancel這個函數中就是把parent和child直接關聯起來,不建立層級關係。及時子child自己cancel也不去跟parent的children有什麼關聯。
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
done := parent.Done()
if done == closedchan || done == nil {
return nil, false
}
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}
那麼這裏就有了一個問題,propagateCancel函數中一定建立parent和child的children關係麼?我理解是不用的,因為這個else部分代碼我理解完全可以實現父級上下文結束後,child也進行取消;我猜這裏儘量建立children的map關係,是如果不這麼做就要起一個goroutine來處理,相當於一個監護線程,goroutine資源的消耗以及調度成本,比單純的children層級關係更大,所以這裏盡力使用map結構來建立層級關係。這也可以看到作者在寫代碼時候還是很花心思去考量各種情況的。
} else {
// 這裏代表沒有找到祖先cancelCtx,單啓了一個協程來進行監聽(因為select是阻塞的),如果父上下文的done 關閉了,則子上下文取消
// goroutines在別的地方代碼中沒有使用,不知道為什麼要做增加操作,看源碼英文解釋也是為了測試使用
// 單獨的協程會在阻塞完畢後被GC回收,不會有泄露風險
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
接下來看下cancelCtx中Value、Done、Err以及私有方法cancel的實現;
Value方法
源碼如下
func (c *cancelCtx) Value(key any) any {
if key == &cancelCtxKey {
return c
}
return value(c.Context, key)
}
首先要介紹下cancelCtxKey,這是一個context包中的私有變量,當對cancelCtx調用Value方法並用這個key作為參數時,返回cancelCtx本身;
如果沒有找到則是調用的context包中的私有方法value,來在父級上下文中key對應的值;
這個方法首先進行類型斷言,判斷Context是否是valueCtx、cancelCtx、timerCtx以及emptyCtx等;根據不同的類型做不同處理,比如cancelCtx和timerCtx先進行cancelCtxKey判斷,emptyCtx直接返回nil,valueCtx則判斷是否是自己實例化時候傳入的key,否則就去自己的內層context也就是parent層級上冒泡獲取對應的值。
func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}
Done方法
func (c *cancelCtx) Done() <-chan struct{} {
// 返回atomic.Value中存儲的值
d := c.done.Load()
if d != nil {
// atomic.Value類型的Load方法返回的是ifaceWords類型,所以這裏是利用了類型斷言
// 把ifaceWords類型轉換為 struct類型的chan
return d.(chan struct{})
}
// 這裏是併發場景要考慮的問題,因為會存在多個線程併發進行的過程,所以不一定哪個goroutine就對c.done進行了修改
// 所以這裏不能直接像單線程一樣,if d!=nil else。。。;首先得搶鎖。
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load()
// 上面搶鎖的過程可能搶到了,也可能沒搶到,所以到這裏是搶到了鎖,但是c.done未必還是nil;
// 所以這裏要再次做判斷
if d == nil {
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}
看到上面鎖的過程,發現併發情況的處理要比js這種單線程考慮的多得多。併發對一個變量的處理不能簡單的if-else;要結合鎖、CAS、原子操作一起考慮(對於atomic.Value中的ifaceWords的部分可以看這篇文章:https://www.cnblogs.com/dojo-...中原子操作部分)。
Err方法
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
這個方法比較簡單只是獲取了cancelCtx的err屬性,這個屬性在cancel中會會被設置。
cancel方法
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
// 因為後面要對c.err和c.done進行更新,所以這裏要搶鎖
c.mu.Lock()
if c.err != nil {
// if這部分放到鎖的外部是否可以?看起來是可以的,但是如果放到外面,if判斷不通過此時c.err為nil
// 接着進行搶鎖,那麼在搶到鎖之後仍然要對c.err判斷是否還是nil,才能進行更新
// 因為在搶鎖過程中,可能c.err已經被某個協程修改了
// 所以把這部分放到鎖之後是合理的。
c.mu.Unlock()
return // already canceled
}
c.err = err // 賦值
d, _ := c.done.Load().(chan struct{})
// 讀取done的值
if d == nil {
// 如果done為nil,就把一個內部的closedchan存入c.done中;
// closedchan是一個channel類型,在context包的init函數中就會把它close掉
c.done.Store(closedchan)
} else {
close(d)
}
// 遍歷c的children調用他們的cancel;
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
// 這部分沒有在鎖的代碼中,是因為函數中會自己加鎖?
if removeFromParent {
removeChild(c.Context, c)
}
}
代碼最後調用removeChild方法,這部分為什麼沒在c.mu鎖中,我猜是因為這個函數的代碼自己會進行鎖的處理。
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
可以看到代碼中的鎖部分,是在第7行開始的,那麼為什麼parentCancelCtx沒有被包含在鎖中,這裏猜測下,因為parentCancelCtx的主要目的是為了獲取父級上下文內層的cancelCtx,而這個值是在實例化時候就已經確定的,這裏只是讀取所以可以不用放在互斥鎖的臨界區代碼中,避免性能浪費。
接下來就是p.mu來搶鎖,完成對層級結構的接觸。
timerCtx
WithTimeout和WithDeadline創建的都是timerCtx,timerCtx內部內嵌了cancelCtx;
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
因為內嵌了cancelCtx,而cancelCtx實現了Done、Value、Err以及cancel(私有)方法,所以timerCtx上也可以直接調用這幾個方法(http://c.biancheng.net/view/7...);cancelCtx並未實現Deadline方法,但是emptyCtx實現了,如果他的父級上下文是emptyCtx那麼cancelCtx也可以調用Deadline方法。
看完cancelCtx的方法之後,對比起來timerCtx的方法都比較簡單,不做過多解釋
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
c.deadline.String() + " [" +
time.Until(c.deadline).String() + "])"
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
可以看到cancel方法中先調用了內嵌的cancelCtx的cancel方法;然後利用cancelCtx的互斥鎖搶鎖來對c.timer進行操作修改;cancel方法第13-16行需要注意,因為withDeadline在創建時把parent和timerCtx建立了層級關係,所以這裏根據條件進行移除操作。
下面來看下withDeadline函數:
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
// 如果parent的deadline小於當前時間,直接創建cancelCtx,裏面會調用propagateCancel方法
// 來根據父上下文狀態進行處理
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 創建timerCtx,這裏可以看到cancelCtx是私有變量,而cancelCtx中的Context字段是公有變量
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 設置層級取消關聯
propagateCancel(parent, c)
dur := time.Until(d)
// 如果已經超時直接取消
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
// 如果沒有超時並且沒有被調用過cancel,那麼設置timer,超時則調用cancel方法;
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
瞭解上面內容之後,WithTimeout就很簡單了,只是調用了WidthDeadline方法
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
valeCtx
這個結構體相對簡單,有一個Context公共變量,一個任意類型的key和任意類型的any:
type valueCtx struct {
Context
key, val any
}
withValue方法也比較簡單,這裏就不做過多介紹
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
還有一個Value方法:
如果key與WithValue調用時相同,則返回對應的val,否則進入value方法,在內嵌的Context中查找key對應的值,這個方法上面介紹過,根據Context類型先做一些類型判斷,來判斷一些關鍵的key如cancelCtxKey,不然繼續在內嵌Context中查找。
func (c *valueCtx) Value(key any) any {
if c.key == key {
return c.val
}
return value(c.Context, key)
}
參考資料
本文大量引用了相關參考資料的圖片和語言,尤其是CPU硬件部分圖片大部分來自於小林coding(https://xiaolincoding.com/os/...)的圖片。版權問題請與我聯繫,侵刪。
深入理解Go Context:https://article.itxueyuan.com...
context源碼:https://github.com/golang/go/...
聊一聊Go的Context上下文:https://studygolang.com/artic...
go context詳解:https://www.cnblogs.com/juanm...
Go語言Context(上下文):http://c.biancheng.net/view/5...
atomic原理以及實現:https://blog.csdn.net/u010853...
atomic前世今生:https://blog.betacat.io/post/...