Stories

Detail Return Return

在 Go 語言中如何實現協程池 - Stories Detail

公眾號首發:https://mp.weixin.qq.com/s/Xbk4QF7HFll102xaF5r_3Q

如果你熟悉 Java、Python 等編程語言,那麼你一定聽説或者使用過進程池或線程池。因為進程和線程不是越多越好,過多的進程或線程可能造成資源浪費和性能下降。所以池化技術在這些主流編程語言中非常流行,可以有效控制併發場景下資源使用量。

而 Go 語言則沒有提供多進程和多線程的支持,僅提供了協程(goroutine)的概念。在 Go 中開啓 goroutine 的成本非常低,以至於我們在絕大多數情況下開啓 goroutine 時根本無需考慮協程數量,所以也就很少有人提及 Go 的協程池化技術。不過使用場景少,不代表完全沒用。通過協程池我們可以來掌控資源使用量,降低協程泄漏風險。

gammazero/workerpool 就是用來實現協程池的 Go 包,本文我們一起來學習一下其使用方法,並深入其源碼來探究下如何實現一個 Go 協程池。

使用示例

workerpool 直譯過來是工作池,在 Go 中就是指代協程池。workerpool 的用法非常簡單,示例代碼如下:

package main

import (
    "fmt"
    "time"

    "github.com/gammazero/workerpool"
)

func main() {
    wp := workerpool.New(2)
    requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

    for _, r := range requests {
        wp.Submit(func() {
            fmt.Printf("%s: Handling request: %s\n", time.Now().Format(time.RFC3339), r)
            time.Sleep(1 * time.Second)
        })
    }

    wp.StopWait()
}

workerpool.New(2) 表示我們創建了一個容量為 2 的協程池,即同一時刻最多隻會有 2 個 goroutine 正在執行。wp.Submit() 用來提交一個任務,任務類型為無參數和返回值的函數 func(),這裏我們在 for 循環中提交了 5 個任務。調用 wp.StopWait() 可以等待所有已提交的任務執行完成。

執行示例代碼,得到輸出如下:

$ go run main.go
2025-05-08T23:40:16+08:00: Handling request: alpha
2025-05-08T23:40:16+08:00: Handling request: beta
2025-05-08T23:40:17+08:00: Handling request: gamma
2025-05-08T23:40:17+08:00: Handling request: delta
2025-05-08T23:40:18+08:00: Handling request: epsilon

不過這裏的輸出內容並不是一下子全部輸出完成的,而是兩行兩行的輸出。

根據打印的時間可以發現,是先輸出:

2025-05-08T23:40:16+08:00: Handling request: alpha
2025-05-08T23:40:16+08:00: Handling request: beta

接着等待 1s 再輸出:

2025-05-08T23:40:17+08:00: Handling request: gamma
2025-05-08T23:40:17+08:00: Handling request: delta

再次等待 1s 最後輸出:

2025-05-08T23:40:18+08:00: Handling request: epsilon

這個輸出結果符合預期,也就是説同一時刻最多隻會有 2 個 goroutine 正在執行。

源碼解讀

workerpool 用法非常簡單,接下來我們一起看看其實現原理。

下圖是 workerpool 源碼中實現的全部功能:

image.png

WorkerPool 是一個結構體,源碼中圍繞這個結構體定義了很多函數或方法。這些函數或方法你不必死記硬背,先有一個宏觀上的認知,接下來我將帶你深入學習其中的核心方法。

WorkerPool 結構體完整定義如下:

// WorkerPool 是 Go 協程的集合池,用於確保同時處理請求的協程數量嚴格受控於預設的上限值
type WorkerPool struct {
    maxWorkers   int                 // 最大工作協程數
    taskQueue    chan func()         // 任務提交隊列
    workerQueue  chan func()         // 工作協程消費隊列
    stoppedChan  chan struct{}       // 停止完成通知通道
    stopSignal   chan struct{}       // 停止信號通道
    waitingQueue deque.Deque[func()] // 等待隊列(雙端隊列)
    stopLock     sync.Mutex          // 停止操作互斥鎖
    stopOnce     sync.Once           // 控制只停止一次
    stopped      bool                // 是否已經停止
    waiting      int32               // 等待隊列中任務計數
    wait         bool                // 協程池退出時是否等待已入隊任務執行完成
}

這裏屬性很多,其中有 3 個屬性是需要我們重點關注的,taskQueueworkerQueue 以及 waitingQueue,這三者分別代表任務提交隊列、工作隊列和等待隊列。稍後我將通過一個流程圖來講解任務在這 3 個隊列中的傳遞流程,現在我們一起來看一下 WorkerPool 的構造函數:

// New 創建並啓動協程池
// maxWorkers 參數指定可以併發執行任務的最大工作協程數。
func New(maxWorkers int) *WorkerPool {
    // 至少有一個 worker
    if maxWorkers < 1 {
        maxWorkers = 1
    }

    // 實例化協程池對象
    pool := &WorkerPool{
        maxWorkers:  maxWorkers,
        taskQueue:   make(chan func()),
        workerQueue: make(chan func()),
        stopSignal:  make(chan struct{}),
        stoppedChan: make(chan struct{}),
    }

    // 啓動任務調度器
    go pool.dispatch()

    return pool
}

New 函數創建一個指定容量的協程池對象 WorkerPool,我們已經在使用示例中見過其用法了。這裏邏輯還是比較簡單的,僅接收一個參數,並初始化了幾個必要的屬性。

值得注意的是,這裏通過開啓新的 goroutine 的方式啓動了 dispatch() 方法,這個方法是協程池最核心的邏輯,用來實現任務的調度執行。

為此,我畫了一張流程圖,來分析 WorkerPool 最核心的任務派發流程:

image.png

圖中涉及兩個方法,其中 Submit() 方法用於提交一個任務到協程池,dispatch 方法則用於派發任務到 goroutine 中去執行。dispatch 方法內部有一個無限循環,實現任務實時派發執行。這個 for 無限循環中控制着任務在 3 個隊列中的流轉和工作協程數量。

只要通過 Submit() 方法提交任務,就一定會進入任務提交隊列 taskQueue 中,而 taskQueue 是一個通過 make(chan func()) 初始化的無緩衝的 channel,所以任務不會在裏面停留,要麼通過鏈路 ② 下發到等待隊列 waitingQueue 中,要麼通過鏈路 ④ 下發到工作隊列 workerQueue 中。最終具體會下發到哪裏,是 dispatch 方法中的 for 循環邏輯來決定的。

dispatchfor 循環中會處理任務分發,核心邏輯有兩個部分,包含兩種處理模式:

  • 隊列優先模式:在 for 循環中,會優先判斷等待隊列 waitingQueue 是否為空,如果不為空,則進入隊列優先模式。

    1. 此時會優先從等待隊列對頭取出任務,然後交給工作隊列 workerQueue,協程池中的工作協程(worker)就會不停的從 workerQueue 中拿到任務並執行。
    2. 如果此時剛好還有新的任務被提交,則新提交的任務自動進入等待隊列尾部。
    3. 任務從提交到執行的流程是 ① ② ③。
  • 直通模式:等待隊列完全清空後,程序自動切換到直通模式。

    1. 此時等待隊列 workerQueue 已經清空,如果有新任務提交進來,可以直接交給工作隊列 workerQueue,讓工作協程(worker)來執行。
    2. 如果此時工作協程數量達到了協程池的上限,則將任務提交到等待隊列 waitingQueue 中。
    3. 任務從提交到執行的流程是 ① ④。

以上就是協程池 dispatch 方法的核心調度流程。

接下來,我將對 WorkerPool 核心代碼進行一一解讀,以此來從微觀上更加細緻的理解協程池的設計。

Submit 方法實現如下:

// Submit 將任務函數提交到工作池隊列等待執行,不會等待任務執行完成
func (p *WorkerPool) Submit(task func()) {
    if task != nil {
        p.taskQueue <- task
    }
}

這個方法非常簡單,就是將任務提交到 taskQueue 隊列中。

接下來我們看下是最核心也是最複雜的方法 dispatch 是如何實現的:

// 任務派發,循環的將下一個排隊中的任務發送給可用的工作協程(worker)執行
func (p *WorkerPool) dispatch() {
    defer close(p.stoppedChan)            // 保證調度器退出時關閉停止通知通道
    timeout := time.NewTimer(idleTimeout) // 創建 2 秒週期的空閒檢測定時器
    var workerCount int                   // 當前活躍 worker 計數器
    var idle bool                         // 空閒狀態標識
    var wg sync.WaitGroup                 // 用於等待所有 worker 完成

Loop:
    for { // 主循環處理任務分發

        // 隊列優先模式:優先檢測等待隊列
        if p.waitingQueue.Len() != 0 {
            if !p.processWaitingQueue() {
                break Loop // 等待隊列為空,退出循環
            }
            continue
        }

        // 直通模式:開始處理提交上來的新任務
        select {
        case task, ok := <-p.taskQueue: // 接收到新任務
            if !ok { // 協程池停止時會關閉任務通道,如果 !ok 説明協程池已停止,退出循環
                break Loop
            }

            select {
            case p.workerQueue <- task: // 嘗試派發任務
            default: // 沒有空閒的 worker,無法立即派發任務
                if workerCount < p.maxWorkers { // 如果協程池中的活躍協程數量小於最大值,那麼創建一個新的協程(worker)來執行任務
                    wg.Add(1)
                    go worker(task, p.workerQueue, &wg) // 創建新的 worker 執行任務
                    workerCount++                       // worker 記數加 1
                } else { // 已達協程池容量上限
                    p.waitingQueue.PushBack(task)                              // 將任務提交到等待隊列
                    atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) // 原子更新等待計數
                }
            }
            idle = false // 標記為非空閒
        case <-timeout.C: // 空閒超時處理
            // 在一個空閒超時週期內,存在空閒的 workers,那麼停止一個 worker
            if idle && workerCount > 0 {
                if p.killIdleWorker() { // 回收一個 worker
                    workerCount-- // worker 計數減 1
                }
            }
            idle = true                // 標記為空閒
            timeout.Reset(idleTimeout) // 複用定時器
        }
    }

    if p.wait { // 調用了 StopWait() 方法,需要運行等待隊列中的任務,直至隊列清空
        p.runQueuedTasks()
    }

    // 終止所有 worker
    for workerCount > 0 {
        p.workerQueue <- nil // 發送終止信號給 worker
        workerCount--        // worker 計數減 1,直至為 0 退出循環
    }
    wg.Wait() // 阻塞等待所有 worker 完成

    timeout.Stop() // 停止定時器
}

這個方法代碼量稍微有點多,不過結合我上面畫的流程圖,其實也好理解。我在代碼註釋中也標出了兩種任務處理模式:等待隊列優先模式和直通模式。

我們先看等待隊列優先模式:

// 隊列優先模式:優先檢測等待隊列
if p.waitingQueue.Len() != 0 {
    if !p.processWaitingQueue() {
        break Loop // 協程池已經停止
    }
    continue // 隊列不為空則繼續下一輪循環
}

如果等待隊列不為空,則優先處理等待隊列。p.processWaitingQueue 方法實現如下:

// 處理等待隊列
func (p *WorkerPool) processWaitingQueue() bool {
    select {
    case task, ok := <-p.taskQueue: // 接收到新任務
        if !ok { // 協程池停止時會關閉任務通道,如果 !ok 説明協程池已停止,返回 false,不再繼續處理
            return false
        }
        p.waitingQueue.PushBack(task) // 將新任務加入等待隊列隊尾
    case p.workerQueue <- p.waitingQueue.Front(): // 從等待隊列隊頭獲取任務並放入工作隊列
        p.waitingQueue.PopFront() // 任務已經開始處理,所以要從等待隊列中移除任務
    }
    atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) // 原子修改等待隊列中任務計數
    return true
}

這個方法中有兩個 case 需要處理:

  1. 接收到新任務,直接加入到等待隊列 waitingQueue 的隊尾。
  2. 從等待隊列 waitingQueue 的隊頭獲取任務並放入工作隊列 workerQueue

這與前文流程圖中的講解吻合。

任務交給工作隊列 workerQueue 以後,誰來處理 workerQueue 中的任務呢?我們接着往下看直通模式的代碼。

直通模式的代碼中同樣使用 select 多路複用,將邏輯分成了兩個 case 來處理:

// 直通模式:開始處理提交上來的新任務
select {
case task, ok := <-p.taskQueue: // 接收到新任務
    ...
case <-timeout.C: // 空閒超時處理
    ...
}

兩個 case 分別實現任務執行和空閒超時處理。

我們先來看處理任務的 case:

case task, ok := <-p.taskQueue: // 接收到新任務
    if !ok { // 協程池停止時會關閉任務通道,如果 !ok 説明協程池已停止,退出循環
        break Loop
    }

    select {
    case p.workerQueue <- task: // 嘗試派發任務
    default: // 沒有空閒的 worker,無法立即派發任務
        if workerCount < p.maxWorkers { // 如果協程池中的活躍協程數量小於最大值,那麼創建一個新的協程(worker)來執行任務
            wg.Add(1)
            go worker(task, p.workerQueue, &wg) // 創建新的 worker 執行任務
            workerCount++                       // worker 記數加 1
        } else { // 已達協程池容量上限
            p.waitingQueue.PushBack(task)                              // 將任務提交到等待隊列
            atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len())) // 原子更新等待計數
        }
    }
    idle = false // 標記為非空閒

直通模式下,有新的任務提交進來,首先會嘗試直接將其加入工作隊列 workerQueue 中,如果任務下發失敗,則説明當前時刻沒有空閒的工作協程(worker),無法立即派發任務。那麼繼續比較當前正在執行的工作協程數量(workerCount)和協程池大小(maxWorkers),如果協程池中的活躍協程數量小於最大值,那麼創建一個新的協程(worker)來執行任務。否則,説明正在執行的工作協程數量已達協程池容量上限,那麼將任務提交到等待隊列 waitingQueue 中。那麼下一次 for 循環執行的時候,檢測到 waitingQueue 中有任務,就會優先處理 waitingQueue。這也就實現了兩種模式的切換。

我們再來看下工作協程 worker 是如何執行任務的:

// 工作協程,執行任務並在收到 nil 信號時停止
func worker(task func(), workerQueue chan func(), wg *sync.WaitGroup) {
    for task != nil { // 循環執行任務,直至接收到終止信號 nil
        task()               // 執行任務
        task = <-workerQueue // 接收新任務
    }
    wg.Done() // 標記 worker 完成
}

可以發現,這裏使用 for 循環來不停的執行提交過來的任務,直至從 workerQueue 中接收到終止信號 nil。那麼這個終止信號是何時下發的呢?往下看你馬上能找到答案。

接下來我們看一下直通模式的另外一個 case 邏輯:

case <-timeout.C: // 空閒超時處理
    // 在一個空閒超時週期內,存在空閒的 workers,那麼停止一個 worker
    if idle && workerCount > 0 {
        if p.killIdleWorker() { // 回收一個 worker
            workerCount-- // worker 計數減 1
        }
    }
    idle = true                // 標記為空閒
    timeout.Reset(idleTimeout) // 複用定時器

這裏使用定時器來管理超過特定時間,未收到任務,需要關閉空閒的工作協程(worker)。

關閉 worker 的方法是 p.killIdleWorker

// 停止一個空閒 worker
func (p *WorkerPool) killIdleWorker() bool {
    select {
    case p.workerQueue <- nil: // 發送終止信號給工作協程(worker)
        // Sent kill signal to worker.
        return true
    default:
        // No ready workers. All, if any, workers are busy.
        return false
    }
}

這裏正是通過給 workerQueue 發送 nil 來作為終止信號,以此來實現通知 worker 退出的。

看完了 Submitdispatch 方法源碼,你現在是否對協程池有了更深入的認知呢?你可以再回顧一下我在前文中畫的任務調度流程圖,加深印象。

workerpool 的源碼就講解到這裏,其他方法實現其實都比較簡單,就交給你自己去探索了。你可以參考我的中文註釋版源碼:https://github.com/jianghushinian/blog-go-example/blob/main/goroutine/workerpool/workerpool.go。

總結

協程池作為 Go 中不那麼常用的技術,依然有其存在的價值,本文介紹的 workerpool 項目是一個協程池的實現。

workerpool 用法非常簡單,僅需要通過 workerpool.New(n) 函數既可創建一個大小為 n 的協程池,之後通過 wp.Submit(task) 既可以提交任務到協程池。

workerpool 內部提供了 3 個隊列來對任務進行派發調度,任務提交隊列 taskQueue 和工作隊列 workQueue 都是使用 channel 實現的,並且無緩衝,真正帶有緩衝效果的隊列是等待隊列 WaitingQueue,這個是真正的隊列實現,採用雙端隊列,而非 channel,並且不限制隊列長度。也就是説,無論排隊多少個任務,workerpool 都不會阻止新任務的提交。所以,我們在創建協程池時需要設置一個合理的大小限制,以防止等待隊列無限增長,任務很長一段時間都得不到執行。

此外,workerpool 內部雖然會維護一個協程池,但超過一定空閒時間沒有任務提交過來,工作協程是會關閉的,之後新任務進來再次啓動新的協程,因為啓動新協程開銷小,所以沒長久駐留協程。

本文示例源碼我都放在了 GitHub 中,歡迎點擊查看。

希望此文能對你有所啓發。

延伸閲讀

  • workerpool GitHub 源碼:https://github.com/gammazero/workerpool
  • 本文 GitHub 示例代碼:https://github.com/jianghushinian/blog-go-example/blob/main/goroutine/workerpool/workerpool.go
  • 本文永久地址:https://jianghushinian.cn/2025/05/09/goroutine-workerpool/

聯繫我

  • 公眾號:Go編程世界
  • 微信:jianghushinian
  • 郵箱:jianghushinian007@outlook.com
  • 博客:https://jianghushinian.cn
  • GitHub:https://github.com/jianghushinian

Add a new Comments

Some HTML is okay.