關於併發
Go 語言的創始人Rob Pike 曾説過:並行關乎執行,併發關乎結構。他認為:
• 併發是一種程序設計方法:將一個程序分解成多個小片段,每個小片段獨立執行;併發程序的小片段之間可以通過通信相互協作。
• 並行是有關執行的,它表示同時進行一些計算任務。
程序小片段之間通訊不同語言實現不同,比如:傳統語言Java使用共享內存方式達到線程之間通訊,而Go語言channel來進行通訊。
原生線程、Java線程、Goroutine
Java中的多線程,由 JVM 在 Java 堆中分配內存來存儲線程的相關信息,包括線程棧、程序計數器等。當需要執行 Java 線程時,它會向操作系統請求分配一個或多個原生線程(例如 POSIX 線程或 Windows 線程),操作系統分配成功後,JVM 會將 Java 線程與這些原生線程進行映射,並建立關聯,並在需要時將 Java 線程的狀態同步到相應的原生線程中。
由此可以看出,Java線程和原生線程1:1對應,由操作系統(OS)調度算法執行,該併發以下特點:
- 線程棧默認空間大且不支持動態伸縮,Java 默認最小都是1MB,Linux 默認 8MB;
- 線程切換創建、銷燬以及線程間上下文切換的代價都較大。
- 線程通過共享內存進行通訊,
POSIX線程(Pthreads)是C函數、類型和常數的集合,用於創建和管理線程。它是POSIX標準的一個子集,提供在BeagleBone Black上使用C/C++應用程序實現線程所需的一切。
原生線程就是操作系統線程或叫系統線程。
Go語言引入用户層輕量級線程(Goroutine),它由Go運行時負責調度。Goroutine相比傳統操作系統線程而言有如下優勢。
- 資源佔用小,每個Goroutine的初始棧大小僅為2KB,且支持動態伸縮,避免內存浪費;
- 由Go運行時而不是操作系統調度,goroutine上下文切換代價較小;
- 內置channel作為goroutine間通信原語,為併發設計提供強大支撐。
瞭解Go調度原理
Go 語言實現了調度器(scheduler),它負責將 goroutine 分配到原生線程上執行。
G-P-M模型
Go 語言中的調度模型(G-P-M模型)它包含了三個重要組件:G(goroutine)、P(processor)、M(machine)。
- G(goroutine):一個執行單元,這裏也就是 goroutine,它包含了執行代碼所需的信息,比如棧空間、程序計數器等。
- P(processor):P 一個邏輯處理器,它負責執行 goroutine。每個 P 維護了一個 goroutine 隊列,它可以將 goroutine 分配到 M(系統線程)上執行。P 的數量由 GOMAXPROCS 環境變量決定,默認值為 CPU 的邏輯核心數。
- M(machine):一個系統線程(machine),它負責執行 goroutine 的真正計算工作。M 與操作系統的線程直接綁定,負責實際的計算任務,比如執行 goroutine 的函數、系統調用等。Go 語言的調度器會將多個 goroutine 映射到少量的系統線程上執行。
搶佔式調度
在上面模型中,如果某個G處於死循環或長時間執行(比如:進行系統調用,IO操作),那麼P隊列裏面的G就長時間得不到執行,為了解決此問題,需要使用搶佔式調度。
Java 中有以下兩種搶佔式調度算法
-
優先級調度(Priority Scheduling):
- 每個線程都有一個優先級,高優先級的線程會比低優先級的線程更容易獲得CPU的執行權(注意:設置了優先級不是絕對優先執行,只是概率上高)。
- 在Java中,線程的優先級範圍是從
Thread.MIN_PRIORITY(1)到Thread.MAX_PRIORITY(10),默認是Thread.NORM_PRIORITY(5)。
-
時間片輪轉調度(Round Robin Scheduling):
- 每個線程被分配一個固定的時間片,當該線程的時間片用完時,操作系統會暫停它的執行,將CPU控制權交給下一個線程。
- 在Java中,時間片輪轉調度通過
yield()方法來實現。當線程調用yield()時,它就會主動放棄CPU的執行權,讓其他線程有機會執行。
Go 語言與Java搶佔調度不同,Java是實際上是操作系統時間片輪轉調度,發生在內核層。Go 搶佔調度是發生在用户層,由 Go 運行時管理,通過軟件定時器和搶佔點來實現搶佔。
Go 程序啓動時會創建一個線程(稱為監控線程),該線程運行一個內部函數 sysmon ,用來進行系統監控任務,如垃圾回收、搶佔調度、監視死鎖等。這個函數在後台運行,確保 Go 程序的正常運行。
func main() {
...
if GOARCH != "wasm" {
// 系統棧上的函數執行
systemstack(func() {
newm(sysmon, nil, -1) // 用於創建新的 M(機器,代表一個操作系統線程)。
})
}
...
}
sysmon 每20us~10ms啓動一次,大體工作:
- 釋放閒置超過5分鐘的span物理內存;
- 如果超過2分鐘沒有垃圾回收,強制執行;
- 將長時間未處理的netpoll結果添加到任務隊列;
- 向長時間運行的G任務發出搶佔調度;
- 收回因syscall長時間阻塞的P。
具體來説,以下情況會觸發搶佔式調度:
- 系統調用:當一個 goroutine 執行系統調用時,調度器會將該 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。一旦系統調用完成,被暫停的 goroutine 可以繼續執行。
- 函數調用:當一個 goroutine 調用一個阻塞的函數(如通道的發送和接收操作、鎖的加鎖和解鎖操作等)時,調度器會將該 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。一旦被阻塞的函數可以繼續執行,被暫停的 goroutine 可以繼續執行。
- 時間片耗盡:每個 goroutine 在運行一段時間後都會消耗一個時間片。當時間片耗盡時,調度器會將當前正在運行的 goroutine 暫停,並將處理器分配給其他可運行的 goroutine。被暫停的 goroutine 將會被放入到就緒隊列中,等待下一次調度。
GO併發模型
Go 使用 CSP(Communicating Sequential Processes,通信順序進程)併發編程模型,該模型由計算機科學家 Tony Hoare 在 1978 年提出。
在Go中,針對CSP模型提供了三種併發原語:
- goroutine:對應CSP模型中的P(原意是進程,在這裏也就是goroutine),封裝了數據的處理邏輯,是Go運行時調度的基本執行單元。
- channel:對應CSP模型中的輸入/輸出原語,用於goroutine之間的通信和同步。
- select:用於應對多路輸入/輸出,可以讓goroutine同時協調處理多個channel操作。
Go 奉行“不要通過共享內存來通信,而應通過通信來共享內存。”,也就是推薦通過channel來傳遞值,讓goroutine相互通訊協作。
channel 分為無緩衝和有緩衝,使用通道時遵循以下規範:
- 在無緩衝通道上,每一次發送操作都有對應匹配的接收操作。
- 對於從無緩衝通道進行的接收,發生在對該通道進行的發送完成之前。
- 對於帶緩衝的通道(緩存大小為C),通道中的第K個接收完成操作發生在第K+C個發送操作完成之前。
- 如果將C=0就是無緩衝的通道,也就是第K個接收完成在第K個發送完成之前。
func sender(ch chan<- int, done chan<- bool) {
fmt.Println("Sending...")
ch <- 42 // 發送數據到無緩衝通道
fmt.Println("Sent")
done <- true // 發送完成信號
}
func receiver(ch <-chan int, done <-chan bool) {
<-done // 等待發送操作完成信號
fmt.Println("Receiving...")
val := <-ch // 從無緩衝通道接收數據
fmt.Println("Received:", val)
}
func main() {
ch := make(chan int) // 創建無緩衝通道
done := make(chan bool) // 用於發送操作完成信號
go sender(ch, done) // 啓動發送goroutine
go receiver(ch, done) // 啓動接收goroutine
time.Sleep(2 * time.Second) // 等待一段時間以觀察結果
}
有緩衝通道
func sender(ch chan<- int) {
for i := 0; i < 5; i++ {
fmt.Println("Sending:", i)
ch <- i // 發送數據到通道
fmt.Println("Sent:", i)
}
close(ch)
}
func receiver(ch <-chan int) {
for {
val, ok := <-ch // 從通道接收數據
if !ok {
fmt.Println("Channel closed")
return
}
fmt.Println("Received:", val)
time.Sleep(1 * time.Second) // 模擬接收操作耗時
}
}
func main() {
ch := make(chan int, 2) // 創建帶緩衝大小為2的通道
go sender(ch) // 啓動發送goroutine
go receiver(ch) // 啓動接收goroutine
time.Sleep(10 * time.Second) // 等待一段時間以觀察結果
}
Go併發場景
並行計算
利用goroutine併發執行任務,加速計算過程。
// calculateSquare 是一個計算數字平方的函數,它模擬了一個耗時的計算過程。
func calculateSquare(num int, resultChan chan<- int) {
time.Sleep(1 * time.Second) // 模擬耗時計算
resultChan <- num * num
}
func main() {
nums := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
// 啓動多個goroutine併發計算數字的平方
for _, num := range nums {
go calculateSquare(num, resultChan)
}
// 從通道中接收計算結果並打印
for range nums {
result := <-resultChan
fmt.Println("Square:", result)
}
close(resultChan)
}
IO密集型任務
在處理IO密集型任務時,可以使用goroutine和channel實現併發讀寫操作,提高IO效率。
// fetchURL 函數用於獲取指定URL的內容,並將結果發送到通道resultChan中。
func fetchURL(url string, resultChan chan<- string) {
resp, err := http.Get(url)
if err != nil {
resultChan <- fmt.Sprintf("Error fetching %s: %s", url, err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
resultChan <- fmt.Sprintf("Error reading response from %s: %s", url, err)
return
}
resultChan <- string(body)
}
func main() {
urls := []string{"https://example.com", "https://example.org", "https://example.net"}
resultChan := make(chan string)
// 啓動多個goroutine併發獲取URL的內容
for _, url := range urls {
go fetchURL(url, resultChan)
}
// 從通道中接收結果並打印
for range urls {
result := <-resultChan
fmt.Println("Response:", result)
}
close(resultChan)
}
併發數據處理
對於需要同時處理多個數據流的情況,可以使用goroutine和channel實現併發數據處理,例如數據流的合併、拆分、過濾等操作。
// processData 函數用於處理從dataStream中接收的數據,並將處理結果發送到resultChan中。
func processData(dataStream <-chan int, resultChan chan<- int) {
for num := range dataStream {
resultChan <- num * 2 // 假設處理數據是將數據乘以2
}
}
func main() {
dataStream := make(chan int)
resultChan := make(chan int)
// 產生數據併發送到dataStream中
go func() {
for i := 1; i <= 5; i++ {
dataStream <- i
}
close(dataStream)
}()
// 啓動goroutine併發處理數據
go processData(dataStream, resultChan)
// 從通道中接收處理結果並打印
for range dataStream {
result := <-resultChan
fmt.Println("Processed Data:", result)
}
close(resultChan)
}
併發網絡編程
編寫網絡服務器或客户端時,可以利用goroutine處理每個連接,實現高併發的網絡應用。
// handler 是一個HTTP請求處理函數,它會向客户端發送"Hello, World!"的響應。
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, World!")
}
func main() {
// 註冊HTTP請求處理函數
http.HandleFunc("/", handler)
// 啓動HTTP服務器並監聽端口8080
go http.ListenAndServe(":8080", nil)
fmt.Println("Server started on port 8080")
// 使用select{}使主goroutine保持運行狀態,以便HTTP服務器能夠處理請求
select {}
}
定時任務和週期性任務
// task 是一個需要定時執行的任務函數。
func task() {
fmt.Println("Task executed at:", time.Now())
}
func main() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
// 循環等待定時器的觸發並執行任務
for {
select {
case <-ticker.C:
task()
}
}
}
工作池
通過創建一組goroutine來處理任務池中的任務,可以有效地控制併發數量,適用於需要限制併發的情況。
// worker 是一個工作函數,它會從jobs通道中接收任務,並將處理結果發送到results通道中。
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d started job %d\n", id, job)
time.Sleep(1 * time.Second) // 模擬工作時間
fmt.Printf("Worker %d finished job %d\n", id, job)
results <- job * 2 // 假設工作的結果是輸入的兩倍
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs) // 緩衝channel用於發送任務
results := make(chan int, numJobs) // 用於接收任務結果
// 啓動多個worker goroutine
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 發送任務到jobs channel
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 關閉jobs channel
// 等待所有worker完成並收集結果
go func() {
wg.Wait()
close(results)
}()
// 從通道中接收處理結果並打印
for result := range results {
fmt.Println("Result:", result)
}
}