博客 / 詳情

返回

go channel原理及使用場景

轉載自:go channel原理及使用場景

源碼解析

type hchan struct {
    qcount   uint           // Channel 中的元素個數
    dataqsiz uint           // Channel 中的循環隊列的長度
    buf      unsafe.Pointer // Channel 的緩衝區數據指針
    elemsize uint16 // 當前 Channel 能夠收發的元素大小
    closed   uint32
    elemtype *_type // 當前 Channel 能夠收發的元素類型
    sendx    uint   // Channel 的發送操作處理到的位置
    recvx    uint   // Channel 的接收操作處理到的位置
  recvq    waitq  // 當前 Channel 由於緩衝區空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)
    sendq    waitq  // 當前 Channel 由於緩衝區空間不足而阻塞的 Goroutine 列表,雙向鏈表(sugog)

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662024619.png

創建channel

channel的初始化有2種,一種是沒有緩衝區的channel,一種是有緩衝區的channel。對應的初始化之後hchan也是有區別的。

無緩衝區的channel,初始化的時候只為channel分配內存,緩衝區dataqsiz的長度為0

有緩衝的channel,初始化時會為channel和緩衝區分配內存,dataqsiz長度大於0

同時channel的元素大小和緩衝區的長度都是有大小限制的

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

  // 如果內存超了,或者分配的內存大於channel最大分配內存,或者分配的size小於0,直接Panic
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 如果沒有緩衝區,分配一段內存
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 有緩衝時,如果元素不包含指針類型,會為當前的 Channel 和底層的數組分配一塊連續的內存空間
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 有緩衝區,且元素包含指針類型,channel和buf數組各自分配內存
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 元素大小,元素類型,循環數組長度,更新到channel
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

發送數據(ch <- i)

  • 發送數據前會加鎖,防止多個線程併發修改數據。如果channel已經關閉,直接Panic

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        lock(&c.lock)
    
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
  • 當存在等待的接收者時,通過 runtime.send 直接將數據發送給阻塞的接收者

    當channel的recvq隊列不為空,而且channel是沒有數據數據寫入的。這個時候如果有數據寫入,會直接把數據拷貝到接收者變量所在的內存地址上。即使這是一個有緩衝的channel,當有等待的接收者時,也是直接給接收者,不會先保存到循環隊列

    // 如果目標 Channel 沒有被關閉並且已經有處於讀等待的 Goroutine,那麼 runtime.chansend 會從接收隊列 recvq 中取出最先陷入等待的 Goroutine 並直接向它發送數據
    if sg := c.recvq.dequeue(); sg != nil {
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
    
    // 
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if sg.elem != nil {
        // 調用 runtime.sendDirect 將發送的數據直接拷貝到 x = <-c 表達式中變量 x 所在的內存地址上
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
      // 調用 runtime.goready 將等待接收數據的 Goroutine 標記成可運行狀態 Grunnable 並把該 Goroutine 放到發送方所在的處理器的 runnext 上等待執行,該處理器在下一次調度時會立刻喚醒數據的接收方;
      // 需要注意的是,發送數據的過程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執行該 Goroutine
        goready(gp, skip+1)
    }
  • 當緩衝區存在空餘空間時,將發送的數據寫入 Channel 的緩衝區

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
      // 如果當前元素數小於循環隊列的長度
        if c.qcount < c.dataqsiz {
        // 使用 runtime.chanbuf 計算出下一個可以存儲數據的位置
            qp := chanbuf(c, c.sendx)
        // 將發送的數據拷貝到緩衝區中
            typedmemmove(c.elemtype, qp, ep)
        // 發送的位置索引+1
            c.sendx++
        // 如果循環隊列滿了就從0開始
        // 因為這裏的 buf 是一個循環數組,所以當 sendx 等於 dataqsiz 時會重新回到數組開始的位置
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
        // 增加當前元素數
            c.qcount++
            unlock(&c.lock)
            return true
        }
        ...
    }
  • 當不存在緩衝區或者緩衝區已滿時,等待其他 Goroutine 從 Channel 接收數據

    當因為不存在緩衝區或者緩衝區已滿無法寫入時,會構造sudog等待執行的gorutine結構,放到hchan的等待隊列中,直到被喚醒,把數據放到緩衝區或者直接拷貝給接收者

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
      // 使用 select 關鍵字可以向 Channel 非阻塞地發送消息
        if !block {
            unlock(&c.lock)
            return false
        }
    
      // 獲取發送數據使用的 Goroutine
        gp := getg()
      //  獲取 runtime.sudog 結構
        mysg := acquireSudog()
      // 設置待發送數據的內存地址
        mysg.elem = ep
      // 設置發送數據的goroutine
        mysg.g = gp
      mysg.isSelect = false
      // 設置發送的channel
        mysg.c = c
      // 設置到goroutine的waiting上
        gp.waiting = mysg
      // 加入到發送等待隊列
        c.sendq.enqueue(mysg)
      // 阻塞等待喚醒
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
        KeepAlive(ep)
    
        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        closed := !mysg.success
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        if closed {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        return true
    }

接收數據(<- ch)

  • 從一個空 Channel 接收數據

    goroutine會讓出使用權,並阻塞等待

        if c == nil {
            if !block {
                return
            }
        // 讓出使用權
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    
        // 不獲取鎖的情況下,檢查失敗的非阻塞操作
        if !block && empty(c) {
            // 顯示未關閉,繼續返回false,因為channel不會重新打開
            if atomic.Load(&c.closed) == 0 {
                return
            }
    
            if empty(c) {
                // The channel is irreversibly closed and empty.
                if raceenabled {
                    raceacquire(c.raceaddr())
                }
          // Channel 已經被關閉並且緩衝區中不存在任何數據,那麼會清除 ep 指針中的數據並立刻返回
                if ep != nil {
                    typedmemclr(c.elemtype, ep)
                }
                return true, false
            }
        }
    
        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }
    
        lock(&c.lock)
    
        if c.closed != 0 && c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
        // Channel 已經被關閉並且緩衝區中不存在任何數據,那麼會清除 ep 指針中的數據並立刻返回
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  • 當存在等待的發送者時,通過 runtime.recv 從阻塞的發送者或者緩衝區中獲取數據

    如果是無緩衝的channel,當有接收者進來時,會直接從阻塞的發送者拷貝數據

    如果是有緩衝的channel,當有接收者進來時,會先從緩衝區拿數據,接着等待的發送者會把數據拷貝到緩衝區

    注意這個時候並沒有直接去喚醒發送者,而是放到下次p的執行隊列中中,下次調度時會喚醒發送者,發送者會做一些釋放資源的操作

    if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    
    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if c.dataqsiz == 0 {
            if raceenabled {
                racesync(c, sg)
            }
            if ep != nil {
                // 如果無緩存,直接從發送者拷貝數據
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // 由於隊列已滿,接收數據的索引和發送數據的索引一致
        qp := chanbuf(c, c.recvx)
            if raceenabled {
                racenotify(c, c.recvx, nil)
                racenotify(c, c.recvx, sg)
            }
            // 數據從隊列拷貝到目標內存地址
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // 數據從發送者拷貝到緩衝區
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        sg.elem = nil
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
      // 無論發生哪種情況,運行時都會調用 runtime.goready 將當前處理器的 runnext 設置成發送數據的 Goroutine,在調度器下一次調度時將阻塞的發送方喚醒。
        goready(gp, skip+1)
    }
  • 當緩衝區存在數據時,從 Channel 的緩衝區中接收數據

    if c.qcount > 0 {
            // 直接從隊列取數據
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                racenotify(c, c.recvx, nil)
            }
          // 放到目標內存
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
          // 清空隊列中對應的元素
            typedmemclr(c.elemtype, qp)
          // 接收索引+1
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
          // 隊列元素-1
            c.qcount--
            unlock(&c.lock)
            return true, true
        }
  • 當緩衝區中不存在數據時,等待其他 Goroutine 向 Channel 發送數據

    if !block {
            unlock(&c.lock)
            return false, false
        }
    
        // no sender available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg)
        // 阻塞等待,讓出使用權
        atomic.Store8(&gp.parkingOnChan, 1)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
        // 喚醒之後清空sudog
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        success := mysg.success
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, success

關閉channel

  • 當 Channel 是一個空指針或者已經被關閉時,Go 語言運行時都會直接崩潰並拋出異常

    func closechan(c *hchan) {
        if c == nil {
            panic(plainError("close of nil channel"))
        }
    
        lock(&c.lock)
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
  • recvqsendq 兩個隊列中的數據加入到 Goroutine 列表 gList 中,與此同時該函數會清除所有 runtime.sudog 上未被處理的元素

    c.closed = 1
    
        var glist gList
    
        // release all readers
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = unsafe.Pointer(sg)
            sg.success = false
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
    
        // release all writers (they will panic)
        for {
            sg := c.sendq.dequeue()
            if sg == nil {
                break
            }
            sg.elem = nil
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = unsafe.Pointer(sg)
            sg.success = false
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
        unlock(&c.lock)
    
        // 為所有被阻塞的 Goroutine 調用 runtime.goready 觸發調度。
        for !glist.empty() {
            gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
        }

使用場景

報錯情形

  • 往一個關閉的channel發送數據會報錯:panic: send on closed channel
  • 關閉一個nil的chan會報錯:panic: close of nil channel
  • 關閉一個已經關閉的channel報錯:panic: close of closed channel

1、一個經典的算法題

有4個goroutine,編號為1、2、3、4。每秒鐘會有一個goroutine打印出自己的編號,要求寫一個程序,讓輸出的編號總是按照1、2、3、4、1、2、3、4...的順序打印出來

package main

import (
    "fmt"
    "time"
)

func main() {
  // 4個channel
    chs := make([]chan int, 4)
    for i, _ := range chs {
        chs[i] = make(chan int)
    // 開4個協程
        go func(i int) {
            for {
        // 獲取當前channel值並打印
                v := <-chs[i]
                fmt.Println(v + 1)
                time.Sleep(time.Second)
        // 把下一個值寫入下一個channel,等待下一次消費
                chs[(i+1)%4] <- (v + 1) % 4
            }

        }(i)
    }

  // 往第一個塞入0
    chs[0] <- 0
    select {}
}

2、限流器

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每次處理3個請求
    chLimit := make(chan struct{}, 3)
    for i := 0; i < 20; i++ {
        chLimit <- struct{}{}
        go func(i int) {
            fmt.Println("下游服務處理邏輯...", i)
            time.Sleep(time.Second * 3)
            <-chLimit
        }(i)
    }
    time.Sleep(30 * time.Second)
}

如果覺得sleep太醜太暴力,可以用waitGroup控制結束時機

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    // 每次處理3個請求
    chLimit := make(chan struct{}, 3)
    for i := 0; i < 20; i++ {
        chLimit <- struct{}{}
        wg.Add(1)
        go func(i int) {
            fmt.Println("下游服務處理邏輯...", i)
            time.Sleep(time.Second * 3)
            <-chLimit
            wg.Done()
        }(i)
    }
    wg.Wait()
}

3、優雅退出

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    var closing = make(chan struct{})
    var closed = make(chan struct{})

    go func() {
        for {
            select {
            case <-closing:
                return
            default:
                fmt.Println("業務邏輯...")
                time.Sleep(1 * time.Second)
            }
        }
    }()

    termChan := make(chan os.Signal)
  // 監聽退出信號
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan

  // 退出中
    close(closing)

    // 退出之前清理一下
    go doCleanup(closed)

    select {
    case <-closed:
    case <-time.After(time.Second):
        log.Println("清理超時不等了")
    }

    log.Println("優雅退出")
}

func doCleanup(closed chan struct{}) {
    time.Sleep(time.Minute)
  // 清理完後退出
    close(closed)
}

4、實現互斥鎖

初始化一個緩衝區為1的channel,放入元素代表一把鎖,誰獲取到這個元素就代表獲取了這把鎖,釋放鎖的時候再把這個元素放回channel

package main

import (
    "log"
    "time"
)

type Mutex struct {
    ch chan struct{}
}

// 初始化鎖
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 加鎖,阻塞獲取
func (m *Mutex) Lock()  {
    <- m.ch
}

// 釋放鎖
func (m *Mutex) Unlock()  {
    select {
    // 成功寫入channel代表釋放成功
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 嘗試獲取鎖
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:

    }
    return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)

    select {
    case <-m.ch:
    // 成功獲取鎖關閉定時器
        timer.Stop()
        return true
    case <-timer.C:

    }
  // 獲取鎖超時
    return false
}

// 是否上鎖
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}


func main()  {
    m := NewMutex()
    ok := m.TryLock()
    log.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    log.Printf("locked v %v\n", ok)

    go func() {
        time.Sleep(5*time.Second)
        m.Unlock()
    }()

    ok = m.LockTimeout(10*time.Second)
    log.Printf("LockTimeout v %v\n", ok)
}

參考:

極刻時間《go 併發編程實戰》

user avatar cyningsun 頭像 tekin_cn 頭像
2 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.