動態

詳情 返回 返回

Workflow通用併發控制組件:ResourcePool資源池 - 動態 詳情

開源項目Workflow是C++異步調度的高性能框架,廣泛用於高吞吐低延遲的網絡服務器、並行計算和組裝複雜網絡請求的客户端等領域。在異步調度的編程範式下,想要實現併發控制是非常困難的,因為一旦無法做到無阻塞的調度,那麼框架性能就會大打折扣。

線上非常常見的場景是:異步服務器需要限制用户的併發,從而保護有限的後端資源比如GPU計算,並在超載時可以立刻拒絕用户或者實施排隊等待的處理策略。

一個好的併發控制組件,應該是因框架制宜,實現上能夠做到完全非阻塞,而語義上能做到足夠簡單、抽象、通用。因此在這裏介紹一下C++ Workflow項目中的ResourcePool資源池,歡迎正在使用Workflow的小夥伴嘗試,相信可以讓你的代碼簡化不少,也歡迎有類似場景的開發者們參考與交流~~~

https://github.com/sogou/workflow

一、信號量Semaphore

信號量(Semaphore)小夥伴們都知道,它由大名鼎鼎的Dijkstra發明(就是那個也發明與自己同名的最短路算法的計算機科學家)~ 這是一個用於併發和同步場景下的抽象概念:假設對任何對象我們都想要控制訪問它的併發度為n,那麼定義出兩個簡單的操作就可以做到。

  • P操作:將n減1
  • V操作:將n加1

結合定義,我們不難思考出這兩個操作的底層邏輯:

  1. 我們想操作任何一個資源的時候,就可以執行P;用完資源想還回去,可以執行V;
  2. 資源不是無限的,執行P如果n減到要小於0了,説明資源不夠,那你就等着;
  3. 執行V把資源歸還,意味着有人在等的時候,你可以叫醒這個人;

這個抽象的語義,具體到每個系統中的實現是不一樣的。

雖然Workflow構思資源池的時候並不是奔着實現信號量的語義去做的,但是基於任務流的語義想要解決併發控制的問題時,會發現與信號量的概念殊途同歸。

二、資源池ResourcePool

瞭解了語義和要解決的問題,那麼我們看看資源池的接口,進一步擁有更具體的理解:

class WFResourcePool
{
public:
    WFConditional *get(SubTask *task, void **resbuf);
    WFConditional *get(SubTask *task);
    void post(void *res);
    ...

protected:
    virtual void *pop()
    {
        return this->data.res[this->data.index++];
    }

    virtual void push(void *res)
    {
        this->data.res[--this->data.index] = res;
    }
    ...

public:
    WFResourcePool(void *const *res, size_t n);
    WFResourcePool(size_t n);
    ...
};

上述代碼個人認為只需要劃三個重點:構造函數、get()和post()。

構造函數有兩個可選,簡單版可以只傳一個n。如果對於資源池有傳遞資源的需求,比如不知,那麼還可以傳入一個長度為n資源數組,數組每個元素為一個void *,內部會再分配一份相同大小的內存,把數組複製走。

因此相對上面兩種情況,get()函數也有兩個。get()等價於上述的P操作,用於拿一個訪問資源的資格,如果使用第二個接口,那麼是可以通過一個void **resbuf獲得具體的資源。

post()函數用於資源使用完畢想歸還的時候,也就是V操作,這裏post()的res參數無需與get()得到res的一致。

三、與任務流結合:WFConditional

資源池的實現本身是很簡單的,巧妙之處是它如何與Workflow當前的異步任務結合。答案就在上述get()函數的返回值,我們需要引入一層條件任務:WFConditional。

先回顧一下,Workflow中一個異步任務可以這樣原地發射:

auto *task = WFTaskFactory::create_xxx_task(x, x, callback);
task->start();

也可以扔到一個任務流中,待前序邏輯完成後執行,以實現任務編排和調度:

SeriesWork *series; // 假設這一個現成的任務流。一般來自於我們自己創建,或者其他任務的callback中拿到
series->push_back(task);

在Workflow的實現中,它們的調度都不會卡住任何線程。

那麼,現在加了一個資源池的約束,也就意味着:一個任務的發起,應當是'當前流程允許執行'並且'從資源池中拿到資格'才能真正得到調度。

WFConditional條件任務就是這個中間層:我們通過get()接口,把任務是否能通過資源池獲得資格這個工作,交給了WFConditional,而WFConditional替代了這個任務本身被start或者放到任務流中。

用一個併發為1的小spider,展示一下常用的使用方式:

WFResourcePool pool(1); // 0. 構造資源池,表示併發只允許1

// 1. 構造一個http任務。在它的callback中,我們把資源池的資源歸還
WFHttpTask *t = WFTaskFactory::create_http_task(..., [](void *){pool.post(nullptr);});

// 2. 構造一個條件任務,它會在真正dispatch的時候才去嘗試獲取資源
WFConditional *c = pool.get(t, &t->user_data);  // 用user_data來保存res是一種實用方法。

// 3. 接下來用這個條件任務,替代http任務去和任務流結合
c->start(); // or series->push_back(c);

四、更多...

回到剛開始的場景,可以參考這個issue,這個也是內部用户諮詢得最多的需求之一:《如何根據用户配置去限制server的qps》,🔗 https://github.com/sogou/workflow/issues/1319

當然我們實現一個server的時候,實際上是要控制用户請求的併發而非QPS,因為QPS是由資源處理能力附加了時間維度而得出的,核心指標應該是同時能處理的併發數。

細心的小夥伴還會發現,資源池有兩個protected的接口,這是供聰明的你派生使用的。比如上述的post要叫醒最早在排隊的那個人以實現先來先服務、還是叫醒最晚等待的那個人來減少超時用户的數量,這都是可以通過派生資源池來實現,而資源池本身只是一個非常通用的組件。

最近學習其他領域的知識也越來越發現,計算機中的語義無論是在操作系統還是框架還是算法層實現,都是相通的。也就是説如果對計算機底層邏輯能做到融會貫通,那麼無論投身哪個細分領域都可以有新發現新貢獻。雖然博主已經墮落到了Q更,最近才有空寫一下資源池的介紹,但其實WFResourcePool的推出已經有2年了,本人單方面宣佈它有可能是Workflow在開源之後才增加的組件中最重要的一個。非常感謝開發者們的支持,期待大家對更多場景的交流和共建,一起打造更多有趣的組件,發現更多通用的方案~

user avatar ZhongQianwen 頭像 ospo 頭像 donnytab 頭像 starrocks 頭像 muzijun_68c14af5563a2 頭像 dolphinscheduler 頭像 puxiaoke6 頭像 metaxk 頭像 meituanjishutuandui 頭像 yunzhihuijishushequ 頭像 wunima 頭像 liu_chen 頭像
點贊 26 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.