博客 / 詳情

返回

GO GMP調度實現原理 5w字長文史上最全

1 Runtime簡介

Go語言是互聯網時代的C,因為其語法簡潔易學,對高併發擁有語言級別的親和性。而且不同於虛擬機的方案。Go通過在編譯時嵌入平台相關的系統指令可直接編譯為對應平台的機器碼,同時嵌入Go Runtime,在運行時實現自身的調度算法和各種併發控制方案,避免進入操作系統級別的進程/線程上下文切換,以及通過原子操作、自旋、信號量、全局哈希表、等待隊列多種技術避免進入操作系統級別鎖,以此來提升整體性能。

0

Go的runtime是與用户代碼一起打包在一個可執行文件中,是程序的一部分,而不是向Java需要單獨安裝,與程序獨立。所以用户代碼與runtime代碼在執行時沒有界限都是函數調用。在Go語言中的關鍵字編譯時會變成runtime中的函數調用。

0

Go Runtime核心主要涉及三大部分:內存分配、調度算法、垃圾回收;本篇文章我們主要介紹GMP調度原理。關於具體應該叫GPM還是GMP,我更傾向於成為GMP,因為在runtime代碼中經常看到如下調用:

1 buf := &getg().m.p.ptr().wbBuf

其中getg代表獲取當前正在運行的g即goroutine,m代表對應的邏輯處理器,p是邏輯調度器;所以我們還是稱為GMP。

(以上部分圖文來自:https://zhuanlan.zhihu.com/p/...)

2 GMP概覽

下面這個圖雖然有些抽象(不如花花綠綠的圖片),確是目前我看到對整個調度算法設計的重要概念覆蓋最全的。

 1                             +-------------------- sysmon ---------------//------+ 
 2                             |                                                   |
 3                             |                                                   |
 4                +---+      +---+-------+                   +--------+          +---+---+
 5 go func() ---> | G | ---> | P | local | <=== balance ===> | global | <--//--- | P | M |
 6                +---+      +---+-------+                   +--------+          +---+---+
 7                             |                                 |                 | 
 8                             |      +---+                      |                 |
 9                             +----> | M | <--- findrunnable ---+--- steal <--//--+
10                                    +---+ 
11                                      |
12                                    mstart
13                                      |
14               +--- execute <----- schedule 
15               |                      |   
16               |                      |
17               +--> G.fn --> goexit --+ 

我們來看下其中的三大主要概念:

  • G:Groutine協程,擁有運行函數的指針、棧、上下文(指的是sp、bp、pc等寄存器上下文以及垃圾回收的標記上下文),在整個程序運行過程中可以有無數個,代表一個用户級代碼執行流(用户輕量級線程);
  • P:Processor,調度邏輯處理器,同樣也是Go中代表資源的分配主體(內存資源、協程隊列等),默認為機器核數,可以通過GOMAXPROCS環境變量調整
  • M:Machine,代表實際工作的執行者,對應到操作系統級別的線程;M的數量會比P多,但不會太多,最大為1w個。

其中G分為三類:

  • 主協程,用來執行用户main函數的協程
  • 主協程創建的協程,也是P調度的主要成員
  • G0,每個M都有一個G0協程,他是runtime的一部分,G0是跟M綁定的,主要用來執行調度邏輯的代碼,所以不能被搶佔也不會被調度(普通G也可以執行runtime_procPin禁止搶佔),G0的棧是系統分配的,比普通的G棧(2KB)要大,不能擴容也不能縮容
  • sysmon協程,sysmon協程也是runtime的一部分,sysmon協程直接運行在M不需要P,主要做一些檢查工作如:檢查死鎖、檢查計時器獲取下一個要被觸發的計時任務、檢查是否有ready的網絡調用以恢復用户G的工作、檢查一個G是否運行時間太長進行搶佔式調度。

M分為兩類:

  • 普通M,用來與P綁定執行G中任務
  • m0:Go程序是一個進程,進程都有一個主線程,m0就是Go程序的主線程,通過一個與其綁定的G0來執行runtime啓動加載代碼;一個Go程序只有一個m0
  • 運行sysmon的M,主要用來運行sysmon協程。

剛才説道P是用來調度G的執行,所以每個P都有自己的一個G的隊列,當G隊列都執行完畢後,會從global隊列中獲取一批G放到自己的本地隊列中,如果全局隊列也沒有待運行的G,則P會再從其他P中竊取一部分G放到自己的隊列中。而調度的時機一般有三種:

  • 主動調度,協程通過調用runtime.Goshed方法主動讓渡自己的執行權利,之後這個協程會被放到全局隊列中,等待後續被執行
  • 被動調度,協程在休眠、channel通道阻塞、網絡I/O堵塞、執行垃圾回收時被暫停,被動式讓渡自己的執行權利。大部分場景都是被動調度,這是Go高性能的一個原因,讓M永遠不停歇,不處於等待的協程讓出CPU資源執行其他任務。
  • 搶佔式調度,這個主要是sysmon協程上的調度,當發現G處於系統調用(如調用網絡io)超過20微秒或者G運行時間過長(超過10ms),會搶佔G的執行CPU資源,讓渡給其他協程;防止其他協程沒有執行的機會;(系統調用會進入內核態,由內核線程完成,可以把當前CPU資源讓渡給其他用户協程)

Go的協程調度與操作系統線程調度區別主要存在四個方面:

  • 調度發生地點:Go中協程的調度發生在runtime,屬於用户態,不涉及與內核態的切換;一個協程可以被切換到多個線程執行
  • 上下文切換速度:協程的切換速度遠快於線程,不需要經過內核與用户態切換,同時需要保存的狀態和寄存器非常少;線程切換速度為1-2微秒,協程切換速度為0.2微秒左右
  • 調度策略:線程調度大部分都是搶佔式調度,操作系統通過發出中斷信號強制線程切換上下文;Go的協程基本是主動和被動式調度,調度時機可預期
  • 棧大小:線程棧一般是2MB,而且運行時不能更改大小;Go的協程棧只有2kb,而且可以動態擴容(64位機最大為1G)

以上基本是整個調度器的概括,不想看原理的同學可以不用往下看了,下面會進行源碼級介紹;

3 GMP的源碼結構

源碼部分主要涉及三個文件:

1 runtime/amd_64.s 涉及到進程啓動以及對CPU執行指令進行控制的彙編代碼,進程的初始化部分也在這裏面
2 runtime/runtime2.go 這裏主要是運行時中一些重要數據結構的定義,比如g、m、p以及涉及到接口、defer、panic、map、slice等核心類型
3 runtime/proc.go 一些核心方法的實現,涉及gmp調度等核心代碼在這裏

這裏我們主要關心gmp中與調度相關的代碼;

3.1 G源碼部分

3.1.1 G的結構

先來看下g的結構定義:

 1 // runtime/runtime2.go
 2 type g struct {
 3    // 記錄協程棧的棧頂和棧底位置
 4    stack       stack   // offset known to runtime/cgo
 5    // 主要作用是參與一些比較計算,當發現容量要超過棧分配空間後,可以進行擴容或者收縮
 6    stackguard0 uintptr // offset known to liblink
 7    stackguard1 uintptr // offset known to liblink
 8 
 9    // 當前與g綁定的m
10    m         *m      // current m; offset known to arm liblink
11    // 這是一個比較重要的字段,裏面保存的一些與goroutine運行位置相關的寄存器和指針,如rsp、rbp、rpc等寄存器
12    sched     gobuf
13    syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
14    syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
15    stktopsp  uintptr // expected sp at top of stack, to check in traceback
16  
17    // 用於做參數傳遞,睡眠時其他goroutine可以設置param,喚醒時該g可以讀取這些param
18    param        unsafe.Pointer
19    // 記錄當前goroutine的狀態
20    atomicstatus uint32
21    stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
22    // goroutine的唯一id
23    goid         int64
24    schedlink    guintptr
25    
26    // 標記是否可以被搶佔
27    preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
28    preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
29    preemptShrink bool // shrink stack at synchronous safe point
30 
31    // 如果調用了LockOsThread方法,則g會綁定到某個m上,只在這個m上運行
32    lockedm        muintptr
33    sig            uint32
34    writebuf       []byte
35    sigcode0       uintptr
36    sigcode1       uintptr
37    sigpc          uintptr
38    // 創建該goroutine的語句的指令地址
39    gopc           uintptr         // pc of go statement that created this goroutine
40    ancestors      *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
41    // goroutine函數的指令地址
42    startpc        uintptr         // pc of goroutine function
43    racectx        uintptr
44    waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
45    cgoCtxt        []uintptr      // cgo traceback context
46    labels         unsafe.Pointer // profiler labels
47    timer          *timer         // cached timer for time.Sleep
48    selectDone     uint32         // are we participating in a select and did someone win the race?
49 }

跟g相關的還有兩個數據結構比較重要:

stack是協程棧的地址信息,需要注意的是m0綁定的g0是在進程被分配的系統棧上分配協程棧的,而其他協程棧都是在堆上進行分配的。

gobuf中保存了協程執行的上下文信息,這裏也可以看到協程切換的上下文信息極少;sp代表cpu的rsp寄存器的值,pc代表CPU的rip寄存器值、bp代表CPU的rbp寄存器值;ret用來保存系統調用的返回值,ctxt在gc的時候使用。

其中幾個寄存器作用如下:

  • SP:永遠指向棧頂位置
  • BP:某一時刻的棧頂位置,當新函數調用時,把當前SP地址賦值給BP、SP指向新的棧頂位置
  • PC:代表代碼經過編譯為機器碼後,當前執行的機器指令(可以理解為當前語句)
 1 // Stack describes a Go execution stack.
 2 // The bounds of the stack are exactly [lo, hi),
 3 // with no implicit data structures on either side.
 4 // goroutine協程棧的棧頂和棧底
 5 type stack struct {
 6    lo uintptr // 棧頂,低地址
 7    hi uintptr // 棧底,高地址
 8 }
 9 
10 // gobuf中保存了非常重要的上下文執行信息,
11 type gobuf struct {
12     // 代表cpu的rsp寄存器的值,永遠指向棧頂位置
13    sp   uintptr
14    // 代表代碼經過編譯為機器碼後,當前執行的機器指令(可以理解為當前語句)
15    pc   uintptr
16    // 指向所保存執行上下文的goroutine
17    g    guintptr
18    // gc時候使用
19    ctxt unsafe.Pointer
20    // 用來保存系統調用的返回值
21    ret  uintptr
22    lr   uintptr
23    // 某一時刻的棧頂位置,當新函數調用時,把當前SP地址賦值給BP、SP指向新的棧頂位置
24    bp   uintptr // for framepointer-enabled architectures
25 }

3.1.2 G的狀態

就像線程有自己的狀態一樣,goroutine也有自己的狀態,主要記錄在atomicstatus字段上:

 1 // defined constants
 2 const (
 3     // 代表協程剛開始創建時的狀態,當新創建的協程初始化後,為變為_Gdead狀態,_Gdread也是協程被銷燬時的狀態;
 4     // 剛創建時也被會置為_Gdead主要是考慮GC可以去用去掃描dead狀態下的協程棧
 5    _Gidle = iota // 0
 6     // 代表協程正在運行隊列中,等待被運行
 7    _Grunnable // 1
 8     // 代表當前協程正在被運行,已經被分配了邏輯處理的線程,即p和m
 9    _Grunning // 2
10     // 代表當前協程正在執行系統調用
11    _Gsyscall // 3
12     // 表示當前協程在運行時被鎖定,陷入阻塞,不能執行用户代碼
13    _Gwaiting // 4
14 
15    _Gmoribund_unused // 5
16     // 新創建的協程初始化後,或者協程被銷燬後的狀態
17    _Gdead // 6
18 
19    // _Genqueue_unused is currently unused.
20    _Genqueue_unused // 7
21     // 代表在進行協程棧掃描時發現需要擴容或者縮容,將協程中的棧轉移到新棧時的狀態;這個時候不執行用户代碼,也不在p的runq中
22    _Gcopystack // 8
23 
24     // 代表g被搶佔後的狀態
25    _Gpreempted // 9
26 
27     // 這幾個狀態是垃圾回收時涉及,後續文章進行介紹
28    _Gscan          = 0x1000
29    _Gscanrunnable  = _Gscan + _Grunnable  // 0x1001
30    _Gscanrunning   = _Gscan + _Grunning   // 0x1002
31    _Gscansyscall   = _Gscan + _Gsyscall   // 0x1003
32    _Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004
33    _Gscanpreempted = _Gscan + _Gpreempted // 0x1009
34 )

這裏是利用常量定義的枚舉。

Go的狀態變更可以看下圖:

0

 3.1.3 G的創建

當我們使用go關鍵字新建一個goroutine時,編譯器會編譯為runtime中對應的函數調用(newproc,而go 關鍵字後面的函數成為協程的任務函數),進行創建,整體步驟如下:

  1. 用 systemstack 切換到系統堆棧,調用 newproc1 ,newproc1 實現g的獲取。
  2. 嘗試從p的本地g空閒鏈表和全局g空閒鏈表找到一個g的實例。
  3. 如果上面未找到,則調用 malg 生成新的g的實例,且分配好g的棧和設置好棧的邊界,接着添加到 allgs 數組裏面,allgs保存了所有的g。
  4. 保存g切換的上下文,這裏很關鍵,g的切換依賴 sched 字段。
  5. 生成唯一的goid,賦值給該g。
  6. 調用 runqput 將g插入隊列中,如果本地隊列還有剩餘的位置,將G插入本地隊列的尾部,若本地隊列已滿,插入全局隊列。
  7. 如果有空閒的p 且 m沒有處於自旋狀態 且 main goroutine已經啓動,那麼喚醒或新建某個m來執行任務。

 這裏對應的是newproc函數:

 1 func newproc(siz int32, fn *funcval) {
 2    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
 3    gp := getg()
 4    // 獲取調用者的指令地址,也就是調用newproc時又call指令壓棧的函數返回地址
 5    pc := getcallerpc()
 6    // systemstack的作用是切換到m0對應的g0所屬的系統棧
 7    // 使用g0所在的系統棧創建goroutine對象
 8    // 傳遞參數包括goroutine的任務函數、argp參數起始地址、siz是參數長度、調用方的pc指針
 9    systemstack(func() {
10       newg := newproc1(fn, argp, siz, gp, pc)
11       // 創建完成後將g放到創建者(某個g,如果是進程初始化啓動階段則為g0)所在的p的隊列中
12       _p_ := getg().m.p.ptr()
13       runqput(_p_, newg, true)
14 
15       if mainStarted {
16          wakep()
17       }
18    })
19 }

其中systemstack是一段彙編代碼,位於asm_amd64.s文件中,主要是寄存器指令的操作,筆者不懂彙編這裏先不做介紹。
 

newproc1是獲取newg的函數,主要步驟:

1、首先防止當前g被搶佔,綁定m

2、對傳入的參數佔用的內存空間進行對齊處理

3、從p的空閒隊列中獲取一個空閒的g,如果麼有就創建一個g,並在堆上創建協程棧,並設置狀態為_Gdead添加到全局allgs中

4、計算整體協程任務函數的參數空間大小,並設置sp指針

5、執行參數從getg的堆棧到newg堆棧的複製

6、設置newg的sched和startpc、gopc等跟上下文相關的字段值

7、設置newg狀態為runable並設置goid

8、接觸getg與m的防搶佔狀態

 代碼註釋如下:

  1 func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
  2     .....
  3     // 如果是初始化時候這個代表g0
  4    _g_ := getg()
  5 
  6    if fn == nil {
  7       _g_.m.throwing = -1 // do not dump full stacks
  8       throw("go of nil func value")
  9    }
 10    // 使_g_.m.locks++,來防止這個時候g對應的m被搶佔
 11    acquirem() // disable preemption because it can be holding p in a local var
 12    // 參數的地址,下面一句目的是為了做到內存對齊
 13    siz := narg
 14    siz = (siz + 7) &^ 7
 15 
 16    // We could allocate a larger initial stack if necessary.
 17    // Not worth it: this is almost always an error.
 18    // 4*PtrSize: extra space added below
 19    // PtrSize: caller's LR (arm) or return address (x86, in gostartcall).
 20    if siz >= _StackMin-4*sys.PtrSize-sys.PtrSize {
 21       throw("newproc: function arguments too large for new goroutine")
 22    }
 23 
 24    _p_ := _g_.m.p.ptr()
 25    newg := gfget(_p_) // 首先從p的gfree隊列中看看有沒有空閒的g,有則使用
 26    if newg == nil {
 27        // 如果沒找到就使用new關鍵字來創建一個g並在堆上分配棧空間
 28       newg = malg(_StackMin)
 29       // 將newg的狀態設置為_Gdead,因為這樣GC就不會去掃描一個沒有初始化的協程棧
 30       casgstatus(newg, _Gidle, _Gdead)
 31       // 添加到全局的allg切片中(需要加鎖訪問)
 32       allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
 33    }
 34    // 下面是檢查協程棧的創建情況和狀態
 35    if newg.stack.hi == 0 {
 36       throw("newproc1: newg missing stack")
 37    }
 38 
 39    if readgstatus(newg) != _Gdead {
 40       throw("newproc1: new g is not Gdead")
 41    }
 42    // 計算運行空間大小並進行內存對齊
 43    totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
 44    totalSize += -totalSize & (sys.StackAlign - 1)               // align to StackAlign
 45    // 計算sp寄存器指針的位置
 46    sp := newg.stack.hi - totalSize
 47    // 確定參數入棧位置
 48    spArg := sp
 49    .........
 50    if narg > 0 {
 51        // 將參數從newproc函數的棧複製到新的協程的棧中,memove是一段彙編代碼
 52        // 從argp位置挪動narg大小的內存到sparg位置
 53       memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
 54       // 因為涉及到從棧到堆棧上的複製,go在垃圾回收中使用了三色標記和寫入屏障等手段,所以這裏要考慮屏障複製
 55       // 目標棧可能會有垃圾存在,所以設置屏障並且標記為灰色
 56       if writeBarrier.needed && !_g_.m.curg.gcscandone { // 如果啓用了寫入屏障並且源堆棧為灰色(目標始終為黑色),則執行屏障複製。
 57          f := findfunc(fn.fn)
 58          stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
 59          if stkmap.nbit > 0 {
 60             // We're in the prologue, so it's always stack map index 0.
 61             bv := stackmapdata(stkmap, 0)
 62             bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
 63          }
 64       }
 65    }
 66     // 把newg的sched結構體成員的所有字段都設置為0,其實就是初始化
 67    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 68    newg.sched.sp = sp
 69    newg.stktopsp = sp
 70    // pc指針表示當newg被調度起來時從這個位置開始執行
 71    // 這裏是先設置為goexit,在gostartcallfn中會進行處理,更改sp為這裏的pc,將pc改為真正的協程任務函數fn的指令位置
 72    // 這樣使得任務函數執行完畢後,會繼續執行goexit中相關的清理工作
 73    newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
 74    newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保存當前的g
 75    gostartcallfn(&newg.sched, fn) // 在這裏完成g啓動時所有相關上下文指針的設置,主要為sp、pc和ctxt,ctxt被設置為fn
 76    newg.gopc = callerpc // 保存newproc的pc,即調用者創建時的指令位置
 77    newg.ancestors = saveAncestors(callergp)
 78    // 設置startpc為任務函數,主要用於函數調用棧的trackback和棧收縮工作
 79    // newg的執行開始位置並不依賴這個字段,而是通過sched.pc確定
 80    newg.startpc = fn.fn
 81    if _g_.m.curg != nil {
 82       newg.labels = _g_.m.curg.labels
 83    }
 84    // 判斷newg的任務函數是不是runtime系統的任務函數,是則sched.ngsys+1;
 85    // 主協程則代表runtime.main函數,在這裏就為判斷為真
 86    if isSystemGoroutine(newg, false) {
 87       atomic.Xadd(&sched.ngsys, +1)
 88    }
 89    // Track initial transition?
 90    newg.trackingSeq = uint8(fastrand())
 91    if newg.trackingSeq%gTrackingPeriod == 0 {
 92       newg.tracking = true
 93    }
 94    // 更改當前g的狀態為_Grunnable
 95    casgstatus(newg, _Gdead, _Grunnable)
 96     // 設置g的goid,因為p會每次批量生成16個id,每次newproc如果新建一個g,id就會加1
 97     // 所以這裏m0的g0的id為0,而主協程的goid為1,其他的依次遞增
 98    if _p_.goidcache == _p_.goidcacheend {
 99       // Sched.goidgen is the last allocated id,
100       // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
101       // At startup sched.goidgen=0, so main goroutine receives goid=1.
102       // 使用原子操作修改全局變量,這裏的sched是在runtime2.go中的一個全局變量類型為schedt
103       // 原子操作具有多線程可見性,同時比加鎖性能更高
104       _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
105       _p_.goidcache -= _GoidCacheBatch - 1
106       _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
107    }
108    newg.goid = int64(_p_.goidcache)
109    _p_.goidcache++
110    if raceenabled {
111       newg.racectx = racegostart(callerpc)
112    }
113    if trace.enabled {
114       traceGoCreate(newg, newg.startpc)
115    }
116    // 釋放getg與m的綁定
117    releasem(_g_.m)
118 
119    return newg
120 }

其中有幾個關鍵地方需要強調

3.1.4 協程棧在堆空間的分配

malg函數,用來創建一個新g和對應的棧空間分配,這個函數主要強調的是棧空間分配部分,通過切換到系統棧上進行空間分配,分配完後設置棧底和棧頂的兩個位置的保護字段,當棧上進行分配變量空間發現超過stackguard1時,會進行擴容,同時在某些條件下也會進行縮容

 1 // Allocate a new g, with a stack big enough for stacksize bytes.
 2 func malg(stacksize int32) *g {
 3    newg := new(g)
 4    if stacksize >= 0 {
 5       stacksize = round2(_StackSystem + stacksize)
 6       systemstack(func() {
 7          newg.stack = stackalloc(uint32(stacksize))
 8       })
 9       newg.stackguard0 = newg.stack.lo + _StackGuard
10       newg.stackguard1 = ^uintptr(0)
11       // Clear the bottom word of the stack. We record g
12       // there on gsignal stack during VDSO on ARM and ARM64.
13       *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
14    }
15    return newg
16 }

stackalloc代碼位於runtime/stack.go文件中;

協程棧首先在進程初始化時會創建棧的管理結構:

1、棧池stackpool,這個棧池主要用來對大小為2、4、8kb的小棧做緩存使用,使用的同樣是mspan這種結構來存儲;

2、為大棧分配的stackLarge

 1   OS               | FixedStack | NumStackOrders
 2   -----------------+------------+---------------
 3   linux/darwin/bsd | 2KB        | 4
 4   windows/32       | 4KB        | 3
 5   windows/64       | 8KB        | 2
 6   plan9            | 4KB        | 3
 7 
 8 // Global pool of spans that have free stacks.
 9 // Stacks are assigned an order according to size.
10 //     order = log_2(size/FixedStack)
11 // There is a free list for each order.
12 var stackpool [_NumStackOrders]struct {
13    item stackpoolItem
14    _    [cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize]byte
15 }
16 
17 //go:notinheap
18 type stackpoolItem struct {
19    mu   mutex
20    span mSpanList
21 }
22 
23 // Global pool of large stack spans.
24 var stackLarge struct {
25    lock mutex
26    free [heapAddrBits - pageShift]mSpanList // free lists by log_2(s.npages)
27 }
28 
29 func stackinit() {
30    if _StackCacheSize&_PageMask != 0 {
31       throw("cache size must be a multiple of page size")
32    }
33    for i := range stackpool {
34       stackpool[i].item.span.init()
35       lockInit(&stackpool[i].item.mu, lockRankStackpool)
36    }
37    for i := range stackLarge.free {
38       stackLarge.free[i].init()
39       lockInit(&stackLarge.lock, lockRankStackLarge)
40    }
41 }

stackalloc會首先判斷棧空間大小,是大棧還是固定空間的小棧,

1、對於小棧,如果是還沒有分配棧緩存空間,則進入stackpoolalloc函數進行分配空間(需要加鎖),這裏最終是從全局的mheap也就是堆空間中獲取內存空間;如果有棧緩存空間,則從g對應的mcache中的stackcache上獲取內存空間(無鎖),如果stackcache上沒有足夠空間則調用stackcacherefill方法為stackpool進行擴容(也是從mheap中拿取,加鎖)然後分配給協程

2、對於大棧,先從stackLarge中獲取,如果沒有則從mheap中獲取,兩個步驟都需要加載訪問;

3、最後創建stack結構返回給newg

 1 func stackalloc(n uint32) stack {
 2    // Stackalloc must be called on scheduler stack, so that we
 3    // never try to grow the stack during the code that stackalloc runs.
 4    // Doing so would cause a deadlock (issue 1547).
 5    thisg := getg()
 6 .........
 7 
 8    // Small stacks are allocated with a fixed-size free-list allocator.
 9    // If we need a stack of a bigger size, we fall back on allocating
10    // a dedicated span.
11    var v unsafe.Pointer
12    if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {
13       order := uint8(0)
14       n2 := n
15       for n2 > _FixedStack {
16          order++
17          n2 >>= 1
18       }
19       var x gclinkptr
20       if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
21          // thisg.m.p == 0 can happen in the guts of exitsyscall
22          // or procresize. Just get a stack from the global pool.
23          // Also don't touch stackcache during gc
24          // as it's flushed concurrently.
25          lock(&stackpool[order].item.mu)
26          x = stackpoolalloc(order)
27          unlock(&stackpool[order].item.mu)
28       } else {
29          c := thisg.m.p.ptr().mcache
30          x = c.stackcache[order].list
31          if x.ptr() == nil {
32             stackcacherefill(c, order)
33             x = c.stackcache[order].list
34          }
35          c.stackcache[order].list = x.ptr().next
36          c.stackcache[order].size -= uintptr(n)
37       }
38       v = unsafe.Pointer(x)
39    } else {
40       var s *mspan
41       npage := uintptr(n) >> _PageShift
42       log2npage := stacklog2(npage)
43 
44       // Try to get a stack from the large stack cache.
45       lock(&stackLarge.lock)
46       if !stackLarge.free[log2npage].isEmpty() {
47          s = stackLarge.free[log2npage].first
48          stackLarge.free[log2npage].remove(s)
49       }
50       unlock(&stackLarge.lock)
51 
52       lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
53 
54       if s == nil {
55          // Allocate a new stack from the heap.
56          s = mheap_.allocManual(npage, spanAllocStack)
57          if s == nil {
58             throw("out of memory")
59          }
60          osStackAlloc(s)
61          s.elemsize = uintptr(n)
62       }
63       v = unsafe.Pointer(s.base())
64    }
65 
66    if raceenabled {
67       racemalloc(v, uintptr(n))
68    }
69    if msanenabled {
70       msanmalloc(v, uintptr(n))
71    }
72    if stackDebug >= 1 {
73       print("  allocated ", v, "\n")
74    }
75    return stack{uintptr(v), uintptr(v) + uintptr(n)}
76 }

非g0的g為什麼要在堆上分配空間?

雖然堆不如棧快,但是goroutine是go模擬的線程,具有動態擴容和縮容的能力,而系統棧是線性空間,在系統棧上發生縮容和擴容會存在空間不足或者棧空間碎片等問題;所以go這裏在堆上分配協程棧;因為是在堆空間也就意味着這部分空間也需要進行垃圾回收和釋放;所以Go的GC是多線程併發標記時,內存屏障是對整個協程棧標記灰色,來讓回收器進行掃描。

3.1.5 G的上下文設置和切換

協程棧的切換主要是在兩個地方,由執行調度邏輯的g0切換到執行用户邏輯的g的過程,以及執行用户邏輯的g退出或者被搶佔切換為g0執行調度的過程,搶佔在下文中介紹

上面代碼中當newg被初始化時,會初始化sched中的pc和sp指針,其中會把pc先設置為goexit函數的第二條指令。

 1     // 把newg的sched結構體成員的所有字段都設置為0,其實就是初始化
 2    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 3    newg.sched.sp = sp
 4    newg.stktopsp = sp
 5    // pc指針表示當newg被調度起來時從這個位置開始執行
 6    // 這裏是先設置為goexit,在gostartcallfn中會進行處理,更改sp為這裏的pc,將pc改為真正的協程任務函數fn的指令位置
 7    // 這樣使得任務函數執行完畢後,會繼續執行goexit中相關的清理工作
 8    newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
 9    newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保存當前的g
10    gostartcallfn(&newg.sched, fn) // 在這裏完成g啓動時所有相關上下文指針的設置,主要為sp、pc和ctxt,ctxt被設置為fn
11    newg.gopc = callerpc // 保存newproc的pc,即調用者創建時的指令位置

然後進入gostartcallfn函數,最終是在gostartcall函數中進行處理

 1 // gostartcallfn 位於runtime/stack.go中
 2 
 3 func gostartcallfn(gobuf *gobuf, fv *funcval) {
 4    var fn unsafe.Pointer
 5    if fv != nil {
 6       fn = unsafe.Pointer(fv.fn)
 7    } else {
 8       fn = unsafe.Pointer(funcPC(nilfunc))
 9    }
10    gostartcall(gobuf, fn, unsafe.Pointer(fv))
11 }
12 
13 // gostartcall 位於runtime/sys_x86.go中
14 
15 func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
16     // newg的棧頂,目前newg棧上只有fn函數的參數,sp指向的是fn的第一個參數
17    sp := buf.sp
18    // 為返回地址預留空間
19    sp -= sys.PtrSize
20    // buf.pc中設置的是goexit函數中的第二條指令
21    // 因為棧是自頂向下,先進後出,所以這裏偽裝fn是被goexit函數調用的,goexit在前fn在後
22    // 使得fn返回後到goexit繼續執行,以完成一些清理工作。
23    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
24    buf.sp = sp // 重新設置棧頂
25    // 將pc指向goroutine的任務函數fn,這樣當goroutine獲得執行權時,從任務函數入口開始執行
26    // 如果是主協程,那麼fn就是runtime.main,從這裏開始執行
27    buf.pc = uintptr(fn)
28    buf.ctxt = ctxt
29 }

可以看到在newg初始化時進行的一系列設置工作,將goexit先壓入棧頂,然後偽造sp位置,讓cpu看起來是從goexit中調用的協程任務函數,然後將pc指針指向任務函數,當協程被執行時,從pc處開始執行,任務函數執行完畢後執行goexit;
 

這裏是設置工作,具體的切換工作,需要經由schedule調度函數選中一個g,進入execute函數設置g的相關狀態和棧保護字段等信息,然後進入gogo函數,通過彙編語言,將CPU寄存器以及函數調用棧切換為g的sched中相關指針和協程棧。gogo函數源碼如下:

 1 // gogo的具體彙編代碼位於asm_amd64.s中
 2 
 3 // func gogo(buf *gobuf)
 4 // restore state from Gobuf; longjmp
 5 TEXT runtime·gogo(SB), NOSPLIT, $0-8
 6     // 0(FP)表示第一個參數,即buf=&gp.sched
 7    MOVQ   buf+0(FP), BX     // gobuf
 8    // DX = gp.sched.g,DX代表數據寄存器
 9    MOVQ   gobuf_g(BX), DX
10    MOVQ   0(DX), CX     // make sure g != nil
11    JMP    gogo<>(SB)
12 
13 TEXT gogo<>(SB), NOSPLIT, $0
14     // 將tls保存到CX寄存器
15    get_tls(CX)
16    // 下面這條指令把當前要運行的g(上面第9行中已經把go_buf中的g放入到了DX中),
17    // 放入CX寄存器的g位置即tls[0]這個位置,也就是線程的本地存儲中,
18    // 這樣下次runtime中調用getg時獲取的就是這個g
19    MOVQ   DX, g(CX)
20    MOVQ   DX, R14       // set the g register
21    // 把CPU的SP寄存器設置為g.sched.sp這樣就完成了棧的切換,從g0切換為g
22    MOVQ   gobuf_sp(BX), SP   // restore SP
23    // 將ret、ctxt、bp分別存入對應的寄存器,完成了CPU上下文的切換
24    MOVQ   gobuf_ret(BX), AX
25    MOVQ   gobuf_ctxt(BX), DX
26    MOVQ   gobuf_bp(BX), BP
27    // 清空sched的值,相關值已經存入到寄存器中,這裏清空後可以減少GC的工作量
28    MOVQ   $0, gobuf_sp(BX)   // clear to help garbage collector
29    MOVQ   $0, gobuf_ret(BX)
30    MOVQ   $0, gobuf_ctxt(BX)
31    MOVQ   $0, gobuf_bp(BX)
32    // 把sched.pc放入BX寄存器
33    MOVQ   gobuf_pc(BX), BX
34    // JMP把BX的值放入CPU的IP寄存器,所以這時候CPU從該地址開始繼續執行指令
35    JMP    BX

AX、BX、CX、DX是8086處理器的4個數據寄存器,可以簡單認為相當於4個硬件的變量;

上文總體來説,將g存入到tls中(線程的本地存儲),設置SP和相關寄存器為g.sched中的字段(SP、ret、ctxt、bp),然後跳轉到pc指針位置執行指令

3.1.6 G的退出處理

協程棧的退出需要分為兩種情況,即運行main函數的主協程和普通的用户協程;

主協程的fn任務函數位於proc.go中的main函數中,對於主協程g.shched.pc指向的也是這個位置,這裏會調用用户的mian函數(main_main),main_main運行完畢後,會調用exit(0)直接退出,而不會跑到goexit函數中。

 1 // runtime/proc.go 中的main函數
 2 // The main goroutine.
 3 func main() {
 4    g := getg()
 5 
 6 .................
 7    fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
 8    fn()
 9 ..................
10 ..................
11    exit(0)
12 ..................
13 }

用户協程因為將goexit作為協程棧棧底,所以當執行完協程任務函數時,會執行goexit函數,goexit是一段彙編指令:

1 // The top-most function running on a goroutine
2 // returns to goexit+PCQuantum.
3 TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
4    BYTE   $0x90  // NOP
5    CALL   runtime·goexit1(SB)    // does not return
6    // traceback from goexit1 must hit code range of goexit
7    BYTE   $0x90  // NOP

這裏直接調用goexit1,goexit1位於runtime/proc.go中

 1 // Finishes execution of the current goroutine.
 2 func goexit1() {
 3    if raceenabled {
 4       racegoend()
 5    }
 6    if trace.enabled {
 7       traceGoEnd()
 8    }
 9    mcall(goexit0)
10 }

通過mcall調用goexit0,mcall是一段彙編代碼它的作用是把執行的棧切換到g0的棧

 1 TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
 2    MOVQ   AX, DX // DX = fn
 3 
 4    // save state in g->sched
 5    // mcall返回地址放入BX中
 6    MOVQ   0(SP), BX  // caller's PC
 7    // 下面部分是保存g的執行上下文,pc、sp、bp
 8    // g.shced.pc = BX
 9    MOVQ   BX, (g_sched+gobuf_pc)(R14)
10    LEAQ   fn+0(FP), BX   // caller's SP
11    MOVQ   BX, (g_sched+gobuf_sp)(R14)
12    MOVQ   BP, (g_sched+gobuf_bp)(R14)
13 
14    // switch to m->g0 & its stack, call fn
15    // 將g.m保存到BX寄存器中
16    MOVQ   g_m(R14), BX
17    // 這段代碼意思是從m結構體中獲取g0字段保存到SI中
18    MOVQ   m_g0(BX), SI   // SI = g.m.g0
19    CMPQ   SI, R14    // if g == m->g0 call badmcall
20    // goodm中完成了從g的棧切換到g0的棧
21    JNE    goodm
22    JMP    runtime·badmcall(SB)
23 goodm:
24    MOVQ   R14, AX       // AX (and arg 0) = g
25    MOVQ   SI, R14       // g = g.m.g0
26    get_tls(CX)       // Set G in TLS
27    MOVQ   R14, g(CX)
28    MOVQ   (g_sched+gobuf_sp)(R14), SP    // sp = g0.sched.sp
29    PUSHQ  AX // open up space for fn's arg spill slot
30    MOVQ   0(DX), R12
31    // 這裏意思是調用goexit0(g)
32    CALL   R12       // fn(g)
33    POPQ   AX
34    JMP    runtime·badmcall2(SB)
35    RET

goexit0代碼位於runtime/proc.go中,他主要完成最後的清理工作:

1、把g的狀態從——Gruning變為Gdead

2、清空g的一些字段

3、接觸g與m的綁定關係,即g.m = nil;m.currg = nil

4、把g放入p的freeg隊列中,下次創建g可以直接獲取,而不用從內存分配

5、調用schedule進入下一次調度循環

 1 // 這段代碼執行在g0的棧上,gp是我們要處理退出的g的結構體指針
 2 // goexit continuation on g0.
 3 func goexit0(gp *g) {
 4    _g_ := getg() // 獲取g0
 5     // 更改g的狀態為_Gdead
 6    casgstatus(gp, _Grunning, _Gdead)
 7    if isSystemGoroutine(gp, false) {
 8       atomic.Xadd(&sched.ngsys, -1)
 9    }
10    // 清空g的一些字段
11    gp.m = nil
12    locked := gp.lockedm != 0
13    gp.lockedm = 0
14    _g_.m.lockedg = 0
15    gp.preemptStop = false
16    gp.paniconfault = false
17    gp._defer = nil // should be true already but just in case.
18    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
19    gp.writebuf = nil
20    gp.waitreason = 0
21    gp.param = nil
22    gp.labels = nil
23    gp.timer = nil
24 
25    if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
26       // Flush assist credit to the global pool. This gives
27       // better information to pacing if the application is
28       // rapidly creating an exiting goroutines.
29       assistWorkPerByte := float64frombits(atomic.Load64(&gcController.assistWorkPerByte))
30       scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
31       atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
32       gp.gcAssistBytes = 0
33    }
34     // 接觸g與m的綁定關係
35    dropg()
36 
37    if GOARCH == "wasm" { // no threads yet on wasm
38       gfput(_g_.m.p.ptr(), gp)
39       schedule() // never returns
40    }
41 
42    if _g_.m.lockedInt != 0 {
43       print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
44       throw("internal lockOSThread error")
45    }
46    // 將g加入p的空閒隊列
47    gfput(_g_.m.p.ptr(), gp)
48    if locked {
49       // The goroutine may have locked this thread because
50       // it put it in an unusual kernel state. Kill it
51       // rather than returning it to the thread pool.
52 
53       // Return to mstart, which will release the P and exit
54       // the thread.
55       if GOOS != "plan9" { // See golang.org/issue/22227.
56          gogo(&_g_.m.g0.sched)
57       } else {
58          // Clear lockedExt on plan9 since we may end up re-using
59          // this thread.
60          _g_.m.lockedExt = 0
61       }
62    }
63    // 執行下一輪調度
64    schedule()
65 }

3.2 P源碼部分

3.2.1 P的結構

 1 // runtime/runtime2.go
 2 
 3 type p struct {
 4     // 全局變量allp中的索引位置
 5    id          int32
 6    // p的狀態標識
 7    status      uint32 // one of pidle/prunning/...
 8    link        puintptr
 9    // 調用schedule的次數,每次調用schedule這個值會加1
10    schedtick   uint32     // incremented on every scheduler call
11    // 系統調用的次數,每次進行系統調用加1
12    syscalltick uint32     // incremented on every system call
13    // 用於sysmon協程記錄被監控的p的系統調用時間和運行時間
14    sysmontick  sysmontick // last tick observed by sysmon
15    // 指向綁定的m,p如果是idle狀態這個值為nil
16    m           muintptr   // back-link to associated m (nil if idle)
17    // 用於分配微小對象和小對象的一個塊的緩存空間,裏面有各種不同等級的span
18    mcache      *mcache
19    // 一個chunk大小(512kb)的內存空間,用來對堆上內存分配的緩存優化達到無鎖訪問的目的
20    pcache      pageCache
21    raceprocctx uintptr
22 
23    deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
24    deferpoolbuf [5][32]*_defer
25 
26    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
27    // 可以分配給g的id的緩存,每次會一次性申請16個
28    goidcache    uint64
29    goidcacheend uint64
30 
31    // Queue of runnable goroutines. Accessed without lock.
32    // 本地可運行的G隊列的頭部和尾部,達到無鎖訪問
33    runqhead uint32
34    runqtail uint32
35    // 本地可運行的g隊列,是一個使用數組實現的循環隊列
36    runq     [256]guintptr
37    // 下一個待運行的g,這個g的優先級最高
38    // 如果當前g運行完後還有剩餘可用時間,那麼就應該運行這個runnext的g
39    runnext guintptr
40 
41    // Available G's (status == Gdead)
42    // p上的空閒隊列列表
43    gFree struct {
44       gList
45       n int32
46    }
47 
48    ............
49     // 用於內存對齊
50    _ uint32 // Alignment for atomic fields below
51 .......................
52     // 是否被搶佔
53    preempt bool
54 
55    // Padding is no longer needed. False sharing is now not a worry because p is large enough
56    // that its size class is an integer multiple of the cache line size (for any of our architectures).
57 }

通過這裏的結構可以看出,雖然P叫做邏輯處理器Processor,實際上它更多是資源的管理者,其中包含了可運行的g隊列資源、內存分配的資源、以及對調度循環、系統調用、sysmon協程的相關記錄。通過P的資源管理來儘量實現無鎖訪問,提升應用性能。

3.2.2 P的狀態

當程序剛開始運行進行初始化時,所有的P都處於_Pgcstop狀態,隨着的P的初始化(runtime.procresize),會被設置為_Pidle狀態。

當M需要運行時會調用runtime.acquirep來使P變為_Prunning狀態,並通過runtime.releasep來釋放,重新變為_Pidele。

當G執行時需要進入系統調用,P會被設置為_Psyscall,如果這個時候被系統監控搶奪(runtime.retake),則P會被重新修改為_Pidle。

如果在程序中發生GC,則P會被設置為_Pgcstop,並在runtime.startTheWorld時重新調整為_Prunning。

0

(這部分文字來自《Go程序員面試寶典》,圖片來自這裏)
 

3.2.3 P的創建

P的初始化是在schedinit函數中調用的,schedinit函數是在runtime的彙編啓動代碼裏調用的。

1 ...........................
2 CALL    runtime·args(SB)
3 CALL   runtime·osinit(SB)
4 CALL   runtime·schedinit(SB)
5 ...........................

shcedinit中通過調用procresize進行P的分配。P的個數默認等於CPU核數,如果設置了GOMAXPROCS環境變量,則會採用設置的值來確定P的個數。所以runtime.GOMAXPROCS是限制的並行線程數量,而不是系統線程即M的總數,M是按需創建。

 1 func schedinit() {
 2 .................
 3    lock(&sched.lock)
 4    sched.lastpoll = uint64(nanotime())
 5    procs := ncpu
 6    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
 7       procs = n
 8    }
 9    if procresize(procs) != nil {
10       throw("unknown runnable goroutine during bootstrap")
11    }
12    unlock(&sched.lock)
13 
14    // World is effectively started now, as P's can run.
15    worldStarted()
16    .....................
17 }

上面獲取ncpu的個數,然後傳遞給procresize函數。

無論是初始化時的分配,還是後期調整,都是通過procresize來創建p以及初始化

 1 func procresize(nprocs int32) *p {
 2 .............................
 3    old := gomaxprocs
 4 ......................
 5    if nprocs > int32(len(allp)) {
 6       // Synchronize with retake, which could be running
 7       // concurrently since it doesn't run on a P.
 8       lock(&allpLock)
 9       if nprocs <= int32(cap(allp)) {
10           // 如果需要的p小於allp這個全局變量(切片)的cap能力,取其中的一部分
11          allp = allp[:nprocs]
12       } else {
13           // 否則創建nprocs數量的p,並把allp的中複製給nallp
14          nallp := make([]*p, nprocs)
15          // Copy everything up to allp's cap so we
16          // never lose old allocated Ps.
17          copy(nallp, allp[:cap(allp)])
18          allp = nallp
19       }
20 ....................................
21       unlock(&allpLock)
22    }
23 
24    // 進行p的初始化
25    for i := old; i < nprocs; i++ {
26       pp := allp[i]
27       if pp == nil {
28          pp = new(p)
29       }
30       pp.init(i)
31       atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
32    }
33 ...............................
34    return runnablePs
35 }

在啓動時候會根據CPU核數和runtime.GOMAXPROCS來設置p的個數,並由一個叫做allp的切片來為主,後期可以通過設置GOMAXPROCS來調整P的個數,但是性能消耗很大,會進行STW;

3.3 M源碼部分

3.3.1 M的結構

M即Machine,代表一個進程中的工作線程,結構體m保存了M自身使用的棧信息、當前正在M上執行的G,以及綁定M的P信息等。

我們來看下m的結構體:

 1 type m struct {
 2     // 每個m都有一個對應的g0線程,用來執行調度代碼,
 3     // 當需要執行用户代碼的時候,g0會與用户goroutine發生協程棧切換
 4    g0      *g     // goroutine with scheduling stack
 5    morebuf gobuf  // gobuf arg to morestack
 6 ........................
 7    // tls作為線程的本地存儲
 8    // 其中可以在任意時刻獲取綁定到當前線程上的協程g、結構體m、邏輯處理器p、特殊協程g0等信息
 9    tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
10    mstartfn      func()
11    // 指向正在運行的goroutine對象
12    curg          *g       // current running goroutine
13    caughtsig     guintptr // goroutine running during fatal signal
14    // 與當前工作線程綁定的p
15    p             puintptr // attached p for executing go code (nil if not executing go code)
16    nextp         puintptr
17    oldp          puintptr // the p that was attached before executing a syscall
18    id            int64
19    mallocing     int32
20    throwing      int32
21    // 與禁止搶佔相關的字段,如果該字段不等於空字符串,要保持curg一直在這個m上運行
22    preemptoff    string // if != "", keep curg running on this m
23    // locks也是判斷g能否被搶佔的一個標識
24    locks         int32
25    dying         int32
26    profilehz     int32
27    // spining為true標識當前m正在處於自己找工作的自旋狀態,
28    // 首先檢查全局隊列看是否有工作,然後檢查network poller,嘗試執行GC任務
29    //或者偷一部分工作,如果都沒有則會進入休眠狀態
30    spinning      bool // m is out of work and is actively looking for work
31    // 表示m正阻塞在note上
32    blocked       bool // m is blocked on a note
33 .........................
34    doesPark      bool        // non-P running threads: sysmon and newmHandoff never use .park
35    // 沒有goroutine需要運行時,工作線程睡眠在這個park成員上
36    park          note
37    // 記錄所有工作線程的一個鏈表
38    alllink       *m // on allm
39    schedlink     muintptr
40    lockedg       guintptr
41    createstack   [32]uintptr // stack that created this thread.
42 .............................
43 }

3.3.2 M的狀態

M的狀態並沒有向P和G那樣有多個狀態常量,它只有自旋和非自旋兩種狀態

 1 mstart
 2     |
 3     v        
 4 +------+       找不到可執行任務           +-------+
 5 |unspin| ----------------------------> |spining| 
 6 |      | <---------------------------- |       |
 7 +------+       找到可執行任務            +-------+
 8     ^                                      | stopm
 9     |               wakep                  v
10 notewakeup <-------------------------  notesleep 

當M沒有工作時,它會自旋的來找工作,首先檢查全局隊列看是否有工作,然後檢查network poller,嘗試執行GC任務,或者偷一部分工作,如果都沒有則會進入休眠狀態。當被其他工作線程喚醒,又會進入自旋狀態。

3.3.3 M的創建

runtime/proc.go中的newm函數用來新建m,而最終是根據不同的系統,通過系統調用來創建線程。

 1 // newm創建一個新的m,將從fn或者調度程序開始執行
 2 func newm(fn func(), _p_ *p, id int64) {
 3     // 這裏實現m的創建
 4    mp := allocm(_p_, fn, id)
 5    mp.doesPark = (_p_ != nil)
 6    mp.nextp.set(_p_)
 7    mp.sigmask = initSigmask
 8    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
 9       // 我們處於鎖定的M或可能由C啓動的線程。此線程的內核狀態可能很奇怪(用户可能已將其鎖定為此目的)。
10       // 我們不想將其克隆到另一個線程中。相反,請求一個已知良好的線程為我們創建線程。
11       lock(&newmHandoff.lock)
12       if newmHandoff.haveTemplateThread == 0 {
13          throw("on a locked thread with no template thread")
14       }
15       mp.schedlink = newmHandoff.newm
16       newmHandoff.newm.set(mp)
17       if newmHandoff.waiting {
18          newmHandoff.waiting = false
19          notewakeup(&newmHandoff.wake)
20       }
21       unlock(&newmHandoff.lock)
22       return
23    }
24    // 這裏分配真正的的操作系統線程
25    newm1(mp)
26 }

allocm函數中實現m的創建,以及對應的g0協程的設置

 1 func allocm(_p_ *p, fn func(), id int64) *m {
 2     // 獲取當前運行的g
 3    _g_ := getg()
 4    // 將_g_對應的m的locks加1,防止被搶佔
 5    acquirem() // disable GC because it can be called from sysmon
 6    if _g_.m.p == 0 {
 7       acquirep(_p_) // temporarily borrow p for mallocs in this function
 8    }
 9 
10    // 檢查是有有空閒的m可以釋放,主要目的是釋放m上的g0佔用的系統棧
11    if sched.freem != nil {
12       lock(&sched.lock)
13       var newList *m
14       for freem := sched.freem; freem != nil; {
15          if freem.freeWait != 0 {
16             next := freem.freelink
17             freem.freelink = newList
18             newList = freem
19             freem = next
20             continue
21          }
22          // stackfree must be on the system stack, but allocm is
23          // reachable off the system stack transitively from
24          // startm.
25          systemstack(func() {
26             stackfree(freem.g0.stack)
27          })
28          freem = freem.freelink
29       }
30       sched.freem = newList
31       unlock(&sched.lock)
32    }
33     // 創建一個m結構體
34    mp := new(m)
35    mp.mstartfn = fn // 將fn設置為m啓動後執行的函數
36    mcommoninit(mp, id)
37 
38    // In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
39    // Windows and Plan 9 will layout sched stack on OS stack.
40    if iscgo || mStackIsSystemAllocated() {
41       mp.g0 = malg(-1)
42    } else {
43        // 設置m對應的g0,並設置對應大小的g0協程棧,g0是8kb
44       mp.g0 = malg(8192 * sys.StackGuardMultiplier)
45    }
46    // 設置g0對應的m
47    mp.g0.m = mp
48 
49    if _p_ == _g_.m.p.ptr() {
50       releasep()
51    }
52    // 解除_g_的m的禁止搶佔狀態。
53    releasem(_g_.m)
54 
55    return mp
56 }

為什麼m.locks加1可以禁止搶佔,防止GC;因為判斷是否可以搶佔,其中一個因素就是要m.locks=0

1 func canPreemptM(mp *m) bool {
2    return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
3 }

allocm函數實現了m的創建,但是這只是runtime層面的一個數據結構,還並沒有在系統中有真正的線程。再來看newm1函數:

1 func newm1(mp *m) {
2 ..............
3    execLock.rlock() // Prevent process clone.
4    // 創建一個系統線程,並且傳入該 mp 綁定的 g0 的棧頂指針
5    // 讓系統線程執行 mstart 函數,後面的邏輯都在 mstart 函數中
6    newosproc(mp)
7    execLock.runlock()
8 }

實際是通過newostproc來創建系統線程;

0

可以看到這個函數在不同的系統中有不同的實現,我們主要看linux系統,即os_linux.go文件代碼:

1 func newosproc(mp *m) {
2    stk := unsafe.Pointer(mp.g0.stack.hi)
3 .........................
4    var oset sigset
5    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
6    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
7    sigprocmask(_SIG_SETMASK, &oset, nil)
8 .........................
9 }

在linux平台,是通過clone這個系統調用來創建的線程;值得注意的是,這個系統線程的棧是在堆上;因為其中的stk指向mp.go.stack.hi,所以g0的堆棧也就是這個系統線程的堆棧。

3.3.4 M的休眠

M的自旋指的是m.spining為true,這個時候它會在P的本地G隊列、全局G隊列、nerpoller、偷其他P的G,一直循環找可運行的G的過程中。

自旋狀態用 m.spinning 和 sched.nmspinning 表示。其中 m.spinning 表示當前的M是否為自旋狀態,sched.nmspinning 表示runtime中一共有多少個M在自旋狀態。

當自旋了一段時間後,發現仍然找不到工作,就會進入stopm函數中,使M對應的線程進行休眠。

 1 func stopm() {
 2    _g_ := getg()
 3 .....................
 4    lock(&sched.lock)
 5    // 首先將m放到全局空閒鏈表中,這裏要加鎖訪問全局鏈表
 6    mput(_g_.m)
 7    unlock(&sched.lock)
 8    // 進入睡眠狀態
 9    mPark()
10    // 將m與p解綁
11    acquirep(_g_.m.nextp.ptr())
12    _g_.m.nextp = 0
13 }
14 
15 func mPark() {
16    g := getg()
17    for {
18        // 使工作線程休眠在park字段上
19       notesleep(&g.m.park)
20       noteclear(&g.m.park)
21       if !mDoFixup() {
22          return
23       }
24    }
25 }

實際將線程進行休眠的代碼,是通過彙編語言進行Futex系統調用來事項的。Futex機制是Linux提供的一種用户態和內核態混合的同步機制。Linux底層也是使用futex機制實現的鎖。

1      //uaddr指向一個地址,val代表這個地址期待的值,當*uaddr==val時,才會進行wait
2      int futex_wait(int *uaddr, int val);
3      //喚醒n個在uaddr指向的鎖變量上掛起等待的進程
4      int futex_wake(int *uaddr, int n);

可以看到在futex_wait中當一個地址等於某個期待值時,就會進行wait;所以當m中的park的key等於某個值時則進入休眠狀態。

3.3.5 M的喚醒

M的喚醒是在wakep函數中處理的。當一個新的goroutine創建或者有多個goroutine進入_Grunnable狀態時,會先判斷是否有自旋的M,如果有則不會喚醒其他的M而使用自旋的M,當沒有自旋的M,但m空閒隊列中有空閒的M則會喚醒M,否則會創建一個新的M

 1 // Tries to add one more P to execute G's.
 2 // Called when a G is made runnable (newproc, ready).
 3 func wakep() {
 4    if atomic.Load(&sched.npidle) == 0 {
 5       return
 6    }
 7    // be conservative about spinning threads
 8    // 如果有其他的M處於自旋狀態,那麼就不管了,直接返回,因為自旋的M會拼命找G來運行的
 9    if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
10       return
11    }
12    startm(nil, true)
13 }

startm先判斷是否有空閒的P,如果沒有則返回,如果有空閒的P,在嘗試看看有沒有空閒的M,有則喚醒該M來工作。如果沒有空閒M,則新建一個M,然後也進入喚醒操作。

 1 func startm(_p_ *p, spinning bool) {
 2    // 禁止搶佔,防止GC垃圾回收
 3    mp := acquirem()
 4    lock(&sched.lock)
 5    // 如果P為nil,則嘗試獲取一個空閒P
 6    if _p_ == nil {
 7       _p_ = pidleget()
 8       if _p_ == nil { // 沒有空閒的P,則解除禁止搶佔,直接返回
 9          unlock(&sched.lock)
10          if spinning {
11             if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
12                throw("startm: negative nmspinning")
13             }
14          }
15          releasem(mp)
16          return
17       }
18    }
19    // 獲取一個空閒的M
20    nmp := mget()
21    if nmp == nil {
22       // 如果沒有空閒的m則新建一個
23       id := mReserveID()
24       unlock(&sched.lock)
25 
26       var fn func()
27       if spinning {
28          // The caller incremented nmspinning, so set m.spinning in the new M.
29          fn = mspinning
30       }
31       newm(fn, _p_, id)
32       // Ownership transfer of _p_ committed by start in newm.
33       // Preemption is now safe.
34       releasem(mp)
35       return
36    }
37    unlock(&sched.lock)
38 ......................
39    //標記該M是否在自旋
40    nmp.spinning = spinning
41    // 暫存P
42    nmp.nextp.set(_p_)
43    // 喚醒M
44    notewakeup(&nmp.park)
45    // Ownership transfer of _p_ committed by wakeup. Preemption is now
46    // safe.
47    releasem(mp)
48 }

喚醒線程的底層操作同樣是依賴futex機制

1      //喚醒n個在uaddr指向的鎖變量上掛起等待的進程
2      int futex_wake(int *uaddr, int n);

4 啓動過程

4.1 Go scheduler結構

在runtime中全局變量sched代表全局調度器,數據結構為schedt結構體,保存了調度器的狀態信息、全局可運行G隊列

 1 type schedt struct {
 2    // 用來為goroutine生成唯一id,需要以原子訪問形式進行訪問
 3    // 放在struct頂部,以便在32位系統上可以進行對齊
 4    goidgen   uint64
 5 ...................
 6    lock mutex
 7     // 空閒的m組成的鏈表
 8    midle        muintptr // idle m's waiting for work
 9    // 空閒的工作線程數量
10    nmidle       int32    // number of idle m's waiting for work
11    // 空閒的且被lock的m的數量
12    nmidlelocked int32    // number of locked m's waiting for work
13    mnext        int64    // number of m's that have been created and next M ID
14    // 表示最多能夠創建的工作線程數量
15    maxmcount    int32    // maximum number of m's allowed (or die)
16    nmsys        int32    // number of system m's not counted for deadlock
17    nmfreed      int64    // cumulative number of freed m's
18     // 整個goroutine的數量,能夠自動保持更新
19    ngsys uint32 // number of system goroutines; updated atomically
20     // 由空閒的p結構體對象組成的鏈表
21    pidle      puintptr // idle p's
22    // 空閒的p結構體對象的數量
23    npidle     uint32
24    // 自旋的m的數量
25    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
26 
27    // Global runnable queue.
28    // 全局的的g的隊列
29    runq     gQueue
30    runqsize int32
31 
32    disable struct {
33       // user disables scheduling of user goroutines.
34       user     bool
35       runnable gQueue // pending runnable Gs
36       n        int32  // length of runnable
37    }
38 
39    // Global cache of dead G's.
40    // 空閒的g隊列,這裏面g的狀態為_Gdead
41    gFree struct {
42       lock    mutex
43       stack   gList // Gs with stacks
44       noStack gList // Gs without stacks
45       n       int32
46    }
47 .................
48     // 空閒的m隊列
49    freem *m
50 .....................
51     // 上次修改gomaxprocs的時間
52    procresizetime int64 // nanotime() of last change to gomaxprocs
53 ......................
54 }

這裏面大部分是記錄一些空閒的g、m、p等,在runtime2.go中還有很多相關的全局變量:

 1 // runtime/runtime2.go
 2 var (
 3     // 保存所有的m
 4    allm       *m
 5    // p的最大個數,默認等於cpu核數
 6    gomaxprocs int32
 7    // cpu的核數,程序啓動時會調用osinit獲取ncpu值
 8    ncpu       int32
 9    // 調度器結構體對象,記錄了調度器的工作狀態
10    sched      schedt
11    newprocs   int32
12 
13    allpLock mutex
14    // 全局的p隊列
15    allp []*p
16 )
17 
18 // runtime/proc.go
19 var (
20     // 代表進程主線程的m0對象
21    m0           m
22    // m0的g0
23    g0           g
24    // 全局的mcache對象,管理各種類型的span隊列
25    mcache0      *mcache
26    raceprocctx0 uintptr
27 )

4.2 啓動流程

調度器的初始化和啓動調度循環是在進程初始化是處理的,整個進程初始化流程如下:

0

Go 進程的啓動是通過彙編代碼進行的,入口函數在asm_amd64.s這個文件中的runtime.rt0_go部分代碼;

 1 // runtime·rt0_go
 2 
 3 // 程序剛啓動的時候必定有一個線程啓動(主線程)
 4 // 將當前的棧和資源保存在g0
 5 // 將該線程保存在m0
 6 // tls: Thread Local Storage
 7 // set the per-goroutine and per-mach "registers"
 8 get_tls(BX)
 9 LEAQ    runtime·g0(SB), CX
10 MOVQ    CX, g(BX)
11 LEAQ    runtime·m0(SB), AX
12 
13 // m0和g0互相綁定
14 // save m->g0 = g0
15 MOVQ    CX, m_g0(AX)
16 // save m0 to g0->m
17 MOVQ    AX, g_m(CX)
18 // 處理args
19 CALL    runtime·args(SB) 
20 // os初始化, os_linux.go
21 CALL    runtime·osinit(SB) 
22 // 調度系統初始化, proc.go
23 CALL    runtime·schedinit(SB) 
24 
25 // 創建一個goroutine,然後開啓執行程序
26 // create a new goroutine to start program
27 MOVQ    $runtime·mainPC(SB), AX        // entry
28 PUSHQ    AX
29 PUSHQ    $0            // arg size
30 CALL    runtime·newproc(SB)
31 POPQ    AX
32 POPQ    AX
33 
34 // start this M
35 // 啓動線程,並且啓動調度系統
36 CALL    runtime·mstart(SB)

可以看到通過彙編代碼:

1、將m0與g0互相綁定,然後調用runtime.osinit函數,這個函數不同的操作系統有不同的實現;

2、然後調用runtime.schedinit進行調度系統的初始化;

3、然後創建主協程;主goroutine創建完成後被加入到p的運行隊列中,等待調度;

4、在g0上啓動調用runtime.mstart啓動調度循環,首先可以被調度執行的就是主goroutine,然後主協程獲得運行的cpu則執行runtime.main進而執行到用户代碼的main函數。
 

初始化過程和堆棧圖可以參考下圖:

0

堆棧上,g0、m0、p0與主協程關係如圖所示:

0

4.3 調度器初始化

調度器初始化

 1 func schedinit() {
 2 ................
 3     // g0
 4    _g_ := getg()
 5    if raceenabled {
 6       _g_.racectx, raceprocctx0 = raceinit()
 7    }
 8     // 最多啓動10000個工作線程
 9    sched.maxmcount = 10000
10 
11    // The world starts stopped.
12    worldStopped()
13 
14    moduledataverify()
15    // 初始化協程堆棧,包括專門分配小棧的stackpool和分配大棧的stackLarge
16    stackinit()
17    // 整個堆內存的初始化分配
18    mallocinit()
19    fastrandinit() // must run before mcommoninit
20    // 初始化m0
21    mcommoninit(_g_.m, -1)
22    cpuinit()       // must run before alginit
23    alginit()       // maps must not be used before this call
24    modulesinit()   // provides activeModules
25    typelinksinit() // uses maps, activeModules
26    itabsinit()     // uses activeModules
27 
28    sigsave(&_g_.m.sigmask)
29    initSigmask = _g_.m.sigmask
30 
31    if offset := unsafe.Offsetof(sched.timeToRun); offset%8 != 0 {
32       println(offset)
33       throw("sched.timeToRun not aligned to 8 bytes")
34    }
35 
36    goargs()
37    goenvs()
38    parsedebugvars()
39    gcinit()
40    // 這部分是初始化p,
41    // cpu有多少個核數就初始化多少個p
42    lock(&sched.lock)
43    sched.lastpoll = uint64(nanotime())
44    procs := ncpu
45    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
46       procs = n
47    }
48    if procresize(procs) != nil {
49       throw("unknown runnable goroutine during bootstrap")
50    }
51    unlock(&sched.lock)
52 
53    // World is effectively started now, as P's can run.
54    worldStarted()
55 }

4.4 啓動調度系統

調度系統時在runtime.mstart0函數中啓動的。這個函數是在m0的g0上執行的。

 1 func mstart0() {
 2     // 這裏獲取的是g0在系統棧上執行
 3    _g_ := getg()
 4 .............
 5    mstart1()
 6 .............
 7 }
 8 
 9 func mstart1(dummy int32) {
10     _g_ := getg()
11     // 確保g是系統棧上的g0
12     // 調度器只在g0上執行
13     if _g_ != _g_.m.g0 {
14         throw("bad runtime·mstart")
15     }
16     ...
17     // 初始化m,主要是設置線程的備用信號堆棧和信號掩碼
18     minit()
19     // 如果當前g的m是初始m0,執行mstartm0()
20     if _g_.m == &m0 {
21         // 對於初始m,需要一些特殊處理,主要是設置系統信號量的處理函數
22         mstartm0()
23     }
24     // 如果有m的起始任務函數,則執行,比如 sysmon 函數
25     // 對於m0來説,是沒有 mstartfn 的
26     if fn := _g_.m.mstartfn; fn != nil {
27         fn()
28     }
29     if _g_.m.helpgc != 0 {
30         _g_.m.helpgc = 0
31         stopm()
32     } else if _g_.m != &m0 { // 如果不是m0,需要綁定p
33         // 綁定p
34         acquirep(_g_.m.nextp.ptr())
35         _g_.m.nextp = 0
36     }
37     // 進入調度循環,永不返回
38     schedule()
39 }

4.5 runtime.main函數

當經過初始的調度,主協程獲取執行權後,首先進入的就是runtime.main函數:

 1 // The main goroutine.
 2 func main() {
 3     // 獲取 main goroutine
 4     g := getg()
 5     ...
 6     // 在系統棧上運行 sysmon
 7     systemstack(func() {
 8         // 分配一個新的m,運行sysmon系統後台監控
 9         // (定期垃圾回收和調度搶佔)
10         newm(sysmon, nil)
11     })
12     ...
13     // 確保是主線程
14     if g.m != &m0 {
15         throw("runtime.main not on m0")
16     }
17     // runtime 內部 init 函數的執行,編譯器動態生成的。
18     runtime_init() // must be before defer
19     ...
20     // gc 啓動一個goroutine進行gc清掃
21     gcenable()
22     ...
23     // 執行init函數,編譯器動態生成的,
24     // 包括用户定義的所有的init函數。
25     // make an indirect call,
26     // as the linker doesn't know the address of
27     // the main package when laying down the runtime
28     fn := main_init 
29     fn()
30     ...
31     // 真正的執行main func in package main
32     // make an indirect call,
33     // as the linker doesn't know the address of
34     // the main package when laying down the runtime
35     fn = main_main 
36     fn()
37     ...
38     // 退出程序
39     exit(0)
40     // 為何這裏還需要for循環?
41     // 下面的for循環一定會導致程序崩掉,這樣就確保了程序一定會退出
42     for {
43         var x *int32
44         *x = 0
45     }
46 }

runtime.main函數中需要注意的是在系統棧上創建了一個新的m,來執行sysmon協程,這個協程是用來定期執行垃圾回收和調度搶佔。

其中可以看到首先獲取了main_init函數,來執行runtime包中的init函數,然後是獲取main_main來執行用户的main函數。

接着main函數執行完成後,調用exit讓主進程退出,如果進程沒有退出,就讓for循環一直訪問非法地址,讓操作系統把進程殺死,這樣來確保進程一定會退出。

 

5 協程的調度策略

調度循環啓動之後,便會進入一個無限循環中,不斷的執行schedule->execute->gogo->goroutine任務->goexit->goexit1->mcall->goexit0->schedule;

其中調度的過程是在m的g0上執行的,而goroutine任務->goexit->goexit1->mcall則是在goroutine的堆棧空間上執行的。

5.1 調度策略

其中schedule函數處理具體的調度策略;execute函數執行一些具體的狀態轉移、協程g與結構體m之間的綁定;gogo函數是與操作系統相關的函數,執行彙編代碼完成棧的切換以及CPU寄存器的恢復。

先來看下schedule的代碼:

 1 func schedule() {
 2     _g_ := getg()
 3     ...
 4 top:
 5     // 如果當前GC需要停止整個世界(STW), 則調用gcstopm休眠當前的M
 6     if sched.gcwaiting != 0 {
 7         // 為了STW,停止當前的M
 8         gcstopm()
 9         // STW結束後回到 top
10         goto top
11     }
12     ...
13     var gp *g
14     var inheritTime bool
15     ...
16     if gp == nil {
17         // 為了防止全局隊列中的g永遠得不到運行,所以go語言讓p每執行61調度,
18                 // 就需要優先從全局隊列中獲取一個G到當前P中,並執行下一個要執行的G
19         if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
20             lock(&sched.lock)
21             gp = globrunqget(_g_.m.p.ptr(), 1)
22             unlock(&sched.lock)
23         }
24     }
25     if gp == nil {
26         // 從p的本地隊列中獲取
27         gp, inheritTime = runqget(_g_.m.p.ptr())
28         if gp != nil && _g_.m.spinning {
29             throw("schedule: spinning with local work")
30         }
31     }
32     if gp == nil {
33         // 想盡辦法找到可運行的G,找不到就不用返回了
34         gp, inheritTime = findrunnable() // blocks until work is available
35     }
36     ...
37     // println("execute goroutine", gp.goid)
38     // 找到了g,那就執行g上的任務函數
39     execute(gp, inheritTime)
40 }

findrunnalbe中首先還是從本地隊列中檢查,然後從全局隊列中尋找,再從就緒的網絡協程,如果這幾個沒有就去其他p的本地隊列偷一些任務。

 1 func findrunnable() (gp *g, inheritTime bool) {
 2    _g_ := getg()
 3 
 4 top:
 5 ............................
 6    // 本地隊列中檢查
 7    if gp, inheritTime := runqget(_p_); gp != nil {
 8       return gp, inheritTime
 9    }
10    // 從全局隊列中尋找
11    if sched.runqsize != 0 {
12       lock(&sched.lock)
13       gp := globrunqget(_p_, 0)
14       unlock(&sched.lock)
15       if gp != nil {
16          return gp, false
17       }
18    }
19    // 從就緒的網絡協程中查找
20    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
21       if list := netpoll(0); !list.empty() { // non-blocking
22          gp := list.pop()
23          injectglist(&list)
24          casgstatus(gp, _Gwaiting, _Grunnable)
25          if trace.enabled {
26             traceGoUnpark(gp, 0)
27          }
28          return gp, false
29       }
30    }
31 
32     // 進入自旋狀態
33     procs := uint32(gomaxprocs)
34     if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
35        if !_g_.m.spinning {
36           _g_.m.spinning = true
37           atomic.Xadd(&sched.nmspinning, 1)
38        }
39         // 從其他p的本地隊列中偷工作
40        gp, inheritTime, tnow, w, newWork := stealWork(now)
41 ..............................
42     }
43 }

整個調度的優先級如下:

0

 

5.2 調度時機

5.1講了調度的策略,什麼時機發生調度呢,主要有三種方式,主動調度、被動調度、搶佔式調度。

5.2.1 主動調度

協程可以選擇主動讓渡自己的執行權利,大多數情況下不需要這麼做,但通過runtime.Goched可以做到主動讓渡。

 1 func Gosched() {
 2    checkTimeouts()
 3    mcall(gosched_m)
 4 }
 5 
 6 // Gosched continuation on g0.
 7 func gosched_m(gp *g) {
 8    if trace.enabled {
 9       traceGoSched()
10    }
11    goschedImpl(gp)
12 }
13 
14 func goschedImpl(gp *g) {
15    status := readgstatus(gp)
16    if status&^_Gscan != _Grunning {
17       dumpgstatus(gp)
18       throw("bad g status")
19    }
20    // 更改g的運行狀態
21    casgstatus(gp, _Grunning, _Grunnable)
22    // 接觸g和m的綁定關係
23    dropg()
24    // 將g放入全局隊列中
25    lock(&sched.lock)
26    globrunqput(gp)
27    unlock(&sched.lock)
28 
29    schedule()
30 }

5.2.2 被動調度

大部分情況下的調度都是被動調度,當協程在休眠、channel通道阻塞、網絡IO阻塞、執行垃圾回收時會暫停,被動調度可以保證最大化利用CPU資源。被動調度是協程發起的操作,所以調度時機相對明確。

首先從當前棧切換到g0協程,被動調度不會將G放入全局運行隊列,所以被動調度需要一個額外的喚醒機制。

這裏面涉及的函數主要是gopark和ready函數。

gopark函數用來完成被動調度,有_Grunning變為_Gwaiting狀態;

 1 func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
 2    if reason != waitReasonSleep {
 3       checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
 4    }
 5    // 禁止搶佔和GC
 6    mp := acquirem()
 7    gp := mp.curg // 過去當前m上運行的g
 8    status := readgstatus(gp)
 9    if status != _Grunning && status != _Gscanrunning {
10       throw("gopark: bad g status")
11    }
12    // 設置相關的wait字段
13    mp.waitlock = lock
14    mp.waitunlockf = unlockf
15    gp.waitreason = reason
16    mp.waittraceev = traceEv
17    mp.waittraceskip = traceskip
18    releasem(mp)
19    // can't do anything that might move the G between Ms here.
20    mcall(park_m)
21 }
22 
23 // park continuation on g0.
24 func park_m(gp *g) {
25    _g_ := getg()
26     // 變更g的狀態
27    casgstatus(gp, _Grunning, _Gwaiting)
28    // 接觸g與m的綁定關係
29    dropg()
30     // 根據被動調度不同原因,執行不同的waitunlockf函數
31    if fn := _g_.m.waitunlockf; fn != nil {
32       ok := fn(gp, _g_.m.waitlock)
33       _g_.m.waitunlockf = nil
34       _g_.m.waitlock = nil
35       if !ok {
36          if trace.enabled {
37             traceGoUnpark(gp, 2)
38          }
39          casgstatus(gp, _Gwaiting, _Grunnable)
40          execute(gp, true) // Schedule it back, never returns.
41       }
42    }
43    // 進入下一輪調度
44    schedule()
45 }

當協程要被喚醒時,會進入ready函數中,更改協程狀態由_Gwaiting變更為_Grunnable,放入本地運行隊列等待被調度。

 1 func ready(gp *g, traceskip int, next bool) {
 2 ..............
 3    status := readgstatus(gp)
 4 
 5    // Mark runnable.
 6    _g_ := getg()
 7    mp := acquirem() // disable preemption because it can be holding p in a local var
 8 ...............
 9    // 變更狀態之後,放入p的局部運行隊列中
10    casgstatus(gp, _Gwaiting, _Grunnable)
11    runqput(_g_.m.p.ptr(), gp, next)
12    wakep()
13    releasem(mp)
14 }

5.2.3 搶佔式調度

如果一個g運行時間過長就會導致其他g難以獲取運行機會,當進行系統調用時也存在會導致其他g無法運行情況;當出現這兩種情況時,為了讓其他g有運行機會,則會進行搶佔式調度。

是否進行搶佔式調度主要是在sysmon協程上判斷的。sysmon協程是一個不需要p的協程,它作用主要是運行後台監控,進行netpool(獲取fd事件)、retake(搶佔式調度)、forcegc(按時間強制執行GC)、scavenge heap(強制釋放閒置堆內存,減少內存佔用)
 

其中與搶佔式調度相關的就是retake函數,

這裏我們需要知道的是連續運行10ms則認為時間過長,進行搶佔

發生系統調用時,當前正在工作的線程會陷入等待狀態,等待內部完成系統調用並返回,所以也需要讓渡執行權給其他g,但這裏當只有滿足幾種情況才會進行調度:

1、如果p的運行隊列中有等待運行的g則搶佔

2、如果沒有空閒的p則進行搶佔

3、系統調用時間超過10ms則進行搶佔

 1 func retake(now int64) uint32 {
 2    n := 0
 3    lock(&allpLock)
 4    // 遍歷所有的P
 5    for i := 0; i < len(allp); i++ {
 6       _p_ := allp[i]
 7       if _p_ == nil {
 8          // This can happen if procresize has grown
 9          // allp but not yet created new Ps.
10          continue
11       }
12       pd := &_p_.sysmontick
13       s := _p_.status
14       sysretake := false
15       
16       if s == _Prunning || s == _Psyscall {
17          // 判斷如果g的運行時間過長則搶佔
18          t := int64(_p_.schedtick)
19          if int64(pd.schedtick) != t {
20             pd.schedtick = uint32(t)
21             pd.schedwhen = now
22          } else if pd.schedwhen+forcePreemptNS <= now {
23              // 如果連續運行10ms則進行搶佔
24             preemptone(_p_)
25             sysretake = true
26          }
27       }
28       // 針對系統調用情況進行搶佔
29       // 如果p的運行隊列中有等待運行的g則搶佔
30       // 如果沒有空閒的p則進行搶佔
31       // 系統調用時間超過10ms則進行搶佔
32       if s == _Psyscall {
33          // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
34          t := int64(_p_.syscalltick)
35          if !sysretake && int64(pd.syscalltick) != t {
36             pd.syscalltick = uint32(t)
37             pd.syscallwhen = now
38             continue
39          }
40          // On the one hand we don't want to retake Ps if there is no other work to do,
41          // but on the other hand we want to retake them eventually
42          // because they can prevent the sysmon thread from deep sleep.
43          if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
44             continue
45          }
46          // Drop allpLock so we can take sched.lock.
47          unlock(&allpLock)
48          // Need to decrement number of idle locked M's
49          // (pretending that one more is running) before the CAS.
50          // Otherwise the M from which we retake can exit the syscall,
51          // increment nmidle and report deadlock.
52          incidlelocked(-1)
53          if atomic.Cas(&_p_.status, s, _Pidle) {
54             if trace.enabled {
55                traceGoSysBlock(_p_)
56                traceProcStop(_p_)
57             }
58             n++
59             _p_.syscalltick++
60             handoffp(_p_)
61          }
62          incidlelocked(1)
63          lock(&allpLock)
64       }
65    }
66    unlock(&allpLock)
67    return uint32(n)
68 }

在進行搶佔式調度,Go還涉及到利用操作系統信號方式來進行搶佔,這裏就不在介紹,感興趣可以自己去研究。

 

另外,本文圖片沒有一張原創,畫圖非常耗費時間,我沒有那麼多時間,所以只能到處借用大家的圖片,侵權請聯繫。

6 參考資料

  • Go語言的調度模型:https://www.cnblogs.com/lvpen...
  • 深度揭秘Go語言 sync.Pool:https://www.cnblogs.com/qcrao...
  • https://toutiao.io/posts/2gic...
  • 萬字長文深入淺出go runtime:https://zhuanlan.zhihu.com/p/...
  • 深入go runtime的調度:https://zboya.github.io/post/...
  • 字節Go面試:https://leetcode-cn.com/circl...
  • golang調度學習-調度過程:https://blog.csdn.net/diaosss...
  • Go調度器中的三種結構:G、P、M:https://blog.csdn.net/chenxun...
  • Go語言的調度模型:https://www.cnblogs.com/lvpen...
  • Go GMP的調度模型:https://zhuanlan.zhihu.com/p/...
  • 詳細剖析Go語言調度模型的設計:https://www.elecfans.com/d/16...
  • Go的核心goroutine sysmon:https://blog.csdn.net/RA681t5...
  • 一文教你搞懂Go中棧操作:https://zhuanlan.zhihu.com/p/...
  • 詳解Go語言調度循環代碼實現:https://www.luozhiyun.com/arc...
  • goroutine的創建、休眠與恢復:https://zhuanlan.zhihu.com/p/...
user avatar wbccb 頭像
1 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.