假設有一組任務有前後依賴關係,我們可以使用Go的通道特性,將前一個任務的執行結果(或結束信號)送入下一個任務,已達到自動化依次執行工作流的每個任務的目的。
為了模擬這一工作流,我們假設有五個通道和四個協程,每個協程監聽前一個通道的數據,並將接收到的數據送入下一個通道中。
當任務執行結束後,最好能夠主動回收通道,已達到節省內存開銷的目的。與執行工作流類似的是,應當能做到關閉首個通道後,按照依賴關係連鎖關閉後續所有通道。
示例代碼如下:https://go.dev/play/p/zcuEA7t_zSU
package main
import (
"fmt"
"time"
)
func main() {
// 創建通道
channels := make([]chan int, 5)
for i := range channels {
channels[i] = make(chan int)
}
// 設置協程
for i := 0; i < len(channels)-1; i++ {
i := i
go func(in <-chan int, out chan<- int) {
for {
value, ok := <-in // 從前一個通道接收數據
if !ok {
fmt.Printf("Channel[%d] closed. Exiting goroutine.\n", i)
close(out)
return
}
out <- value // 發送數據到下一個通道
}
}(channels[i], channels[i+1])
}
// 啓動第一個協程,將數據發送到第一個通道
go func(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i // 發送數據到第一個通道
}
close(out) // 關閉第一個通道
}(channels[0])
// 從最後一個通道接收數據並打印
for {
select {
case value, ok := <-channels[len(channels)-1]:
if !ok {
fmt.Println("Last channel closed. Exiting.")
return
}
fmt.Println("Received:", value)
case <-time.After(1 * time.Second):
fmt.Println("Timeout. Exiting.")
return
}
}
}
執行結果如下:
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Channel[0] closed. Exiting goroutine.
Channel[1] closed. Exiting goroutine.
Channel[2] closed. Exiting goroutine.
Received: 8
Received: 9
Channel[3] closed. Exiting goroutine.
Last channel closed. Exiting.
從執行結果中可以看出,最後一個協程的結果還未送出時,首個通道既已開始關閉。