博客 / 詳情

返回

基於 cronet 的單鏈接性能信息收集

背景

公司的一款基於網絡雲盤的產品,需要統計每個鏈接到各個服務器節點的性能,以便後台做更優的調度。常用的性能指標有 DNS 解析耗時、連接耗時、ssl 握手耗時、首分片耗時、總的發送接收字節數、總的請求耗時以及基於它們計算的平均速度等。早先的基於 boost 的版本這些都很好統計,後來該產品底層網絡庫換成 cronet 就不好統計了,我的工作就是基於 cronet 重新收集上述性能信息。

cronet 網絡編程

正式開搞前先簡單看下 cronet 網絡編程範式與之前有何不同:

#include <string>
#include <thread>
#include <iostream>
#include <cronet/cronet_c.h>

// 回調:重定向
void on_redirect_received(Cronet_UrlRequestCallback* callback,
                         Cronet_UrlRequest* request,
                         Cronet_UrlResponseInfo* info,
                         const char* new_location) {
    std::cout << "Redirect to: " << new_location << std::endl;
    Cronet_UrlRequest_FollowRedirect(request);
}


// 回調:響應開始
void on_response_started(Cronet_UrlRequestCallback* callback,
                        Cronet_UrlRequest* request,
                        Cronet_UrlResponseInfo* info) {
    std::cout << "Response started" << std::endl;
    Cronet_Buffer* buffer = Cronet_Buffer_Create();
    Cronet_Buffer_InitWithAlloc(buffer, 4096); // 4KB緩衝區
    Cronet_UrlRequest_Read(request, buffer);
}

// 回調:讀取完成
void on_read_completed(Cronet_UrlRequestCallback* callback,
                      Cronet_UrlRequest* request,
                      Cronet_UrlResponseInfo* info,
                      Cronet_Buffer* buffer,
                      uint64_t bytes_read) {
    // 處理數據
    if (bytes_read > 0) {
        const char* data = static_cast<const char*>(Cronet_Buffer_GetData(buffer));
        std::cout << "Read " << bytes_read << " bytes" << std::endl;
        std::cout << data << std::endl; 
    }
    
    // 釋放當前buffer
    Cronet_Buffer_Destroy(buffer);

    // 繼續讀取(如果還有數據且未完成)
    if (bytes_read > 0) {
        Cronet_Buffer* new_buffer = Cronet_Buffer_Create();
        Cronet_Buffer_InitWithAlloc(new_buffer, 4096);
        Cronet_UrlRequest_Read(request, new_buffer);
    } else {
        std::cout << "Read completed" << std::endl;
    }
}

// 回調:請求成功
void on_succeeded(Cronet_UrlRequestCallback* callback,
                 Cronet_UrlRequest* request,
                 Cronet_UrlResponseInfo* info) {
    std::cout << "Request succeeded" << std::endl;
}

// 回調:請求失敗
void on_failed(Cronet_UrlRequestCallback* callback,
              Cronet_UrlRequest* request,
              Cronet_UrlResponseInfo* info,
              Cronet_Error* error) {
    std::cout << "Request failed" << std::endl;
}

// 回調:取消
void on_canceled(Cronet_UrlRequestCallback* callback,
                Cronet_UrlRequest* request,
                Cronet_UrlResponseInfo* info) {
    std::cout << "Request cancelled" << std::endl;
}

// Executor
void executor_func(Cronet_Executor *executor, Cronet_Runnable *runnable) {
    Cronet_Runnable_Run(runnable);
}

int main() {
    // 1. 創建引擎
    Cronet_EnginePtr engine = Cronet_Engine_Create();
    Cronet_EngineParamsPtr params = Cronet_EngineParams_Create();
    Cronet_Engine_StartWithParams(engine, params);
    
    // 3. 創建回調
    Cronet_UrlRequestCallbackPtr callback = Cronet_UrlRequestCallback_CreateWith(
        on_redirect_received,  // 重定向回調
        on_response_started,
        on_read_completed,
        on_succeeded,
        on_failed,
        on_canceled
    );
    
    // 4. 配置請求
    Cronet_UrlRequestParamsPtr req_params = Cronet_UrlRequestParams_Create();
    Cronet_UrlRequestParams_http_method_set(req_params, "GET");
    
    // 添加請求頭
    Cronet_HttpHeaderPtr header = Cronet_HttpHeader_Create();
    Cronet_HttpHeader_name_set(header, "User-Agent");
    Cronet_HttpHeader_value_set(header, "Cronet-C-Client");
    Cronet_UrlRequestParams_request_headers_add(req_params, header);
    
    // 5. 創建執行器
    Cronet_ExecutorPtr executor = Cronet_Executor_CreateWith(executor_func);
    
    // 6. 創建並啓動請求
    Cronet_UrlRequestPtr request = Cronet_UrlRequest_Create();
    Cronet_UrlRequest_InitWithParams(request, engine, 
                                     "http://httpbin.org/get", 
                                     req_params, callback, executor);
    Cronet_UrlRequest_Start(request);
    
    // 7. 等待請求完成
    std::this_thread::sleep_for(std::chrono::seconds(15));
    
    // 8. 清理資源
    Cronet_UrlRequest_Destroy(request);
    Cronet_HttpHeader_Destroy(header);
    Cronet_UrlRequestParams_Destroy(req_params);
    Cronet_Executor_Destroy(executor);
    Cronet_UrlRequestCallback_Destroy(callback);
    Cronet_EngineParams_Destroy(params);
    Cronet_Engine_Destroy(engine);
    
    return 0;
}

上面是 deepseek 生成的 cronet 基於 C 語言的示例,運行後有以下輸出:

$ ./cronet_conn_stat
Response started
Read 279 bytes
{
  "args": {},
  "headers": {
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "Cronet-C-Client",
    "X-Amzn-Trace-Id": "Root=1-69425f84-67dbe33a06e303cf4c611b72"
  },
  "origin": "111.108.111.133",
  "url": "http://httpbin.org/get"
}

Request succeeded

與 libcurl 相比,Cronet_UrlRequest_Start 類似 curl_easy_perform 的角色,但變為異步執行,它會立即返回,之後通過回調不斷通知連接上的事件,因此示例中是通過 sleep 15 秒來阻塞主線程的,工程實踐中這個完全可以和消息、事件循環集成在一起,從而提高線程併發能力;與 boost 相比 (特別是基於 boost::asio::ip::tcp 版本的實現),完全不需要主動 async_resolve、async_connect、async_handeshake 以及 async_write,只需要在 on_read_completed 回調中無腦 Cronet_UrlRequest_Read 即可,底層過程 cronet 都幫你包辦了,達到節省心智負擔的目的。

不過這也帶來一個問題,就是之前可以手動打樁計算的各種耗時,現在都看不到了,最多能獲取個首分片耗時和總請求耗時,其中首分片這還包含了解析、連接、ssl 握手時長的耗時,相對失真了都。

cronet 對鏈接性能信息的支持

cronet 其實也有接口統計鏈接層的一些信息,主要通過下面的接口獲取:

 ///////////////////////
// Struct Cronet_Metrics.
CRONET_EXPORT Cronet_MetricsPtr Cronet_Metrics_Create(void);
CRONET_EXPORT void Cronet_Metrics_Destroy(Cronet_MetricsPtr self);
...
// Cronet_Metrics getters.
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_request_start_get(
    const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_dns_start_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_dns_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_connect_start_get(
    const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_connect_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_ssl_start_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_ssl_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_sending_start_get(
    const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_sending_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_push_start_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_push_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_response_start_get(
    const Cronet_MetricsPtr self);
CRONET_EXPORT
Cronet_DateTimePtr Cronet_Metrics_request_end_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
bool Cronet_Metrics_socket_reused_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
int64_t Cronet_Metrics_sent_byte_count_get(const Cronet_MetricsPtr self);
CRONET_EXPORT
int64_t Cronet_Metrics_received_byte_count_get(const Cronet_MetricsPtr self);

主要是通過 Cronet_Metrics_xxx 的接口獲取,所需的 dns、connect、ssl、request 耗時都有,耗時是通過接口返回的兩個時間做差值得到的,舉例來説:

Cronet_DateTimePtr start = Cronet_Metrics_connect_start(metrics);
Cronet_DateTimePtr end = Cronet_Metrics_connect_end(metrics);
if (start && end) {
    int64_t start_ms = Cronet_DateTime_value_get(start);
    int64_t end_ms = Cronet_DateTime_value_get(end);
    int64_t connect = (start_ms > 0 && end_ms > 0) ? (end_ms - start_ms) : 0;
    // printf("connect elapse %lld\n", connect);
}

注意返回的 Cronet_DateTime 對象到毫秒值,還需要調用一個接口,莫法子,C 語言的接口就是這麼廢柴~

現在的關鍵落到了如何獲取 Cronet_Metrics 對象,發現只有一個接口可以:

// Cronet_RequestFinishedInfo getters.
CRONET_EXPORT
Cronet_MetricsPtr Cronet_RequestFinishedInfo_metrics_get(
    const Cronet_RequestFinishedInfoPtr self);

需要輸入 Cronet_RequestFinishedInfo 對象,這又是個什麼東東,經過一番搜索,發現唯一途徑是通過一個回調:

// The app implements abstract interface Cronet_RequestFinishedInfoListener by
// defining custom functions for each method.
typedef void (*Cronet_RequestFinishedInfoListener_OnRequestFinishedFunc)(
    Cronet_RequestFinishedInfoListenerPtr self,
    Cronet_RequestFinishedInfoPtr request_info,
    Cronet_UrlResponseInfoPtr response_info,
    Cronet_ErrorPtr error);

這個回調又是經由 Cronet_RequestFinishedInfoListener 對象設置的:

///////////////////////
// Abstract interface Cronet_RequestFinishedInfoListener is implemented by the
// app.

// There is no method to create a concrete implementation.

// Destroy an instance of Cronet_RequestFinishedInfoListener.
CRONET_EXPORT void Cronet_RequestFinishedInfoListener_Destroy(
    Cronet_RequestFinishedInfoListenerPtr self);
// Set and get app-specific Cronet_ClientContext.
...
// The app implements abstract interface Cronet_RequestFinishedInfoListener by
// defining custom functions for each method.
typedef void (*Cronet_RequestFinishedInfoListener_OnRequestFinishedFunc)(
    Cronet_RequestFinishedInfoListenerPtr self,
    Cronet_RequestFinishedInfoPtr request_info,
    Cronet_UrlResponseInfoPtr response_info,
    Cronet_ErrorPtr error);
// The app creates an instance of Cronet_RequestFinishedInfoListener by
// providing custom functions for each method.
CRONET_EXPORT Cronet_RequestFinishedInfoListenerPtr
Cronet_RequestFinishedInfoListener_CreateWith(
    Cronet_RequestFinishedInfoListener_OnRequestFinishedFunc
        OnRequestFinishedFunc);

看這個 CreateWith 接口,它的唯一參數就是上面聲明的用户回調。這一系列接口其實是創建了一個偵聽器,之後還需要關聯到引擎才能生效:

void on_request_finished_listener(
    Cronet_RequestFinishedInfoListenerPtr self,
    Cronet_RequestFinishedInfoPtr request_info,
    Cronet_UrlResponseInfoPtr response_info,
    Cronet_ErrorPtr error)
{
}

...

int main() {
    // 1. 創建引擎
    Cronet_EnginePtr engine = Cronet_Engine_Create();
    Cronet_EngineParamsPtr params = Cronet_EngineParams_Create();
    Cronet_Engine_StartWithParams(engine, params);
    ...
    // 5. 創建執行器
    Cronet_ExecutorPtr executor = Cronet_Executor_CreateWith(executor_func);
    ...
    Cronet_RequestFinishedInfoListenerPtr listener = Cronet_RequestFinishedInfoListener_CreateWith(on_request_finished_listener);
    if (listener) {
        Cronet_Engine_AddRequestFinishedListener(engine, listener, executor);
        std::cout << "request finished listener registered" << std::endl;
    }
    else {
        std::cout << "setup request finished listener failed, no connection statistic provided" << std::endl;
    }
    
    ...
        
    // 8. 清理資源
    Cronet_UrlRequest_Destroy(request);
    Cronet_HttpHeader_Destroy(header);
    Cronet_UrlRequestParams_Destroy(req_params);
    
    if (listener) {
        Cronet_Engine_RemoveRequestFinishedListener(engine, listener);
        Cronet_RequestFinishedInfoListener_Destroy(listener);
    }
    
    Cronet_Executor_Destroy(executor);
    Cronet_UrlRequestCallback_Destroy(callback);
    Cronet_EngineParams_Destroy(params);
    Cronet_Engine_Destroy(engine);
    
    return 0;
}

關聯偵聽器的接口如下:

CRONET_EXPORT
void Cronet_Engine_AddRequestFinishedListener(
    Cronet_EnginePtr self,
    Cronet_RequestFinishedInfoListenerPtr listener,
    Cronet_ExecutorPtr executor);
CRONET_EXPORT
void Cronet_Engine_RemoveRequestFinishedListener(
    Cronet_EnginePtr self,
    Cronet_RequestFinishedInfoListenerPtr listener);

這樣整個流程就串起來了:在 Cronet_Engine 初始化時創建並關聯一個 Cronet_RequestFinishedInfoListener 對象,該對象持有一個 Cronet_RequestFinishedInfoListener_OnRequestFinished 類型的用户回調,當連接結束時 cronet 會將性能信息通過該回調通知到用户,用户通過回調的第二個參數 request_info 獲取鏈接性能信息,即 Cronet_RequestFinishedInfo -> Cronet_Metrics,再通過後者的一系列接口獲取感興趣的信息。

從整個流程可以看出:

* 性能信息只有在連接關閉時才能獲取到

* 性能信息並不是關聯到單鏈接 (Cronet_UrlRequest),而是關聯到全局 (Cronet_Engine)

* 可以關聯多個 Listener 對象,但感覺沒什麼必要

性能信息投遞

回到業務層面,每個下載任務包含若干鏈接,在任務結束時 (成功、失敗或取消) 對鏈接信息進行上報,平時這些信息是由鏈接對象管理的,因此需要將位於全局回調的性能信息進行投遞。

用户定義的鏈接對象一般是關聯到 Cronet_UrlRequest,即通過下面的接口:

///////////////////////
// Concrete interface Cronet_UrlRequest.

CRONET_EXPORT void Cronet_UrlRequest_SetClientContext(
    Cronet_UrlRequestPtr self,
    Cronet_ClientContext client_context);
CRONET_EXPORT Cronet_ClientContext
Cronet_UrlRequest_GetClientContext(Cronet_UrlRequestPtr self);

順便插一句,cronet 中各種對象都支持設置用户數據,命名也非常統一: XXX_Get/SetClientContext。

這樣就可以通過 Cronet_UrlRequest 對象找到關聯的鏈接對象,回過頭來再看性能信息的回調:

// The app implements abstract interface Cronet_RequestFinishedInfoListener by
// defining custom functions for each method.
typedef void (*Cronet_RequestFinishedInfoListener_OnRequestFinishedFunc)(
    Cronet_RequestFinishedInfoListenerPtr self,
    Cronet_RequestFinishedInfoPtr request_info,
    Cronet_UrlResponseInfoPtr response_info,
    Cronet_ErrorPtr error);

這裏提供的不是 Cronet_UrlRequest 而是 Cronet_UrlResponseInfo,兩者對不上,於是問題演化為如何通過 Cronet_UrlResponseInfo 找到 Cronet_UrlRequest。

梳理 cronet 請求生命週期:

 

image

一般有這麼幾條路徑:

* onResponseStarted -> onReadCompleted -> onSucceeded

* onCanceled / onResponseStarted -> onCanceled / onResponseStarted -> onReadCompleted -> onCanceled

* onFailed / onResponseStarted -> onReadCompleted -> onFailed

302 重定向就不單獨列出了,可以在 follow 和 cancel 中選一種繼續。結合相關的回調函數原型觀察:

// The app implements abstract interface Cronet_UrlRequestCallback by defining
// custom functions for each method.
typedef void (*Cronet_UrlRequestCallback_OnRedirectReceivedFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info,
    Cronet_String new_location_url);
typedef void (*Cronet_UrlRequestCallback_OnResponseStartedFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info);
typedef void (*Cronet_UrlRequestCallback_OnReadCompletedFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info,
    Cronet_BufferPtr buffer,
    uint64_t bytes_read);
typedef void (*Cronet_UrlRequestCallback_OnSucceededFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info);
typedef void (*Cronet_UrlRequestCallback_OnFailedFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info,
    Cronet_ErrorPtr error);
typedef void (*Cronet_UrlRequestCallback_OnCanceledFunc)(
    Cronet_UrlRequestCallbackPtr self,
    Cronet_UrlRequestPtr request,
    Cronet_UrlResponseInfoPtr info);

發現它們都提供 Cronet_UrlRequest & Cronet_UrlResponseInfo 兩個對象,於是一個大膽的想法誕生了:在所有回調中建立二者的映射關係,最終在偵聽器回調中再通過 Cronet_UrlResponseInfo 反查 Cronet_UrlRequest !

這個想法是可行的,特別是 Cronet_RequestFinishedInfoListener_OnRequestFinishedFunc 保證在上述所有回調之後被調用。

整合在一起

假設上述關係通過全局 rr_map 變量映射在一起,那麼最終的 listener 回調可以這樣實現:

extern on_request_finished(Cronet_ClientContext obj, int64_t connect); 
void on_request_finished_listener(
    Cronet_RequestFinishedInfoListenerPtr self,
    Cronet_RequestFinishedInfoPtr request_info,
    Cronet_UrlResponseInfoPtr response_info,
    Cronet_ErrorPtr error)
{
    int64_t connect = 0; 
    Cronet_MetricsPtr metrics = Cronet_RequestFinishedInfo_metrics(request_info); 
    if (metrics) {
        Cronet_DateTimePtr start = Cronet_Metrics_connect_start(metrics);
        Cronet_DateTimePtr end = Cronet_Metrics_connect_end(metrics);
        if (start && end) {
            int64_t start_ms = Cronet::instance()->Cronet_DateTime_value_get(start);
            int64_t end_ms = Cronet::instance()->Cronet_DateTime_value_get(end);
            connect = (start_ms > 0 && end_ms > 0) ? (end_ms - start_ms) : 0;
        }
    }

    auto it = rr_map.find(response_info); 
    if (it != rr_map.end()) { 
        Cronet_UrlRequestPtr req = it->second;
        Cronet_ClientContext obj = Cronet_UrlRequest_GetClientContext(req); 
        if (obj) {
            on_request_finished(obj, connect);
        }
    }
}

其中 on_request_finished 是用户實現的回調,它的兩個參數 obj 和 connect 分別是用户定義的鏈接層對象與連接耗時。其它的像 dns 耗時、ssl 握手耗時、首分片耗時都可以如法泡製,這裏就不一一贅述了。

下面是整合後的示意圖:

image

其中演示了 Cronet_UrlResponseInfo 與 Cronet_UrlRequestInfo 之間建立映射的過程,以及整個過程涉及的主要 Cronet 類型和回調。

後記

功能上線後,確實可用,解決了之前 cronet 收集不到鏈接性能數據的問題

image

其中 connect (xx) 標識的即為連接耗時。

本文完整的 demo 可參考 github 上的 cronet_conn_stat 項目,支持在 mac & windows 上進行驗證。

下面是 demo 的一個典型輸出:

$ ./cronet_conn_stat
request finished listener registered
Response started
Read 279 bytes
{
  "args": {},
  "headers": {
    "Accept-Encoding": "gzip, deflate",
    "Host": "httpbin.org",
    "User-Agent": "Cronet-C-Client",
    "X-Amzn-Trace-Id": "Root=1-694268b2-27badad81ccc4cdb11ccc5f3"
  },
  "origin": "111.108.111.133",
  "url": "http://httpbin.org/get"
}

request finished listen
request finish, connect elapse 346 ms
Request succeeded

最後一行輸出了連接耗時。

早先 deepseek 給的一版示例中,未給 Cronet_Executor 提供回調函數:

    Cronet_ExecutorPtr executor = Cronet_Executor_CreateWith(NULL);

編譯正常,但運行到第一個回調時,就會崩潰:

$ ./cronet_conn_stat
request finished listener registered
Segmentation fault: 11

windows 上掛上調度器看甚至有詳細的崩潰堆棧:

cronet_crash

明顯是在在第一個回調 (on_response_started) 中訪問空指針崩了,增加 executor 回調設置後,就正常了。所以有時候 AI 給出的結果也不完全靠譜,還得自己去實際跑跑才行。

github demo 中還有一個開關 (ENABLE_EXECUTOR_THREAD),可以控制是否將各種事件的回調放在一個單獨的線程中去執行:

void custom_executor_func(Cronet_Executor *executor, Cronet_Runnable *cronet_task) {
    ExecutorThread* et = (ExecutorThread*)Cronet_Executor_GetClientContext(executor); 
    if (!et) {
        std::cerr << "Executor not initialized!" << std::endl;
        return;
    }

    // 將Cronet的任務包裝成std::function
    if (cronet_task) {
        et->postTask([cronet_task]() {
            // 執行Cronet任務
            Cronet_Runnable_Run(cronet_task);
        });
    }
}

為此還引入了一個線程與函數的投遞封裝類 (ExectorThread),有興趣的讀者可以去研究下。

參考

[1]. blob/main/components/cronet/native/test/url_request_test.cc

[2]. Cronet 請求生命週期

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.