Stories

Detail Return Return

腦抽研究生Go併發-1-基本併發原語-下-Cond、Once、Map、Pool、Context - Stories Detail

Once

圖片

單例對象:在整個應用程序的生命週期中,只有一個實例存在,並提供一個全局統一的訪問點來獲取這個唯一的實例

  • 應用場景:數據庫連接池全局配置管理器日誌記錄器 (Logger)

Once 是在 Go 語言中實現線程安全的單例模式的完美且最地道的工具

使用 Once 可能出現的 2 種錯誤

  • 第一種錯誤:死鎖

​ once.Do()中再次調用once.Do()

  • 第二種錯誤:未正確初始化

    即使一些資源沒有在once.Do()中初始化成功,也不能再進行once.Do()了

    解決辦法:

    • 實現一個類似 Once 的併發原語,既可以返回當前調用 Do 方法是否正確完成,還可以在初始化失敗後調用 Do 方法再次嘗試初始化,直到初始化成功才不再初始化了。
    • singleflight:對於同一個任務,無論有多少個併發請求,都確保在同一時間內只有一個請求真正在執行,其他請求只會等待這個執行結果,不會繼續嘗試

Map

圖片

Add、Lookup、Delete

順序獲取 Map 的值

  • 如果要按照 key 的順序獲取 map 的值,需要先取出所有的 key 進行排序,然後按照這個排序的 key 依次獲取對應的值

orderedmap:可以記錄插入順序

使用 map 的 2 種常見錯誤

  • 常見錯誤一:未初始化

    map 對象必須在使用之前初始化,m := make(map[int]int)

  • 常見錯誤二:併發讀寫

    Go 內建的 map 對象不是線程(goroutine)安全的,併發讀寫的時候運行時會有檢查,遇到併發問題就會導致 panic。

加讀寫鎖:擴展 map,支持併發讀寫(泛型版本)

泛型:代碼複用 + 類型安全 + 代碼更簡潔、更易讀

// 引入了兩個類型參數:
// K: 代表鍵(Key)的類型。它有一個約束 comparable,因為 map 的鍵必須是可比較的。
// V: 代表值(Value)的類型。它可以是任何類型 (any)。
type RWMap[K comparable, V any] struct {
    sync.RWMutex
    m map[K]V // m 的類型不再是寫死的,而是由 K 和 V 決定
}

// 新建一個泛型的 RWMap
// 注意函數簽名也需要聲明類型參數
func NewRWMap[K comparable, V any](n int) *RWMap[K, V] {
    return &RWMap[K, V]{
        m: make(map[K]V, n),
    }
}

// Get 方法的參數和返回值類型都變成了泛型
func (m *RWMap[K, V]) Get(k K) (V, bool) {
    m.RLock()
    defer m.RUnlock()
    v, existed := m.m[k]
    return v, existed
}

// Set 方法的參數類型也變成了泛型
func (m *RWMap[K, V]) Set(k K, v V) {
    m.Lock()
    defer m.Unlock()
    m.m[k] = v
}

// Delete 方法的參數類型也變成了泛型
func (m *RWMap[K, V]) Delete(k K) {
    m.Lock()
    defer m.Unlock()
    delete(m.m, k)
}

func (m *RWMap[K, V]) Len() int {
    m.RLock()
    defer m.RUnlock()
    return len(m.m)
}

// Each 方法的回調函數 f 的參數類型也變成了泛型
func (m *RWMap[K, V]) Each(f func(k K, v V) bool) {
    m.RLock()
    defer m.RUnlock()
    for k, v := range m.m {
        if !f(k, v) {
            return
        }
    }
}

Map的分片

import (
    "github.com/orcaman/concurrent-map/v2"
)

Map 可以通過分片減少鎖的顆粒度,從而追求更高的性能

應對特殊場景的 sync.Map(很少用)

  • 只會增長的緩存系統中,一個 key 只寫入一次而被讀很多次
  • 多個 goroutine 為不相交的鍵集讀、寫和重寫鍵值對。

Store、Load 和 Delete 方法

  • Store方法 :設置一個鍵值對
  • Load 方法:讀取一個 key 對應的值
  • Delete 方法:根據鍵刪

性能測試(實踐出真知)

拓展

帶有過期功能的timedmap,使用紅黑樹實現的 key 有序的treemap等

Pool

圖片

Go 的自動垃圾回收機制包含 STW:有時會節省空間,但有時會導致重複新建資源,特別是數據庫連接、TCP 的長連接創建非常耗時

對象池:把不用的對象回收起來,避免被垃圾回收掉,就不必在堆上重新創建了

sync.Pool

  • sync.Pool 本身就是線程安全的,多個 goroutine 可以併發地調用它的方法存取對象
  • sync.Pool 不可在使用之後再複製使用。

New:創建新的元素

Get:取走一個元素

Put:將一個元素返還給 Pool

常用場景:內存中的臨時對象(緩衝池)

Go 1.13 之前的 sync.Pool 的實現的 2 大問題:

  • 每次 GC 都會回收創建的對象
  • 底層實現使用了 Mutex,對這個鎖併發請求競爭激烈的時候,會導致性能的下降

Go 1.13

  • private:極致性能,單P私有,無鎖。
  • shared:高性能,本地P優先,但可被其他P“竊取”,有原子操作。
  • local:是 private 和 shared 的組合,每個P獨享一個,這是避免全局鎖的關鍵。
  • victim:是上一輪GC時的local,是對象的“復活區”,用來應對流量波動,提升對象存活率。
  • poolCleanup:就是一次“輪換”,把victim扔掉,把local變成新的victim,再清空local。
Get 一個杯子(對象)的流程 (從快到慢)

一個 goroutine (在 P1 咖啡師上運行) 需要一個杯子:

  1. 先看私有小抽屜 (private):P1 咖啡師先打開自己的 private 抽屜。如果有,直接拿走。這是最快的路徑,無鎖。
  2. 再看共享杯子架 (shared):如果 private 是空的,P1 就從自己面前的 shared 杯子架上拿一個。這需要原子操作,稍慢。
  3. 去“偷”別人的 (work stealing):如果自己的 shared 也空了,P1 會隨機看一眼隔壁 P2 咖啡師的 shared 杯子架,從上面“偷”一半的杯子過來。這是 sync.Pool 性能的第二個秘密,實現了負載均衡。
  4. 最後看備用區 (victim):如果偷也偷不到,P1 就會去吧枱角落的“上一輪用剩的托盤” (victim) 上找一個。
  5. 實在沒了就造新的:如果 victim 也沒有,P1 只能調用 Pool.New 函數,從倉庫裏拿一個全新的杯子。
Put 一個用完的杯子回去的流程
  1. 先放回私有小抽屜 (private):如果 P1 的 private 抽屜是空的,他就把杯子放進去。無鎖,最快。
  2. 再放回共享杯子架 (shared):如果 private 抽屜已經有杯子了,他就把這個新杯子放到自己面前的 shared 杯子架上。

sync.Pool 的坑

  • 內存泄漏:如果裝的是切片,如byte slice,那麼使用之後會變大,放回來就大了,佔很大空間

    • 解決辦法:檢查回收的對象的大小,太大就不要了
  • 內存浪費:池子中的 buffer 都比較大,用不上

    • 解決辦法:buffer 池分層 / 分桶(Bucketing)

第三方庫

bytebufferpool、oxtoacart/bpool(容量控制)

使用場景(網絡連接):

  • 標準庫中的 http client 池:訪問 web 服務器
  • TCP 連接池:fatih/pool、net.Conn
  • 數據庫連接池:sql.DB
  • Memcached Client 連接池

Worker Pool / Goroutine Pool

  • gammazero/workerpool:gammazero/workerpool 可以無限制地提交任務,提供了更便利的 Submit 和 SubmitWait 方法提交任務,還可以提供當前的 worker 數和任務數以及關閉 Pool 的功能。
  • ivpusic/grpool:grpool 創建 Pool 的時候需要提供 Worker 的數量和等待執行的任務的最大數量,任務的提交是直接往 Channel 放入任務。
  • dpaks/goworkers:dpaks/goworkers 提供了更便利的 Submit 方法提交任務以及 Worker 數、任務數等查詢方法、關閉 Pool 的方法。它的任務的執行結果需要在 ResultChan 和 ErrChan 中去獲取,沒有提供阻塞的方法,但是它可以在初始化的時候設置 Worker 的數量和任務數。

總結:

場景一:我需要複用【內存對象】

  • 默認選項:sync.Pool (標準庫)

    • 什麼時候用?

      • 當你需要複用的對象是固定大小的,或者大小變化不大的時候。
      • 當你需要複用的對象有明確的 Reset 方法可以將其恢復到初始狀態時。
      • 最常見場景:複用 bytes.Buffer, json.Encoder, Protobuf 消息對象等。
    • 優點:標準庫自帶,無需引入第三方依賴,經過大規模驗證,性能極高。
    • 缺點:如你總結,對可變大小的對象(尤其是 buffer)支持不佳,可能導致內存浪費或泄漏。
  • 進階選項:bytebufferpool (第三方分桶池)

    • 什麼時候用?

      • 當你需要複用的對象是[]byte 或 bytes.Buffer,並且這些 buffer 的大小會頻繁變化時。
      • 最常見場景:網絡編程中,讀取或寫入不定長的數據包;Web 框架中,渲染模板或構建 HTTP 響應體。
    • 優點:通過分桶機制,極大地減少了內存浪費,避免了 buffer 的頻繁擴容。
    • 決策依據:如果你用 pprof 分析程序,發現大量內存在 bytes.Buffer 上,並且程序性能抖動,那麼換用 bytebufferpool 可能會有奇效。

場景二:我需要複用【網絡連接】

  • database/sql (標準庫數據庫連接池)

    • 什麼時候用?

      • 只要你使用 Go 連接關係型數據庫 (MySQL, PostgreSQL 等),你就必須使用它。它不是一個選項,而是一個標配。
      • sql.DB 對象本身就是一個線程安全的連接池,你只需要在程序啓動時初始化一次,然後在各處共享使用即可。
    • 關鍵點:你不需要自己去實現數據庫連接池。
  • 第三方通用連接池 (如 fatih/pool)

    • 什麼時候用?

      • 當你需要連接非數據庫的服務,並且對方沒有提供官方的連接池實現時。
      • 最常見場景

        • TCP 連接池:你需要和一個自定義的 TCP 服務進行高頻、短時的通信。
        • gRPC Client 連接池:雖然單個 gRPC client conn 內部可以處理併發,但在超高併發下,使用多個 conn 組成的池可以進一步提升吞吐量。
        • Redis Client 連接池:大多數現代 Redis 客户端(如 go-redis)內部已經為你管理好了連接池,通常你也不需要自己再包一層。

場景三:我需要複用【Goroutine】

  • 什麼時候用?

    • 當你有一個任務源源不斷的場景,但你又不希望一有任務就啓動一個 goroutine,導致瞬間併發數失控,最終壓垮系統。
    • 最常見場景:

      • 一個爬蟲系統,需要併發地抓取成千上萬個 URL,但你希望最多隻同時有 50 個抓取任務在運行。
      • 一個數據處理服務,需要從消息隊列(如 Kafka)中消費消息並處理,你希望控制處理消息的併發度。
      • 一個圖片處理服務,用户上傳圖片後需要進行多種處理(縮放、加水印),你希望用一個協程池來統一管理這些處理任務。

Context

圖片

上下文:在 API 之間或者方法調用之間,所傳遞的除了業務參數之外的額外信息

  • 上下文信息傳遞 (request-scoped),比如處理 http 請求、在請求處理鏈路上傳遞信息;
  • 控制子 goroutine 的運行;
  • 超時控制的方法調用;
  • 可以取消的方法調用。

Deadline 方法:返回這個 Context 被取消的截止日期

Done 方法:返回一個 Channel 對象;如果 Done 沒有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法會返回 Done 被 close 的原因

Value :返回此 ctx 中和指定的 key 相關聯的 value

context.Background():返回一個非 nil 的、空的 Context,沒有任何值,不會被 cancel,不會超時,沒有截止日期。

context.TODO():(不咋用)

約定俗成的規則:

首參傳 Context,
空值 Background。
切莫長久存,
傳值不傳形。
Key 用自定義,
安全不衝突。

特殊用途 Context

  • context.WithValue:可以攜帶額外的信息(KV對)
  • context.WithCancel:可以被手動喊停
  • context.WithTimeout:指定時間自動取消
  • context.WithDeadline:具體的未來時間點會自動取消(更適合跨多個節點、有統一截止時間的分佈式任務)

應用場景

場景一:優雅地控制 Goroutine 的“生死”
func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done(): // 監聽到取消信號
                fmt.Println("任務已取消,正在退出...")
                return
            default:
                // 正常執行任務...
                fmt.Println("正在工作中...")
                time.Sleep(1 * time.Second)
            }
        }
    }(ctx)

    // 讓子goroutine運行3秒
    time.Sleep(3 * time.Second)
    
    // 主任務決定取消子任務
    fmt.Println("通知子任務取消!")
    cancel()

    // 等待子任務退出
    time.Sleep(1 * time.Second)
}
場景二:為“慢操作”設定“最後期限” (超時控制)
func fetchUserData(userID string) (string, error) {
    // 最多給這次請求 50 毫秒的時間
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel() // 釋放資源

    // 創建一個帶 context 的 HTTP 請求
    req, err := http.NewRequestWithContext(ctx, "GET", "http://api.example.com/users/"+userID, nil)
    if err != nil {
        return "", err
    }

    // 執行請求
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        // 如果是因為超時,這裏的 err 就是 context.DeadlineExceeded
        if errors.Is(err, context.DeadlineExceeded) {
            fmt.Println("請求下游服務超時了!")
        }
        return "", err
    }
    // ... 處理響應
    defer resp.Body.Close()
    return "some data", nil
}
場景三:在請求調用鏈中“秘密”傳遞信息 (元數據傳遞)
// 定義一個私有的 key 類型,防止衝突
type traceIDKey struct{}

// 中間件:將 trace_id 注入 context
func TracingMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        traceID := r.Header.Get("X-Request-ID") // 假設從請求頭獲取
        if traceID == "" {
            traceID = "generate-a-new-one"
        }
        // 注入 context
        ctx := context.WithValue(r.Context(), traceIDKey{}, traceID)
        // 將新的 context 傳入下一個處理器
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

// 業務邏輯處理器:從 context 中讀取 trace_id
func GetUserHandler(w http.ResponseWriter, r *http.Request) {
    // 讀取 context
    ctx := r.Context()
    if traceID, ok := ctx.Value(traceIDKey{}).(string); ok {
        fmt.Printf("處理請求,Trace ID: %s\n", traceID)
    }
    // ... 業務邏輯
}

Add a new Comments

Some HTML is okay.