博客 / 詳情

返回

腦抽研究生Go併發-2-Channel-基本用法、實現原理、消息交流、數據傳遞、信號通知、任務編排等

Channel

圖片

CSP(Communicating Sequential Process):CSP 允許使用進程組件來描述系統,它們獨立運行,並且只通過消息傳遞的方式通信

Channel 類型是 Go 語言內置的類型,可以直接使用

Don’t communicate by sharing memory, share memory by communicating. --Go Proverbs by Rob Pike

Channel用法:

  • 數據交流:當作併發的 buffer 或者 queue,解決生產者 - 消費者問題。多個 goroutine 可以併發當作生產者(Producer)和消費者(Consumer)。
  • 數據傳遞:一個 goroutine 將數據交給另一個 goroutine,相當於把數據的擁有權 (引用) 託付出去。
  • 信號通知:一個 goroutine 可以將信號 (closing、closed、data ready 等) 傳遞給另一個或者另一組 goroutine 。
  • 任務編排:可以讓一組 goroutine 按照一定的順序併發或者串行的執行,這就是編排的功能。
  • 鎖:利用 Channel 也可以實現互斥鎖的機制。
chan string          // 可以發送接收string
chan<- struct{}      // 只能發送struct{}
<-chan int           // 只能從chan接收int
ch <- 2000 // 發送數據
x := <-ch // 把接收的一條數據賦值給變量x foo(<-ch) // 把接收的一個的數據作為參數傳給函數 <-ch // 丟棄接收的一條數據

close:關閉 chan 關閉掉

cap:返回 chan 的容量

chan 還可以應用於 for-range 語句中:

    for v := range ch {
        fmt.Println(v)
    }

使用 Channel 容易犯的錯誤

圖片

panic

  • close 為 nil 的 chan;
  • send 已經 close 的 chan;
  • close 已經 close 的 chan。

goroutine 泄漏

unbuffered chan(初始化時不指定容量)Writer和reader,這兩個事件必須同時發生。如果一個先發生,它所在的 goroutine 就會被 Go 的調度器阻塞

併發原語和Channel的選擇方法

  1. 共享資源的併發訪問使用傳統併發原語;
  2. 複雜的任務編排和消息傳遞使用 Channel;
  3. 消息通知機制使用 Channel,除非只想 signal 一個 goroutine,才使用 Cond;
  4. 簡單等待所有任務的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 語句結合,使用 Channel;
  6. 需要和超時配合時,使用 Channel 和 Context。

使用反射操作 Channel

通過 reflect.Select 函數,可以將一組運行時的 case clause 傳入,當作參數執行,無需寫100個Case,可以動態創建case

func main() {
    var ch1 = make(chan int, 10)
    var ch2 = make(chan int, 10)

    // 創建SelectCase
    var cases = createCases(ch1, ch2)

    // 執行10次select
    for i := 0; i < 10; i++ {
        chosen, recv, ok := reflect.Select(cases)
        if recv.IsValid() { // recv case
            fmt.Println("recv:", cases[chosen].Dir, recv, ok)
        } else { // send case
            fmt.Println("send:", cases[chosen].Dir, ok)
        }
    }
}

func createCases(chs ...chan int) []reflect.SelectCase {
    var cases []reflect.SelectCase


    // 創建recv case
    for _, ch := range chs {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }

    // 創建send case
    for i, ch := range chs {
        v := reflect.ValueOf(i)
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectSend,
            Chan: reflect.ValueOf(ch),
            Send: v,
        })
    }

    return cases
}

典型的應用場景

消息交流

chan的內部實現是循環隊列,所以有時會被當成線程安全的隊列和 buffer 使用

例子:

  • 通過chan Job實現的worker 池
  • etcd 中的 node節點

數據傳遞

令牌(token)傳遞

type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
    for {
        token := <-ch         // 取得令牌
        fmt.Println((id + 1)) // id從1開始
        time.Sleep(time.Second)
        nextCh <- token
    }
}
func main() {
    chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}

    // 創建4個worker
    for i := 0; i < 4; i++ {
        go newWorker(i, chs[i], chs[(i+1)%4])
    }

    //首先把令牌交給第一個worker
    chs[0] <- struct{}{}
  
    select {}
}

信號通知

使用 chan 實現程序的 graceful shutdown,在退出之前執行一些連接關閉、文件 close、緩存落盤等一些動作

func main() {
    var closing = make(chan struct{})
    var closed = make(chan struct{})

    go func() {
        // 模擬業務處理
        for {
            select {
            case <-closing:
                return
            default:
                // ....... 業務計算
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    // 處理CTRL+C等中斷信號
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan

    close(closing)
    // 執行退出之前的清理動作
    go doCleanup(closed)

    select {
    case <-closed:
    case <-time.After(time.Second):
        fmt.Println("清理超時,不等了")
    }
    fmt.Println("優雅退出")
}

func doCleanup(closed chan struct{}) {
    time.Sleep((time.Minute))
    close(closed)
}

互斥鎖,利用 select+chan 的方式,很容易實現 TryLock、Timeout 的功能

任務編排

Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce

Or-Done 模式

如果有多個任務,只要有任意一個任務執行完,我們就想獲得這個信號

扇入模式

多個源 Channel 輸入、一個目的 Channel 輸出

扇出模式

扇出模式只有一個輸入源 Channel,有多個目標 Channel

Stream

流式管道,提供跳過幾個元素,或者是隻取其中的幾個元素等方法

map-reduce

第一步是映射(map),處理隊列中的數據,第二步是規約(reduce),把列表中的每一個元素按照一定的處理方式處理成結果,放入到結果隊列中

內存模型

圖片

多線程同時訪問同一個變量的可見性和順序

happens-before

👍在一個 goroutine 內部,程序的執行順序和它們的代碼指定的順序是一樣的,即使編譯器或者 CPU 重排了讀寫順序,從行為上來看,也和代碼指定的順序一樣

😡Go 只保證 goroutine 內部重排對讀寫的順序沒有影響,如果要保證多個 goroutine 之間對一個共享變量的讀寫順序,在 Go 語言中,可以使用併發原語為讀寫操作建立 happens-before 關係

  1. 在 Go 語言中,對變量進行零值的初始化就是一個寫操作。
  2. 如果對超過機器 word(64bit、32bit 或者其它)大小的值進行讀寫,那麼,就可以看作是對拆成 word 大小的幾個讀寫無序進行。
  3. Go 並不提供直接的 CPU 屏障(CPU fence)來提示編譯器或者 CPU 保證順序性,而是使用不同架構的內存屏障指令來實現統一的併發原語。

Go 語言中保證的 happens-before 關係

init 函數

main 函數一定在導入的包的 init 函數之後執行

goroutine

啓動 goroutine 的 go 語句的執行,一定 happens before 此 goroutine 內的代碼執行

Channel
  • 第 1 條規則是,往 Channel 中的發送操作,happens before 從該 Channel 接收相應數據的動作完成之前,即第 n 個 send 一定 happens before 第 n 個 receive 的完成。
  • 第 2 條規則是,close 一個 Channel 的調用,肯定 happens before 從關閉的 Channel 中讀取出一個零值。
  • 第 3 條規則是,對於 unbuffered 的 Channel,也就是容量是 0 的 Channel,從此 Channel 中讀取數據的調用一定 happens before 往此 Channel 發送數據的調用完成。
  • 第 4 條規則是,如果 Channel 的容量是 m(m>0),那麼,第 n 個 receive 一定 happens before 第 n+m 個 send 的完成。

Mutex/RWMutex

  1. 第 n 次的 m.Unlock 一定 happens before 第 n+1 m.Lock 方法的返回;
  2. 對於讀寫鎖 RWMutex m,如果它的第 n 個 m.Lock 方法的調用已返回,那麼它的第 n 個 m.Unlock 的方法調用一定 happens before 任何一個 m.RLock 方法調用的返回,只要這些 m.RLock 方法調用 happens after 第 n 次 m.Lock 的調用的返回。這就可以保證,只有釋放了持有的寫鎖,那些等待的讀請求才能請求到讀鎖。
  3. 對於讀寫鎖 RWMutex m,如果它的第 n 個 m.RLock 方法的調用已返回,那麼它的第 k (k<=n)個成功的 m.RUnlock 方法的返回一定 happens before 任意的 m.RUnlockLock 方法調用,只要這些 m.Lock 方法調用 happens after 第 n 次 m.RLock。

WaitGroup

Wait 方法等到計數值歸零之後才返回

Once

對於 once.Do(f) 調用,f 函數的那個單次調用一定 happens before 任何 once.Do(f) 調用的返回

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.