redigo連接池的源碼分析
今天我們來看一看redigo(https://github.com/gomodule/redigo)是如何實現連接池的。
概述
連接池部分的代碼在redis/pool.go中,相關結構體和接口的UML圖如下圖所示
Pool結構體定義了連接池的屬性和行為,包括以下主要參數:
Dial func() (Conn, error):指向用於新建連接的函數,由redigo的用户指定MaxIdle int:最大空閒連接數MaxActive int:連接池的容量,即連接池中最多可以包含多少個連接,包括正在使用的連接和空閒連接IdleTimeout time.Duration:空閒連接的最大空閒時間Get() Conn:從連接池獲取連接
另外,idleList是一個由空閒連接(類型為*poolConn)構成的雙向鏈表。pushFront()、popFront()和popBack()這3個函數分別用於,通過將剛剛使用過的連接插入到鏈表頭部來將其放回連接池;從鏈表頭部取出空閒連接;從鏈表尾部刪除長時間沒有使用的空閒連接。
type idleList struct {
count int
front, back *poolConn
}
實現連接池時,需要考慮以下幾個問題
-
何時新建連接?
- 若新建連接時發現已創建的連接數到達連接池的容量上限,該如何處理?
- 如何回收空閒時間過長的連接?
- 如何確保連接池中的連接依然存活?
下面就帶着這幾個問題,重點梳理一下從連接池中獲取連接的func (p *Pool) Get() Conn方法和將連接放回連接池的func (ac *activeConn) Close()方法。
問題1:如何回收空閒時間過長的連接?
先來梳理func (p *Pool) Get() Conn方法的邏輯。
func (p *Pool) Get() Conn {
// GetContext returns errorConn in the first argument when an error occurs.
c, _ := p.GetContext(context.Background())
return c
}
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
return errorConn{err}, err
}
// ...
Get()會返回兩種類型的連接,activeConn和errorConn,這兩種類型都實現了Conn接口。
這裏採用了稱為Null Object或Special Case的設計模式,即使獲取連接時發生錯誤,也不會產生nil,而是返回一個異常的連接。只不過在異常連接上的絕大多數操作都會返回錯誤。這樣設計的好處一是避免了空指針異常,二是延後了錯誤處理的時機,或者説減少了一處需要檢查錯誤的位置,redigo的用户可以認為Get()總會返回“有效的”連接,而在錯誤檢查時,只需重點檢查Do()等方法的返回值。
Get()調用了GetContext(),而後者又調用了waitVacantConn()。waitVacantConn()有兩條執行路徑,我們先來看最簡單的一條——若沒有開啓等待模式p.Wait == false或者沒有設置最大連接數(連接池的容量),就直接返回。p.Wait == true時的邏輯將在後面介紹。
func (p *Pool) waitVacantConn(ctx context.Context) (waited time.Duration, err error) {
if !p.Wait || p.MaxActive <= 0 {
// No wait or no connection limit.
return 0, nil
}
// ...
現在,關注點又回到GetContext()方法裏了,
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// ...
p.mu.Lock()
if waited > 0 {
// ...
}
// Prune stale connections at the back of the idle list.
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back // ①
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
這部分代碼回答了有關連接池的一個問題——如何回收空閒時間過長的連接?
redigo的實現方法是獲取連接時順帶回收空閒時間過長的連接。①p.idle.back(類型為*poolConn)是指向空閒連接的雙向鏈表尾部的指針,所指向的空閒連接的t字段記錄了該連接最後一次使用的時間。如果t加上連接池參數p.IdleTimeout(最大空閒時間)在當前時間nowFunc()之前(類比食品的保質期在當前時間之前),就從雙向鏈表p.idle中刪除該連接後關閉。
由於這部分代碼可能會被多個goroutine併發執行,所以在回收(=從鏈表中刪除)空閒連接時,以及p.active計數器--時,都需要通過p.mu.Lock()加鎖。redigo在這裏還儘可能縮小了鎖的範圍:
p.mu.Lock()
// for ...
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
// ...
// }
問題2:如何確保連接池中的連接依然存活?
回收完空閒時間過長的連接後,就可以遍歷空閒連接的鏈表,從中獲取可用的空閒連接了。這部分代碼同樣可能會被多個goroutine併發執行,所以依然需要互斥鎖p.mu的保護。
p.mu.Lock()
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
// return an `activeConn` or check next idle connection
// ...
}
activeConn的結構如下
type activeConn struct {
p *Pool
pc *poolConn
state int
}
之所以要確保空閒連接依然存活,是因為空閒連接雖然存在,但可能已經是失效的連接了。那麼什麼時候會出現這種情況呢?
在Redis的配置中,有一項叫做timeout,默認為0。
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
如果該選項的值不為0,且小於redigo連接池的配置項MaxIdle的值會發生什麼呢?我們不妨測試一下
$ fgrep timeout -B2 /usr/local/etc/redis.conf
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 5
--
$ # 重啓redis
$ # brew services restart redis
func main() {
pool := &redis.Pool{
MaxActive: 1,
MaxIdle: 1,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", "127.0.0.1:6379")
},
}
c := pool.Get()
reply, err := c.Do("PING")
if err != nil {
fmt.Println(reply, err)
}
c.Close() // return to pool
time.Sleep(20 * time.Second)
c = pool.Get()
reply, err = c.Do("PING")
if err != nil {
fmt.Println(reply, err) // <nil> EOF
}
}
通過Wireshark抓包,就很容易解釋為什麼第二次c.Do("PING")報錯了,
可以看到在9.40秒時,Redis關閉了與客户端之間的TCP連接。而在23.54秒左右(相對於第一次PING時的3.53秒,經歷了20秒,就是time.Sleep(20 * time.Second)睡眠的時間),redigo在已關閉的空閒連接上發送PING,Redis直接通過RST標誌斷開了連接。
這就是空閒連接雖然存在,但已經失效的情況。
為了避免這種情況,我們不但可以根據Redis的timeout的配置,調整連接池IdleTimeout time.Duration的值,還可以在創建連接池時指定TestOnBorrow函數,例如
// pool := &redis.Pool{
// // Other pool configuration not shown in this example.
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// if time.Since(t) < time.Minute {
// return nil
// }
// _, err := c.Do("PING")
// return err
// },
// }
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
// ...
return &activeConn{p: p, pc: pc}, nil
}
pc.c.Close() // ①
p.mu.Lock()
p.active--
可以看到,當p.TestOnBorrow檢測失敗時,①空閒連接就會因無效而被關閉,避免了後續在已被Redis關閉的TCP連接上發送請求的問題。
問題3:新建連接的問題
如果空閒連接的鏈表為空,或者鏈表中沒有存活着的可用連接,就不得不新建連接了。
新建連接很簡單,只需要調用dial()函數,
p.mu.Lock()
// ...
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// ...
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
dial()的實現如下,僅僅是調用了創建連接池時指定的新建連接的(Dial成員指向的)函數
func (p *Pool) dial(ctx context.Context) (Conn, error) {
// ...
if p.Dial != nil {
return p.Dial()
}
// ...
}
但新建時需要考慮,當已創建的連接數已達到連接池的容量上限時要如何處理。
我們先來看redigo中最簡單的一種處理方法,
// Handle limit for p.Wait == false.
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return errorConn{ErrPoolExhausted}, ErrPoolExhausted
}
此時,p.Wait == false,且已創建的連接數達到了連接池的容量上限(p.active >= p.MaxActive),於是redigo直接返回了表示錯誤的連接return errorConn{}。
當p.Wait == true時的處理方式稍微複雜一些,簡單來説就是,當已創建的連接數達到了連接池的容量上限時,通過Pool結構體上的ch
type Pool struct {
// ...
ch chan struct{} // limits open connections when p.Wait is true
讓獲取連接的goroutine進入等待狀態。
select {
case <-p.ch:
// ...
case <-ctx.Done():
return 0, ctx.Err()
}
p.ch有點類似令牌桶,只要桶裏還有令牌,就不會阻塞。初始化是在lazyInit()函數中完成的,桶中初始有p.MaxActive個令牌。
func (p *Pool) lazyInit() {
p.initOnce.Do(func() {
p.ch = make(chan struct{}, p.MaxActive)
// ...
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
})
}
將連接放回連接池
最後再來看一看將連接放回連接池的過程。
釋放連接是通過用户調用func (ac *activeConn) Close() (err error) {實現的。該方法最終會調用
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc() // ①
p.idle.pushFront(pc) // ②
if p.idle.count > p.MaxIdle { // ┐
pc = p.idle.back // │- ③
p.idle.popBack() // ┘
} else {
pc = nil
}
}
if pc != nil { // ┐
p.mu.Unlock() // │
pc.c.Close() // │- ③
p.mu.Lock() // │
p.active-- // │
} // ┘
// ...
p.mu.Unlock()
return nil
}
put()的主流程很簡單
- ①更新連接的最後一次使用時間為當前時間
- ②將連接插入到空閒連接鏈表的頭部
- ③如果當前的空閒連接數(已算上剛剛插入到鏈表頭部的空閒連接)已超過了
MaxIdle,則將空閒鏈表尾部的連接從鏈表刪除後關閉
與從連接池中獲取連接一樣,這部分代碼同樣可能會被多個goroutine併發執行,所以依然需要互斥鎖p.mu的保護。
至此,我們就梳理完成了redigo中連接池部分的源代碼了。
附
描述redigo的UML類圖的代碼
@startuml
interface Conn {
}
struct Pool {
Dial func
MaxIdle int
MaxActive int
**idle idleList**
IdleTimeout time.Duration
**Get() Conn**
}
struct idleList {
count int
front *poolConn
back *poolConn
pushFront(pc *poolConn)
popFront()
popBack()
}
struct poolConn {}
struct activeConn {
p *Pool
pc *poolConn
**Close() error**
Do(cmd string, args ...any) (reply any, err error)
}
idleList "1" *-- "many" poolConn : contains
idleList --* Pool
Pool --* activeConn
poolConn --* activeConn
activeConn ..|> Conn
poolConn ..|> Conn
@enduml