动态

详情 返回 返回

Tars-Cpp 協程實現分析 - 动态 详情

作者:vivo 互聯網服務器團隊- Ye Feng

本文介紹了協程的概念,並討論了 Tars Cpp 協程的實現原理和源碼分析。

一、前言

Tars 是 Linux 基金會的開源項目(https://github.com/TarsCloud),它是基於名字服務使用 Tars 協議的高性能 RPC 開發框架,配套一體化的運營管理平台,並通過伸縮調度,實現運維半托管服務。Tars 集可擴展協議編解碼、高性能 RPC 通信框架、名字路由與發現、發佈監控、日誌統計、配置管理等於一體,通過它可以快速用微服務的方式構建自己的穩定可靠的分佈式應用,並實現完整有效的服務治理。

Tars 目前支持 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啓用對協程的支持,服務框架全面融合協程。本文基於TarsCpp-v3.0.0版本,討論了協程在TarsCpp服務框架的實現。

二、協程的介紹

2.1 什麼是協程

協程的概念最早出現在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協程認為是“可以暫停和恢復執行”的函數。

圖片

協程可以看成一種特殊的函數,相比於函數,協程最大的特點就是支持掛起(yield)和恢復(resume)的能力。如上圖所示:函數不能主動中斷執行流;而協程支持主動掛起,中斷執行流,並在一定時機恢復執行。

協程的作用

  1. 降低併發編碼的複雜度,尤其是異步編程(callback hell)。
  2. 協程在用户態中實現調度,避免了陷入內核,上下文切換開銷小。

2.2 進程、線程和協程

我們可以簡單的認為協程是用户態的線程。協程和線程主要異同:

  1. 相同點:都可以實現上下文切換(保存和恢復執行流)
  2. 不同點:線程的上下文切換在內核實現,切換的時機由內核調度器控制。協程的上下文切換在用户態實現,切換的時機由調用方自身控制。

進程、線程和協程的比較:

2.3 協程的分類

按控制傳遞(Control-transfer)機制分為:對稱(Symmetric)協程和非對稱(Asymmetric)協程。

  • 對稱協程:協程之間相互獨立,調度權(CPU)可以在任意協程之間轉移。協程只有一種控制傳遞操作(yield)。對稱協程一般需要調度器支持,通過調度算法選擇下一個目標協程。
  • 非對稱協程:協程之間存在調用關係,協程讓出的調度權只能返回給調用者。協程有兩種控制操作:恢復(resume)和掛起(yield)。

下圖演示了對稱協程的調度權轉移流程,協程只有一個操作yield,表示讓出CPU,返回給調度器。

下圖演示了非對稱協程的調度權轉移流程。協程可以有兩個操作,即resume和yield。resume表示轉移CPU給被調用者,yield表示被調用者返回CPU給調用者。

根據協程是否有獨立的棧空間,協程分為有棧協程(stackful)和無棧協程(stackless)兩種。

  • 有棧協程:每個協程有獨立的棧空間,保存獨立的上下文(執行棧、寄存器等),協程的喚醒和掛起就是拷貝和切換上下文。優點:協程調度可以嵌套,在內存中的任意位置、任意時刻進行。侷限:協程數目增大,內存開銷增大。
  • 無棧協程:單個線程內所有協程都共享同一個棧空間(共享棧),協程的切換就是簡單的函數調用和返回,無棧協程通常是基於狀態機或閉包來實現。優點:減小內存開銷。侷限:協程調度產生的局部變量都在共享棧上, 一旦新的協程運行後共享棧中的數據就會被覆蓋, 先前協程的局部變量也就不再有效, 進而無法實現參數傳遞、嵌套調用等高級協程交互。

Golang 中的 goroutine、Lua 中的協程都是有棧協程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協程。

三、Tars 協程實現

實現協程的核心有兩點:

  • 實現用户態的上下文切換。
  • 實現協程的調度。

Tars 協程的由下面幾個類實現

  • TC_CoroutineInfo 協程信息類:實現協程的上下文切換。每個協程對應一個 TC_CoroutineInfo 對象,上下文切換基於boost.context實現。
  • TC_CoroutineScheduler 協程調度器類:實現了協程的管理和調度。
  • TC_Coroutine 協程類:繼承於線程類(TC_Thread),方便業務快速使用協程。

Tars 協程有幾個特點:

  • 有棧協程。每個協程都分配了獨立的棧空間。
  • 對稱協程。協程之間相互獨立,由調度器負責調度。
  • 基於 epoll 實現協程調度,和網絡IO無縫結合。

3.1 用户態上下文切換的實現方式

協程可以看成一種特殊的函數,和普通函數不同,協程函數有掛起(yield)和恢復(resume)的能力,即可以中斷自己的執行流,並且在合適的時候恢復執行流,這也稱為上下文切換的能力。

協程執行的過程,依賴兩個關鍵要素:協程棧和寄存器,協程的上下文環境其實就是寄存器和棧的狀態。實現上下文切換的核心就是實現保存並恢復當前執行環境的寄存器狀態的能力。

實現用户態上下文切換一般有以下方式:

3.2  基於boost.context實現上下文切換

Tars 協程是基於 boost.context 實現,boost.context 提供了兩個接口(make_fcontext, jump_fcontext)實現協程的上下文切換。

代碼1:
/**
 * @biref 執行環境上下文
 */
typedef void*   fcontext_t;

/**
 * @biref 事件參數包裝
 */

struct transfer_t {
    fcontext_t     fctx; // 來源的執行上下文。來源的上下文指的是從什麼位置跳轉過來的
    void*      data; // 接口傳入的自定義的指針
};

/**
 * @biref 初始化執行環境上下文
 * @param sp 棧空間地址
 * @param size 棧空間的大小
 * @param fn 入口函數
 * @return 返回初始化完成後的執行環境上下文
 */
extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t));

/**
 * @biref 跳轉到目標上下文
 * @param to 目標上下文
 * @param vp 目標上下文的附加參數,會設置為transfer_t裏的data成員
 * @return 跳轉來源
 */
extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);

(1)make_fcontext 創建協程

  • 接受三個參數,stack 是為協程分配的棧底,stack_size 是棧的大小,fn 是協程的入口函數
  • 返回初始化完成後的執行環境上下文

(2)jump_fcontext 切換協程

  • 接受兩個參數,目標上下文地址和參數指針
  • 返回一個上下文,指向當前上下文從哪個上下文跳轉過來

make_fcontext 和 jump_fcontext 通過彙編代碼實現,具體的彙編代碼可以參考:

  • https://github.com/TarsCloud/TarsCpp/blob/v3.0.0/util/src/asm/jump_x86_64_sysv_elf_gas.S
  • https://github.com/TarsCloud/TarsCpp/blob/v3.0.0/util/src/asm/make_x86_64_sysv_elf_gas.S

boost context 是通過 fcontext_t結構體來保存協程狀態。相對於其它彙編實現的協程庫,boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。

3.3  Tars協程信息類

TC_CoroutineInfo  協程信息類,包裝了 boost.context 提供的接口,表示一個 TARS 協程。

其中,TC_CoroutineInfo::registerFunc 定義了協程的創建。

代碼2:
void TC_CoroutineInfo::registerFunc(const std::function<void ()>& callback)
{
    _callback           = callback;
    _init_func.coroFunc = TC_CoroutineInfo::corotineProc;
    _init_func.args     = this;

    fcontext_t ctx      = make_fcontext(_stack_ctx.sp, _stack_ctx.size,
                                TC_CoroutineInfo::corotineEntry); // 創建協程
    transfer_t tf       = jump_fcontext(ctx, this); // context 切換

    //實際的ctx
    this->setCtx(tf.fctx);
}



void TC_CoroutineInfo::corotineEntry(transfer_t tf)
{
    TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this
    auto    func  = coro->_init_func.coroFunc;
    void*   args = coro->_init_func.args;

    transfer_t t = jump_fcontext(tf.fctx, NULL);

    //拿到自己的協程堆棧, 當前協程結束以後, 好跳轉到main
    coro->_scheduler->setMainCtx(t.fctx);

    //再跳轉到具體函數
    func(args, t);
}

TC_CoroutineInfo::switchCoro 定義了協程切換。

代碼3:
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to)
{
    //跳轉到to協程
    _currentCoro = to;

    transfer_t t = jump_fcontext(to->getCtx(), NULL);

    //並保存協程堆棧
    to->setCtx(t.fctx);
}

四、 Tars 協程調度器

基於 boost.context 的 TC_CoroutineInfo 類實現了協程的上下文切換,協程的管理和調度,則是由 TC_CoroutineScheduler 協程調度器類來負責,分管理和調度兩個方面來説明 TC_CoroutineScheduler 調度類。

  • 協程管理:目的是需要合理的數據結構來組織協程(TC_CoroutineInfo),方便調度的實現。
  • 協程調度:目的是控制協程的啓動、休眠和喚醒,實現了 yield, sleep 等功能,本質就是實現協程的狀態機,完成協程的狀態切換。Tars 協程分為 5 個狀態:FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/**
     * 協程的狀態信息
     */
    enum CORO_STATUS
    {
        CORO_FREE       = 0,
        CORO_ACTIVE     = 1,
        CORO_AVAIL      = 2,
        CORO_INACTIVE   = 3,
        CORO_TIMEOUT    = 4 
    };

4.1 Tars 協程的管理

TC_CoroutineScheduler 主要通過以下方法管理協程:

  1. TC_CoroutineScheduler::create() 創建 TC_CoroutineScheduler 對象
  2. TC_CoroutineScheduler::init() 初始化,分配協程棧內存
  3. TC_CoroutineScheduler::run() 啓動調度
  4. TC_CoroutineScheduler::terminate() 停止調度
  5. TC_CoroutineScheduler::destroy() 資源銷燬,釋放協程棧內存

我們可以通過 TC_CoroutineScheduler::init() 看到數據結構的初始化過程。

代碼5:
void TC_CoroutineScheduler::init()
{
    ... ....

    createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1];

    TC_CoroutineInfo::CoroutineHeadInit(&_active);
    TC_CoroutineInfo::CoroutineHeadInit(&_avail);
    TC_CoroutineInfo::CoroutineHeadInit(&_inactive);
    TC_CoroutineInfo::CoroutineHeadInit(&_timeout);
    TC_CoroutineInfo::CoroutineHeadInit(&_free);

    int iSucc = 0;
    for(size_t i = 0; i < _currentSize; ++i)
    {
        //iId=0不使用, 給mainCoro使用!!!!
        uint32_t iId = generateId();
        stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協程棧內存
        TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx);
        _all_coro[iId] = coro;
        TC_CoroutineInfo::CoroutineAddTail(coro, &_free);
        ++iSucc;
    }
    _currentSize = iSucc;

    _mainCoro.setUid(0);
    _mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE);

    _currentCoro = &_mainCoro;
}

通過下面的 TC_CoroutineScheduler 調度類數據結構圖,可以更清楚的看到協程的組織方式:

圖片

Tars調度類數據結構

  • 使用協程之前,需要在協程數組(_all_coro),創建指定數量的協程對象,併為每個協程分配協程棧內存。
  • 通過鏈表的方式管理協程,每個狀態都有一個鏈表。協程狀態切換,對應協程在不同狀態鏈表的轉移。

4.2 Tars 協程的調度

Tars 調度是基於epoll實現,在 epoll 循環裏檢查是否有需要執行的協程, 有則執行之, 沒有則等待在epoll對象上, 直到有喚醒或者超時。使用 epoll 實現的好處是可以和網絡IO無縫粘合, 當有數據發送/接收時, 喚醒epoll對象, 從而完成協程的切換。

Tars 協程調度的核心邏輯是:TC_CoroutineScheduler::run()

代碼6:
void TC_CoroutineScheduler::run()
{
    ... ...

    while(!_epoller->isTerminate())
    {
        if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
        {
            _epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網絡事件
        }

        //喚醒需要激活的協程
        wakeup();

        //喚醒sleep的協程
        wakeupbytimeout();

        //喚醒yield的協程
        wakeupbyself();

        int iLoop = 100;
        //執行active協程, 每次執行100個, 避免佔滿cpu
        while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
        {
            TC_CoroutineInfo *coro = _active._next;
            switchCoro(coro);
            --iLoop;
        }

        //執行available協程, 每次執行1個
        if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail))
        {
            TC_CoroutineInfo *coro = _avail._next;
            switchCoro(coro);
        }
    }

    ... ...
}

下圖可以更清楚得看到協程調度和狀態轉移的過程。

TC_CoroutineScheduler 提供了下面四種方法實現協程的調度:

(1)TC_CoroutineScheduler::go(): 啓動協程。

(2)TC_CoroutineScheduler::yield(): 當前協程放棄繼續執行。並提供了兩種方式,支持不同的喚醒策略。

  • yield(true): 會自動喚醒(等到下次協程調度, 都會再激活當前線程)
  • yield(false): 不再自動喚醒, 除非自己調度該協程(比如put到調度器中)

(3)TC_CoroutineScheduler::sleep(): 當前協程休眠iSleepTime時間(單位:毫秒),然後會被喚醒繼續執行。

(4)TC_CoroutineScheduler::put(): 放入需要喚醒的協程, 將協程放入到調度器中, 馬上會被調度器調度。

五、總結

本文介紹了協程的概念,並討論了 Tars Cpp 協程的實現原理和源碼分析。

TarsCpp 3.x全面啓用對協程的支持,本文的源碼分析是基於TarsCpp-v3.0.0版本

https://github.com/TarsCloud/TarsCpp/tree/release/3.0

user avatar mannayang 头像 u_16776161 头像 xiaoniuhululu 头像 it1042290135 头像 huangxunhui 头像 tangpanqing 头像 jackn 头像 datadowell 头像 phytium_developers 头像 daimajiangxin 头像 innsane 头像 zhaoqianglaoshi 头像
点赞 27 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.