轉載自:go channel原理及使用場景
源碼解析
type hchan struct {
qcount uint // Channel 中的元素個數
dataqsiz uint // Channel 中的循環隊列的長度
buf unsafe.Pointer // Channel 的緩衝區數據指針
elemsize uint16 // 當前 Channel 能夠收發的元素大小
closed uint32
elemtype *_type // 當前 Channel 能夠收發的元素類型
sendx uint // Channel 的發送操作處理到的位置
recvx uint // Channel 的接收操作處理到的位置
recvq waitq // 當前 Channel 由於緩衝區空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
sendq waitq // 當前 Channel 由於緩衝區空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
創建channel
channel的初始化有2種,一種是沒有緩衝區的channel,一種是有緩衝區的channel。對應的初始化之後hchan也是有區別的。
無緩衝區的channel,初始化的時候只為channel分配內存,緩衝區dataqsiz的長度為0
有緩衝的channel,初始化時會為channel和緩衝區分配內存,dataqsiz長度大於0
同時channel的元素大小和緩衝區的長度都是有大小限制的
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 如果內存超了,或者分配的內存大於channel最大分配內存,或者分配的size小於0,直接Panic
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 如果沒有緩衝區,分配一段內存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 有緩衝時,如果元素不包含指針類型,會為當前的 Channel 和底層的數組分配一塊連續的內存空間
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 有緩衝區,且元素包含指針類型,channel和buf數組各自分配內存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小,元素類型,循環數組長度,更新到channel
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
發送數據(ch <- i)
-
發送數據前會加鎖,防止多個線程併發修改數據。如果channel已經關閉,直接Panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } -
當存在等待的接收者時,通過
runtime.send直接將數據發送給阻塞的接收者當channel的recvq隊列不為空,而且channel是沒有數據數據寫入的。這個時候如果有數據寫入,會直接把數據拷貝到接收者變量所在的內存地址上。即使這是一個有緩衝的channel,當有等待的接收者時,也是直接給接收者,不會先保存到循環隊列
// 如果目標 Channel 沒有被關閉並且已經有處於讀等待的 Goroutine,那麼 runtime.chansend 會從接收隊列 recvq 中取出最先陷入等待的 Goroutine 並直接向它發送數據 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { // 調用 runtime.sendDirect 將發送的數據直接拷貝到 x = <-c 表達式中變量 x 所在的內存地址上 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) // 調用 runtime.goready 將等待接收數據的 Goroutine 標記成可運行狀態 Grunnable 並把該 Goroutine 放到發送方所在的處理器的 runnext 上等待執行,該處理器在下一次調度時會立刻喚醒數據的接收方; // 需要注意的是,發送數據的過程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執行該 Goroutine goready(gp, skip+1) } -
當緩衝區存在空餘空間時,將發送的數據寫入 Channel 的緩衝區
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 如果當前元素數小於循環隊列的長度 if c.qcount < c.dataqsiz { // 使用 runtime.chanbuf 計算出下一個可以存儲數據的位置 qp := chanbuf(c, c.sendx) // 將發送的數據拷貝到緩衝區中 typedmemmove(c.elemtype, qp, ep) // 發送的位置索引+1 c.sendx++ // 如果循環隊列滿了就從0開始 // 因為這裏的 buf 是一個循環數組,所以當 sendx 等於 dataqsiz 時會重新回到數組開始的位置 if c.sendx == c.dataqsiz { c.sendx = 0 } // 增加當前元素數 c.qcount++ unlock(&c.lock) return true } ... } -
當不存在緩衝區或者緩衝區已滿時,等待其他 Goroutine 從 Channel 接收數據
當因為不存在緩衝區或者緩衝區已滿無法寫入時,會構造sudog等待執行的gorutine結構,放到hchan的等待隊列中,直到被喚醒,把數據放到緩衝區或者直接拷貝給接收者
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 使用 select 關鍵字可以向 Channel 非阻塞地發送消息 if !block { unlock(&c.lock) return false } // 獲取發送數據使用的 Goroutine gp := getg() // 獲取 runtime.sudog 結構 mysg := acquireSudog() // 設置待發送數據的內存地址 mysg.elem = ep // 設置發送數據的goroutine mysg.g = gp mysg.isSelect = false // 設置發送的channel mysg.c = c // 設置到goroutine的waiting上 gp.waiting = mysg // 加入到發送等待隊列 c.sendq.enqueue(mysg) // 阻塞等待喚醒 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
接收數據(<- ch)
-
從一個空 Channel 接收數據
goroutine會讓出使用權,並阻塞等待
if c == nil { if !block { return } // 讓出使用權 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 不獲取鎖的情況下,檢查失敗的非阻塞操作 if !block && empty(c) { // 顯示未關閉,繼續返回false,因為channel不會重新打開 if atomic.Load(&c.closed) == 0 { return } if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } // Channel 已經被關閉並且緩衝區中不存在任何數據,那麼會清除 ep 指針中的數據並立刻返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) // Channel 已經被關閉並且緩衝區中不存在任何數據,那麼會清除 ep 指針中的數據並立刻返回 if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } -
當存在等待的發送者時,通過
runtime.recv從阻塞的發送者或者緩衝區中獲取數據如果是無緩衝的channel,當有接收者進來時,會直接從阻塞的發送者拷貝數據
如果是有緩衝的channel,當有接收者進來時,會先從緩衝區拿數據,接着等待的發送者會把數據拷貝到緩衝區
注意這個時候並沒有直接去喚醒發送者,而是放到下次p的執行隊列中中,下次調度時會喚醒發送者,發送者會做一些釋放資源的操作
if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // 如果無緩存,直接從發送者拷貝數據 recvDirect(c.elemtype, sg, ep) } } else { // 由於隊列已滿,接收數據的索引和發送數據的索引一致 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 數據從隊列拷貝到目標內存地址 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 數據從發送者拷貝到緩衝區 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 無論發生哪種情況,運行時都會調用 runtime.goready 將當前處理器的 runnext 設置成發送數據的 Goroutine,在調度器下一次調度時將阻塞的發送方喚醒。 goready(gp, skip+1) } -
當緩衝區存在數據時,從 Channel 的緩衝區中接收數據
if c.qcount > 0 { // 直接從隊列取數據 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } // 放到目標內存 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清空隊列中對應的元素 typedmemclr(c.elemtype, qp) // 接收索引+1 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 隊列元素-1 c.qcount-- unlock(&c.lock) return true, true } -
當緩衝區中不存在數據時,等待其他 Goroutine 向 Channel 發送數據
if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 阻塞等待,讓出使用權 atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 喚醒之後清空sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success
關閉channel
-
當 Channel 是一個空指針或者已經被關閉時,Go 語言運行時都會直接崩潰並拋出異常
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } -
將
recvq和sendq兩個隊列中的數據加入到 Goroutine 列表gList中,與此同時該函數會清除所有runtime.sudog上未被處理的元素c.closed = 1 var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // 為所有被阻塞的 Goroutine 調用 runtime.goready 觸發調度。 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }
使用場景
報錯情形
- 往一個關閉的channel發送數據會報錯:panic: send on closed channel
- 關閉一個nil的chan會報錯:panic: close of nil channel
- 關閉一個已經關閉的channel報錯:panic: close of closed channel
1、一個經典的算法題
有4個goroutine,編號為1、2、3、4。每秒鐘會有一個goroutine打印出自己的編號,要求寫一個程序,讓輸出的編號總是按照1、2、3、4、1、2、3、4...的順序打印出來
package main
import (
"fmt"
"time"
)
func main() {
// 4個channel
chs := make([]chan int, 4)
for i, _ := range chs {
chs[i] = make(chan int)
// 開4個協程
go func(i int) {
for {
// 獲取當前channel值並打印
v := <-chs[i]
fmt.Println(v + 1)
time.Sleep(time.Second)
// 把下一個值寫入下一個channel,等待下一次消費
chs[(i+1)%4] <- (v + 1) % 4
}
}(i)
}
// 往第一個塞入0
chs[0] <- 0
select {}
}
2、限流器
package main
import (
"fmt"
"time"
)
func main() {
// 每次處理3個請求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
go func(i int) {
fmt.Println("下游服務處理邏輯...", i)
time.Sleep(time.Second * 3)
<-chLimit
}(i)
}
time.Sleep(30 * time.Second)
}
如果覺得sleep太醜太暴力,可以用waitGroup控制結束時機
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
// 每次處理3個請求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
wg.Add(1)
go func(i int) {
fmt.Println("下游服務處理邏輯...", i)
time.Sleep(time.Second * 3)
<-chLimit
wg.Done()
}(i)
}
wg.Wait()
}
3、優雅退出
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
for {
select {
case <-closing:
return
default:
fmt.Println("業務邏輯...")
time.Sleep(1 * time.Second)
}
}
}()
termChan := make(chan os.Signal)
// 監聽退出信號
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
// 退出中
close(closing)
// 退出之前清理一下
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
log.Println("清理超時不等了")
}
log.Println("優雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep(time.Minute)
// 清理完後退出
close(closed)
}
4、實現互斥鎖
初始化一個緩衝區為1的channel,放入元素代表一把鎖,誰獲取到這個元素就代表獲取了這把鎖,釋放鎖的時候再把這個元素放回channel
package main
import (
"log"
"time"
)
type Mutex struct {
ch chan struct{}
}
// 初始化鎖
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 加鎖,阻塞獲取
func (m *Mutex) Lock() {
<- m.ch
}
// 釋放鎖
func (m *Mutex) Unlock() {
select {
// 成功寫入channel代表釋放成功
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 嘗試獲取鎖
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
// 成功獲取鎖關閉定時器
timer.Stop()
return true
case <-timer.C:
}
// 獲取鎖超時
return false
}
// 是否上鎖
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
log.Printf("locked v %v\n", ok)
ok = m.TryLock()
log.Printf("locked v %v\n", ok)
go func() {
time.Sleep(5*time.Second)
m.Unlock()
}()
ok = m.LockTimeout(10*time.Second)
log.Printf("LockTimeout v %v\n", ok)
}
參考:
極刻時間《go 併發編程實戰》