动态

详情 返回 返回

Actor併發系統説明與使用 - 动态 详情

簡介

Actor模型是一種並行計算模型,提供了一種用於構建併發、分佈式系統的抽象方法

Actor模型中,計算被表示為獨立的、輕量級的計算單元,稱為Actor,可以發送和接收消息並進行本地計算

作為一種通用的消息傳遞編程模型,被廣泛用於構建大規模可伸縮分佈式系統

核心思想是獨立維護隔離狀態,並基於消息傳遞實現異步通信

Actor模型組成

  • 存儲:每個 Actor 持有一個郵箱(mailbox),本質上是一個隊列,用於存儲消息
  • 通信:每個 Actor 可以發送消息至任何 Actor,使用異步消息傳遞,不保證消息到達目標 Actor 時的順序
  • 計算: Actor 可以通過處理消息來更新內部狀態,對於外部而言,Actor 的狀態是隔離的isolated state

優勢

  • 每個 Actor 獨立運行,因此程序自然是並行的
  • 每個Actor都完全獨立於其他實例,不存在共享內存和競爭條件的問題,可以避免併發編程中的一些難點
  • 通過消息傳遞進行通信,每個Actor都有一個郵箱來接收消息,每次只處理一個消息,以確保狀態的一致性和線程安全
  • 消息傳遞是異步的,發送方不需要等待接收方的響應,從而避免了鎖和同步的開銷
  • 提供高度的併發性和可擴展性,能夠有效地解決多核CPU和分佈式系統中的併發編程問題
  • 提供良好的容錯性和可恢復性,因為每個Actor都有自己的狀態和行為,可以更容易地實現系統的容錯和恢復

Actor 基於線程的任務調度

為每一個 Actor 分配一個獨立的執行過程(線程),獨佔該線程,可以是操作系統線程,協程或者虛擬機線程

如果當前 Actor 的郵箱為空,Actor 會阻塞當前線程,等待接收新的消息

由於線程數量受到系統的限制,因此 Actor 的數量也會受到限制

Actor基於事件驅動的任務調度

只有在事件觸發(即接收消息)時,才為 Actor 的任務分配線程並執行,當事件處理完畢,即退出線程

該方式可以使用很少的線程來執行大量 Actor 產生的任務,也是現在大部分Actor模型所採用的調度方式

這種實現與 run loopevent loop 機制非常相似

rust中的Actor模型實現-actix

Actix 建立在 Actor 模型的基礎上,允許將應用程序編寫為一組獨立執行但協作的Actor,這些Actor通過消息進行通信

Actor 是封裝狀態和行為的對象,並在actix 庫提供的Actor系統中運行

Actor 在特定的執行上下文 Context<A> 中運行, 上下文對象僅在執行期間可用, 每個 Actor 都有一個單獨的執行上下文,執行上下文還控制 Actor 的生命週期。 Actor 僅通過交換消息進行通信

任何 Rust 類型都可以是 Actor,只需要實現 Actor Trait 即可

為了能夠處理特定消息,參與者必須為此消息提供 Handler<M> 實現

基本使用

修改Cargo.toml

[dependencies]
actix = "0.11.0"
actix-rt = "2.2"

修改main.rs

use actix::prelude::*;

/// 定義消息的類型,rtype指定返回類型
#[derive(Message)]
#[rtype(result = "Result<bool, std::io::Error>")]
struct Ping;

// 定義Actor
struct MyActor;

// 集成Actor Trait
impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {
        println!("Actor is alive");
    }

    fn stopped(&mut self, ctx: &mut Context<Self>) {
        println!("Actor is stopped");
    }
}

/// Define handler for `Ping` message
impl Handler<Ping> for MyActor {
    type Result = Result<bool, std::io::Error>;

    fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {
        println!("Ping received");

        Ok(true)
    }
}

#[actix_rt::main]
async fn main() {
    // Start MyActor in current thread
    let addr = MyActor.start();

    // Send Ping message.
    // send() message returns Future object, that resolves to message result
    let result = addr.send(Ping).await;

    match result {
        Ok(res) => println!("Got result: {}", res.unwrap()),
        Err(err) => println!("Got error: {}", err),
    }
}

運行結果

Actor is alive
Ping received
Got result: true

閲讀參考

淺談 Actor 模型

懶人專用高併發:Actor模型

Actix Actor官方文檔

user avatar ssbunny 头像 spacewander 头像 chanmufeng 头像 zhaozixing 头像 8848_62c77d4bb2532 头像 dongdong_chainwiseweb3 头像 shenchendemaoyi 头像 jamesfancy 头像 xiongshihubao 头像 meiduyandechengzi 头像 kuanrongdeshanyang 头像 yeauty_60d93baf449fd 头像
点赞 13 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.