Stories

Detail Return Return

如何優雅地組織Rust項目中的異步代碼? - Stories Detail

概要

很多使用過Async Rust的人都可能有過被其要求的約束所困擾的經歷,例如,spawned task'static的要求,MutexGuard不能跨越.await,等等。克服這些約束需要仔細地設計代碼結構,很可能會導致晦澀和嵌套的代碼,這對開發人員和審查人員都是一種挑戰。在這篇文章中,我將首先列出我在編寫async Rust代碼時的一些痛點。然後,我將指出我們真正需要異步代碼的場景,並討論為什麼我們應該把異步和非異步代碼分開。最後,我將展示我是如何在最近的一次Curp重構中實踐這一原則的。

痛點:

Spawned Task必須是'static

在spawn一個新的async task的時候,編譯器並不知道該task會被執行多久,可能很短暫,也可能會一直執行至程序運行結束。所以,編譯器會要求該task所含的所有類型都擁有'static的生命週期。

這樣的限制使得我們常常能在spawn前看到不少的clone代碼。當然,這些代碼從某種角度來講可以幫助程序員更好地理清哪些變量的所有權是要被移交給新的task的,但同時,也會使得代碼看上去很囉嗦,不夠簡潔。

let a_arc = Arc::clone(&a);
let b_arc = Arc::clone(&b);
tokio::spawn(async move {
    // ...
});

Send的變量的持有不可以跨越.await

這點限制背後的原因tokio的task並不是固定在一個線程上執行的,空閒線程會主動“偷取”忙碌線程的task,這就需要task可以被Send

請看下面一段代碼:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
broadcast(new_entry).await;

嘗試編譯後,會發現報錯:log_l不能跨越.await點持有。

自然,為了使得拿着鎖的critical section儘量地短,我們不需要拿着鎖過.await點,所以我們在其中加一行放鎖的代碼:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
drop(log_l);
broadcast(new_entry).await;

很可惜,還是不能通過編譯,這是因為編譯器目前只能通過計算代碼Scope的方式來判斷一個task是否可以被Send。如果説上一個痛點還有一定的好處,那麼這個問題就純粹來源於編譯器的限制了。所以我們必須把代碼改成這個樣子:

{
    let mut log_w = log.write();
    log_w.append(new_entry.clone());
}
broadcast(new_entry).await;

如果一個函數中有需要拿多把鎖,又有很多的異步調用,代碼就會嵌套起來,變得複雜晦澀。
Side Note: 我們知道tokio自己有個異步的鎖tokio::sync::Mutex,它是可以被hold過.await的。但要注意的是,大多數情況下,我們並不會需要異步鎖,因為異步鎖通常意味着拿着鎖的critical section是會非常長的。所以,如果我們需要在異步代碼中拿鎖,不要不加思索地使用異步鎖,事實上,在tokio官方文檔中,也是更加建議使用同步鎖的。

使用異步Rust的場景和組織方式

如果我們經常在項目開發中遇到上述問題,自然就會開始思考其產生的原因以及該怎樣避免。我認為一個很重要的因素就是沒有把async和非async的代碼給分開,或者説,更本質的原因是我們沒有在設計項目架構的時候將需要async的部分和不需要async的部分分開。所以接下來,我將梳理我們什麼時候才能真正地用到Async Rust?

I/O

當我們進行比較耗時的I/O操作,我們不想讓這些操作block住我們當前的線程。所以我們用異步I/O,當運行到await 的時候,I/O就可以到後台去做,讓其它的task執行。

// .await will enable other scheduled tasks to progress
let mut file = File::create(“foo.txt”).await?;

file.write(b"some bytes”).await?;

後台任務

後台任務的task通常會伴隨着一個channel的接收端出現。

tokio::spawn(async move {
    while let Some(job) = rx.recv().await {
        // ...
    }
};

併發任務

併發地spawn多個task可以更高效地利用多核處理器。

let chunks = data.chunks(data.len() / N_TASKS);
for chunk in chunks {
  tokio::spawn(work_on(chunk));
}

依賴等待

使用.await等待依賴。這種使用相對較少一些。

// wait for some event
event.listen().await;

// barrier
barrier.wait().await;

可以看到,使用Async代碼的地方,主要集中在I/O、併發與後台任務。在開發之前,我們也不妨有意識地去分離項目中的async與sync部分:縮小Async部分的函數,將處理邏輯移動至普通函數中。將這兩部分分離,不僅可以緩解文章開頭所説的痛點,更可以幫我理清代碼結構。

{
    let mut log_w = log.write();
    log_w.append(new_entry.clone());
    // ...
}
broadcast(new_entry).await;

// move the logic to another function instead

fn update_log(log: &mut Log, new_entry: Entry) {
    log.append(new_entry);
    // ...
}

update_log(&mut log.write(), new_entry.clone());
broadcast(new_entry).await;

關於Curp的一次大型重構

在重構之前,由於一次次的迭代,代碼的可讀性和結構變得越來越差。具體來説,由於我們有若干個帶鎖結構需要在curp server的各個部分中共享,而curp server的大部分函數又是async的,async和拿鎖的代碼混雜在一起,就導致了我們常常在開發過程中遇到上述痛點。

所以,我們重新調整了curp server的結構,將其分為了async部分的CurpNode和非async部分的RawCurpCurpNode包括了異步IO(接收,發送網絡請求,數據持久化),後台任務(定時檢查leader活性,leader在每個節點上覆制數據、校準各follower);RawCurp可被視為一個狀態機,它接收來自CurpNode的調用,並更新狀態。如果RawCurp想要做一些異步操作(比如廣播心跳),它就可以通過返回值讓CurpNode去替它發請求。
162e824b54ad70849251e7d6ce8568d0.jpg

舉一個tick task的例子,在未refactor之前,由於我們不能LockGuard不能過.await點,以及有多邏輯分支的限制,不得不將代碼組織成這樣的一個形式:


    loop {
        let _now = ticker.tick().await;
        let task = {
            let state_c = Arc::clone(&state);
            let state_r = state.upgradable_read();
            if state_r.is_leader() {
                if state_r.needs_hb
                {
                    let resps = bcast_heartbeats(connects.clone(), state_r, rpc_timeout);
                    Either::Left(handle_heartbeat_responses(
                        resps,
                        state_c,
                        Arc::clone(&timeout),
                    ))
                } else {
                    continue;
                }
            } else {
                let mut state_w = RwLockUgradableReadGuard::upgrade(state_r);
                // ...
                let resps = bcast_votes(connects.clone(), state_r, rpc_timeout);
                Either::Right(handle_vote_responses(resps, state_c))
            }
        };
        task.await;
    }

在refactor之後,處理邏輯都被放在了RawCurp中,CurpNode中的代碼就清晰多了:


loop {
    let _now = ticker.tick().await;
    let action = curp.tick();
    match action {
        TickAction::Heartbeat(hbs) => {
            Self::bcast_heartbeats(Arc::clone(&curp), &connects, hbs).await;
        }
        TickAction::Votes(votes) => {
            Self::bcast_votes(Arc::clone(&curp), &connects, votes).await;
        }
        TickAction::Nothing => {}
    }
}

我們的項目:Xline

Xline是一個用於元數據管理的分佈式KV存儲。以上為對Xline中使用的Curp共識協議的重構總結。

如果你想了解更多關於Xline的信息,請參考我們的Github:https://github.com/datenlord/Xline

達坦科技(DatenLord)專注下一代雲計算——“天空計算”的基礎設施技術,致力於拓寬雲計算的邊界。達坦科技打造的新一代開源跨雲存儲平台DatenLord,通過軟硬件深度融合的方式打通云云壁壘,實現無限制跨雲存儲、跨雲聯通,建立海量異地、異構數據的統一存儲訪問機制,為雲上應用提供高性能安全存儲支持。以滿足不同行業客户對海量數據跨雲、跨數據中心高性能訪問的需求。

公眾號:達坦科技DatenLord
知乎賬號:
https://www.zhihu.com/org/da-tan-ke-ji
B站:
https://space.bilibili.com/2017027518

user avatar u_15745565 Avatar u_17176510 Avatar jordan_haidee Avatar yeauty_60d93baf449fd Avatar 5si66p3e Avatar cloudimagine Avatar woyaofeidegenggao_6395f006f02b5 Avatar
Favorites 7 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.