動態

詳情 返回 返回

C++-一篇文章入門coroutines協程 - 動態 詳情

前言

最近學習了一下C++協程,這篇文章將介紹協程的相關概念,以及在C++中如何使用協程。

什麼是協程

C++中,協程(coroutines)可以理解為一個可以暫停和恢復執行的函數。什麼意思呢?例如有以下協程函數:

Task taskFunc()
{
    ...
    co_await doSomething();  // 1
    doSomething2();             // 2
}

int main() {
    auto task = taskFunc(); // 3
    ...                        // 4
    task.resume();            // 5
}

3處調用協程函數,在執行到1的時候(例如doSomething()異步請求網絡資源),函數暫停執行,代碼走到4處,等後面某個合適的時機5,恢復執行協程函數,函數從2繼續執行,對數據進行處理,執行順序是3->1->4->5->2,在這個過程中,執行的線程始終是同一個(除非有意讓協程在不同線程中切換執行)。

協程實現原理

C++中,協程是通過編譯器來實現的,編譯器根據代碼中的co_return/co_await關鍵字,識別一個函數為協程函數,協程函數的返回值類型有如下要求:返回值是一個類,類中包含一個名稱為promise_type的類型(必須是這個類名),promise_type必須實現幾個函數:

  • get_return_object
  • initial_suspend
  • final_suspend
  • unhandled_exception
  • return_void或者return_value
    通常來説,這個外層的類會有一個std::coroutine_handle\<promise_type\> handle成員保存協程的句柄,用於控制協程和獲取協程的相關數據。編譯器會將協程狀態和協程中的數據保存在堆上,並生成執行協程所需的代碼。

    C++協程的最小例子

    #include <iostream>
    #include <coroutine>
    
    template <bool READY>
    struct Awaiter {
      bool await_ready() noexcept {
          std::cout << "await_ready: " << READY << std::endl;
          return READY;
      }
      void await_resume() noexcept {
          std::cout << "await_resume" << std::endl;
      }
      void await_suspend(std::coroutine_handle<>) noexcept {
          std::cout << "await_suspend" << std::endl;
      }
    };
    
    struct TaskPromise {
      struct promise_type {
          TaskPromise get_return_object() {
              std::cout << "get_return_object" << std::endl;
              return TaskPromise{std::coroutine_handle<promise_type>::from_promise(*this)};
          }
          Awaiter<true> initial_suspend() noexcept {
              std::cout << "initial_suspend" << std::endl;
              return {};
          }
          Awaiter<true> final_suspend() noexcept {
              std::cout << "final_suspend" << std::endl;
              return {};
          }
          void unhandled_exception() {
              std::cout << "unhandled_exception" << std::endl;
          }
          void return_void() noexcept {
              std::cout << "return_void" << std::endl;
          }
      };
    
      void resume() {
          std::cout << "resume" << std::endl;
          handle.resume();
      }
    
      std::coroutine_handle<promise_type> handle;
    };
    
    TaskPromise task_func() {
      std::cout << "task first run" << std::endl;
      co_await Awaiter<false>{};
      std::cout << "task resume" << std::endl;
    }
    
    int main() {
      auto promise = task_func();  // 1
      promise.resume();
    
      return 0;
    }

接下來我們一步一步解析協程的執行過程:

1

首先task_func就是如前所述的協程函數,因為它包含了一個co_await關鍵字,返回值是TaskPromise類型,其包含了一個promist_type類型,實現了一些必須的函數,滿足協程的所有要求。

當一個協程被調用時,編譯器不會像普通函數一樣在棧上分配空間。相反,它會在堆上動態分配一塊內存,這塊內存被稱為“協程幀”。這個幀裏包含了:

  • 協程的 Promise 對象 (promise_type)
  • 所有按值傳遞的參數的拷貝
  • 所有在 co_await 掛起點之間需要保持狀態的局部變量
  • 協程當前執行到的位置(狀態機的狀態)

總結來説就是,第一次執行協程函數task_func時,編譯器會在堆上創建一個協程幀用來保存協程狀態和局部變量等信息。接着,調用promise_type的get_return_object方法,該函數的返回值必須和協程函數的返回值一致,在task_func暫停/結束執行時,返回值就是get_return_object的返回值。

get_return_object函數的實現通常是返回外層類對象,並使用promise_type對象初始化coroutine_handle:

std::coroutine_handle<promise_type>::from_promise(*this)

std::coroutine_handle可以理解為類似std::thread的類,是一個wrapper,用來控制協程和獲取協程數據的。from_promise將一個promise_type類型轉換為std::coroutine_handle,相應的std::coroutine_handle::promise方法用於獲取handle中的promise_type對象。

2

接着,執行的是initial_suspend,這個函數的返回值是一個類類型,通常稱為awaiter,這個類用於控制接下來要繼續執行協程函數,還是暫停執行,這個類必須實現如下函數:await_ready,await_suspend,await_resume,關於這三個函數的詳細解釋,我們等到co_await時再説。
initial_suspend返回類型可以使用stl提供的兩種awaiter:std::suspend_always和std::suspend_never,分別表示總是掛起或者總是繼續執行。其實現也很簡單:

// STRUCT suspend_always
struct suspend_always {
    _NODISCARD constexpr bool await_ready() const noexcept {
        return false;
    }

    constexpr void await_suspend(coroutine_handle<>) const noexcept {}
    constexpr void await_resume() const noexcept {}
};

這樣就不用自己實現awaiter了,在我們的例子中,我們實現了自己的awaiter類,並且await_ready返回true,此時會立即調用await_resume。

3

接着開始真正執行task_func函數的第一行

std::cout << "task first run" << std::endl;

然後我們遇到了co_await表達式,co_await後面跟着的是awaiter對象,我們詳細的介紹一下awaiter的三個函數:

  • await_ready。沒有參數,返回bool類型,表示接下來應該繼續執行(true)還是暫停執行(false)
  • await_suspend。當await_ready返回false時,表示沒有準備好數據,此時下一個執行的函數就是await_suspend。await_suspend參數是協程句柄類型std::coroutine_handle,這個參數是編譯器幫忙傳入的。await_suspend返回值可以是void,也可以是bool(如果返回false則又會繼續執行協程),甚至可以是其他協程句柄,從而執行其他協程(這是高級話題,我們以後再説)
  • await_resume。當await_ready返回true時,表示已經準備好數據,此時下一個執行的函數就是await_resume。await_resume沒有參數,返回值可以是任意類型,這個返回值會作為co_return表達式的返回值。

在我們的例子中,co_await Awaiter<false>{};會讓協程掛起,於是編譯器將當前表達式生成的awaiter對象保存在協程幀中,然後協程函數task_func返回一個TaskPromise對象(由前面所説的get_return_object構造),回到了main中執行。

4

接着我們調用了TaskPromise::resume,這個函數只做了一件事,就是調用coroutine_handle的resume方法,它會調用之前co_wait掛起時保存的awaiter對象的resume方法,然後繼續執行協程函數:

std::cout << "task resume" << std::endl;

5

最後,協程函數結束執行,我們沒有寫co_return,編譯器會默認補上co_return在最後,co_return會調用promise_type::return_void()函數,表示沒有返回值。如果有返回值,就需要定義一個叫void return_value(T t)的函數,return_value和return_void不能共存。
接着編譯器調用promise_type::final_suspend結束協程,final_suspend類似initial_suspend,如果掛起,協程不會立即銷燬內部的狀態信息,反之則會立即銷燬,因為我們可能還有部分信息存在promise_type對象中,所以在finial_suspend掛起後,則需要手動釋放coroutine_handle的資源,可以採用RAII的方式,在外層類中的析構函數釋放coroutine_handle:

~TaskPromise()
{
    handle.destroy();
}

如果發生異常,則會調用promise::unhandled_exception。

協程等效代碼

綜上,我們可以寫出task_func協程執行過程的偽代碼:

TaskPromise task_func() {
    // No parameters and local variables.
    auto state = new __TaskPromise_state_(); // has TaskPromise::promise_type promise; 
    TaskPromise coro = state.promise.get_return_object();
    try {
        co_await p.inital_suspend();
        std::cout << "task first run" << std::endl;
        co_await Awaiter<false>{};
        std::cout << "task resume" << std::endl;
    } catch (...) {
        state.promise.unhandled_exception();
    }
    co_await state.promise.final_suspend();
}

協程傳值的例子

下面是一個模擬通過協程獲取數據,最終返回在main中取數據的例子


#include <iostream>
#include <coroutine>
#include <future>
#include <thread>

struct TaskPromise {
    struct promise_type {
        TaskPromise get_return_object() {
            std::cout << "get_return_object(), thread_id: " << std::this_thread::get_id() << std::endl;
            return TaskPromise{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        void return_void() noexcept {}
        size_t data = 0;
    };
    std::coroutine_handle<promise_type> handle;
};

struct Awaiter {
    bool await_ready() noexcept {
        std::cout << "await_ready(), thread_id: " << std::this_thread::get_id() << std::endl;
        return false;
    }
    void await_suspend(std::coroutine_handle<TaskPromise::promise_type> handle) noexcept {
        std::cout << "await_suspend(), thread_id: " << std::this_thread::get_id() << std::endl;
        auto thread = std::thread([=]() {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            handle.promise().data = 1;
            handle.resume();
        });
        thread.join();
    }
    void await_resume() noexcept {
        std::cout << "await_resume(), thread_id: " << std::this_thread::get_id() << std::endl;
    }
};

TaskPromise task_func() {
    std::cout << "task_func() step 1, thread_id: " << std::this_thread::get_id() << std::endl;
    co_await Awaiter{};
    std::cout << "task_func() step 2, thread_id: " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::cout << "main(), thread_id: " << std::this_thread::get_id() << std::endl;
    auto promise = task_func();
    std::cout << "main(), data: " << promise.handle.promise().data << ", thread_id: " << std::this_thread::get_id() << std::endl;
    promise.handle.resume();
    std::cout << "main(), data: " << promise.handle.promise().data << ", thread_id: " << std::this_thread::get_id() << std::endl;

    return 0;
}

參考:1. https://mp.weixin.qq.com/s/0njDHtz_SGPkrr4ndAWHaA

Add a new 評論

Some HTML is okay.