第20章 異步編程_#python

文章目錄

  • 第20章 異步編程
  • 20.1 async/await語法
  • 異步函數和異步塊
  • await關鍵字和Future執行
  • 錯誤處理 in Async Code
  • 20.2 Future trait執行
  • Future trait詳解
  • Pin和內存安全
  • 手動實現複雜的Future
  • 20.3 異步運行時選擇
  • Tokio運行時
  • Async-std運行時
  • 選擇運行時和性能考慮
  • 20.4 構建高性能異步應用
  • 任務調度和負載均衡
  • 連接池和資源管理
  • 背壓和流量控制
  • 總結

第20章 異步編程

異步編程是現代軟件開發中的重要範式,它允許程序在等待I/O操作(如網絡請求、文件讀寫等)時執行其他任務,從而大大提高程序的併發性能和資源利用率。Rust的異步編程模型以其零成本抽象和高性能而聞名,本章將深入探討Rust異步編程的各個方面。

20.1 async/await語法

Rust通過asyncawait關鍵字提供了直觀的異步編程語法,讓開發者能夠以近乎同步的方式編寫異步代碼,同時保持高性能。

異步函數和異步塊

在Rust中,任何函數都可以通過添加async關鍵字轉變為異步函數。異步函數在調用時不會立即執行,而是返回一個實現了Future trait的類型。

// 基本的異步函數
async fn simple_async_function() -> u32 {
    println!("Starting async function");
    42
}

// 異步方法
struct AsyncProcessor;

impl AsyncProcessor {
    async fn process_data(&self, data: &str) -> String {
        format!("Processed: {}", data)
    }
}

// 異步塊
async fn async_block_example() {
    let result = async {
        println!("Inside async block");
        "Hello from async block"
    }.await;
    
    println!("{}", result);
}

// 在trait中使用異步方法(需要async-trait crate或使用nightly特性)
// 使用async-trait crate的示例
// #[async_trait]
// trait DataFetcher {
//     async fn fetch(&self) -> Result<String, Box<dyn Error>>;
// }

fn demonstrate_async_basics() {
    // 異步函數返回Future,需要執行器來運行
    let future = simple_async_function();
    println!("Async function returned a Future: {:?}", future);
    
    // 在實際應用中,我們會使用運行時來執行這些future
}

await關鍵字和Future執行

await關鍵字用於等待異步操作的完成。它只能在異步上下文中使用,並且會掛起當前任務直到Future完成。

use std::time::Duration;
use tokio::time::sleep;

// 模擬異步操作
async fn simulated_async_work(name: &str, duration_ms: u64) -> String {
    println!("[{}] Starting work", name);
    sleep(Duration::from_millis(duration_ms)).await;
    println!("[{}] Work completed", name);
    format!("Result from {}", name)
}

// 順序執行異步操作
async fn sequential_execution() {
    println!("=== Sequential Execution ===");
    
    let start = std::time::Instant::now();
    
    let result1 = simulated_async_work("Task 1", 500).await;
    println!("Got: {}", result1);
    
    let result2 = simulated_async_work("Task 2", 300).await;
    println!("Got: {}", result2);
    
    let result3 = simulated_async_work("Task 3", 200).await;
    println!("Got: {}", result3);
    
    let duration = start.elapsed();
    println!("Total time: {:?}", duration);
}

// 使用join併發執行
async fn concurrent_execution() {
    println!("\n=== Concurrent Execution ===");
    
    let start = std::time::Instant::now();
    
    // 使用tokio::join!併發執行多個future
    let (result1, result2, result3) = tokio::join!(
        simulated_async_work("Task A", 500),
        simulated_async_work("Task B", 300),
        simulated_async_work("Task C", 200)
    );
    
    println!("Got: {}, {}, {}", result1, result2, result3);
    
    let duration = start.elapsed();
    println!("Total time: {:?}", duration);
}

// 使用select處理多個異步操作
async fn selective_execution() {
    println!("\n=== Selective Execution ===");
    
    let mut fast_task = simulated_async_work("Fast Task", 100);
    let mut slow_task = simulated_async_work("Slow Task", 500);
    
    tokio::select! {
        result = &mut fast_task => {
            println!("Fast task finished first: {}", result);
        }
        result = &mut slow_task => {
            println!("Slow task finished first: {}", result);
        }
    }
    
    // 另一個任務可能還在運行,我們可以選擇等待它
    // 或者直接丟棄它(會被取消)
}

#[tokio::main]
async fn main() {
    sequential_execution().await;
    concurrent_execution().await;
    selective_execution().await;
}

錯誤處理 in Async Code

異步代碼中的錯誤處理與同步代碼類似,但有一些特殊的考慮因素。

use std::io;
use tokio::fs;

// 異步錯誤處理
async fn read_file_async(filename: &str) -> Result<String, io::Error> {
    let content = fs::read_to_string(filename).await?;
    Ok(content)
}

// 組合多個異步操作的錯誤處理
async fn process_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
    // 使用?在異步函數中傳播錯誤
    let file1 = read_file_async("file1.txt").await?;
    let file2 = read_file_async("file2.txt").await?;
    
    println!("File1: {}", file1);
    println!("File2: {}", file2);
    
    Ok(())
}

// 使用try_join!處理併發操作的錯誤
async fn process_files_concurrently() -> Result<(), Box<dyn std::error::Error>> {
    let (result1, result2) = tokio::try_join!(
        read_file_async("file1.txt"),
        read_file_async("file2.txt")
    )?;
    
    println!("File1: {}", result1);
    println!("File2: {}", result2);
    
    Ok(())
}

// 超時處理
async fn operation_with_timeout() -> Result<String, Box<dyn std::error::Error>> {
    use tokio::time::timeout;
    
    let slow_operation = async {
        sleep(Duration::from_secs(10)).await;
        "Operation completed".to_string()
    };
    
    // 設置5秒超時
    match timeout(Duration::from_secs(5), slow_operation).await {
        Ok(result) => Ok(result),
        Err(_) => Err("Operation timed out".into()),
    }
}

// 重試機制
async fn retry_async_operation<F, T, E>(
    mut operation: F,
    max_retries: usize,
) -> Result<T, E>
where
    F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
    E: std::fmt::Debug,
{
    for attempt in 0..=max_retries {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if attempt == max_retries => return Err(e),
            Err(e) => {
                println!("Attempt {} failed: {:?}, retrying...", attempt + 1, e);
                sleep(Duration::from_millis(100 * 2u64.pow(attempt as u32))).await;
            }
        }
    }
    unreachable!()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 錯誤處理示例
    match read_file_async("nonexistent.txt").await {
        Ok(content) => println!("File content: {}", content),
        Err(e) => println!("Error reading file: {}", e),
    }
    
    // 超時示例
    match operation_with_timeout().await {
        Ok(result) => println!("Success: {}", result),
        Err(e) => println!("Error: {}", e),
    }
    
    // 重試示例
    let mut attempts = 0;
    let result = retry_async_operation(
        || {
            attempts += 1;
            Box::pin(async move {
                if attempts < 3 {
                    Err("Temporary failure")
                } else {
                    Ok("Success")
                }
            })
        },
        3,
    ).await;
    
    println!("Retry result: {:?}", result);
    
    Ok(())
}

20.2 Future trait執行

理解Future trait是掌握Rust異步編程的關鍵。Future代表了一個可能還沒有就緒的值,它構成了Rust異步編程的基礎。

Future trait詳解

Future trait定義了異步計算的基本接口。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

// 自定義Future實現
struct SimpleTimer {
    duration: Duration,
    elapsed: bool,
}

impl SimpleTimer {
    fn new(duration: Duration) -> Self {
        Self {
            duration,
            elapsed: false,
        }
    }
}

impl Future for SimpleTimer {
    type Output = &'static str;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.elapsed {
            Poll::Ready("Timer finished!")
        } else {
            // 在實際實現中,這裏會設置一個waker來在時間到時喚醒任務
            // 這裏簡化實現,直接標記為就緒
            self.elapsed = true;
            
            // 克隆waker以便在定時器完成後喚醒任務
            let waker = cx.waker().clone();
            
            // 在實際場景中,我們會在這裏啓動一個定時器
            // 當定時器到期時調用waker.wake()
            tokio::spawn(async move {
                sleep(Duration::from_millis(100)).await;
                waker.wake();
            });
            
            Poll::Pending
        }
    }
}

// 組合Future
struct AndThen<F1, F2> {
    first: F1,
    second: Option<F2>,
}

impl<F1, F2> AndThen<F1, F2> {
    fn new(first: F1, second: F2) -> Self {
        Self {
            first,
            second: Some(second),
        }
    }
}

impl<F1, F2, A, B, E> Future for AndThen<F1, F2>
where
    F1: Future<Output = Result<A, E>>,
    F2: Future<Output = Result<B, E>>,
{
    type Output = Result<B, E>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(second) = &mut self.second {
            // 輪詢第一個future
            match Pin::new(&mut self.first).poll(cx) {
                Poll::Ready(Ok(_)) => {
                    // 第一個future成功完成,開始輪詢第二個
                    let second_future = self.second.take().unwrap();
                    Pin::new(&mut Some(second_future)).poll(cx)
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        } else {
            // 這種情況不應該發生
            panic!("AndThen future in invalid state");
        }
    }
}

// 使用自定義Future
async fn use_custom_future() {
    println!("Starting custom timer...");
    let result = SimpleTimer::new(Duration::from_secs(1)).await;
    println!("Custom timer: {}", result);
}

#[tokio::main]
async fn main() {
    use_custom_future().await;
}

Pin和內存安全

Pin是Rust異步編程中的關鍵概念,它確保了Future在內存中的位置不會改變,這對於自引用結構體至關重要。

use std::marker::PhantomPinned;
use std::pin::Pin;

// 自引用結構體示例
struct SelfReferential {
    data: String,
    pointer_to_data: *const String,
    _pin: PhantomPinned, // 標記為!Unpin
}

impl SelfReferential {
    fn new(data: String) -> Self {
        Self {
            data,
            pointer_to_data: std::ptr::null(),
            _pin: PhantomPinned,
        }
    }
    
    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.data;
        unsafe {
            let this = self.get_unchecked_mut();
            this.pointer_to_data = self_ptr;
        }
    }
    
    fn get_data(self: Pin<&Self>) -> &str {
        unsafe {
            &*self.pointer_to_data
        }
    }
}

// 為SelfReferential實現Future
impl Future for SelfReferential {
    type Output = &'static str;
    
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready("SelfReferential future completed")
    }
}

// Pin的實用示例
async fn pin_examples() {
    // 棧上固定
    let mut value = SelfReferential::new("hello".to_string());
    let mut pinned_value = unsafe { Pin::new_unchecked(&mut value) };
    pinned_value.as_mut().init();
    
    println!("Data through pointer: {}", pinned_value.as_ref().get_data());
    
    // 堆上固定
    let boxed_value = Box::pin(SelfReferential::new("world".to_string()));
    
    // 使用pin!宏(Rust 1.68+)
    // let pinned_value = pin!(SelfReferential::new("hello".to_string()));
}

// 處理Unpin類型
async fn unpin_examples() {
    // 大多數類型都是Unpin的,可以自由移動
    let mut x = 5;
    let pinned_x = Pin::new(&mut x);
    
    // Unpin類型可以被安全地移動
    let mut y = 10;
    let pinned_y = Pin::new(&mut y);
    
    // 即使被pin住,Unpin類型仍然可以移動
    std::mem::swap(&mut x, &mut y);
}

#[tokio::main]
async fn main() {
    pin_examples().await;
    unpin_examples().await;
}

手動實現複雜的Future

對於高級用例,我們可能需要手動實現複雜的Future。

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// 異步通道實現
struct AsyncChannel<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
    wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}

impl<T> AsyncChannel<T> {
    fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            wakers: Arc::new(Mutex::new(Vec::new())),
        }
    }
    
    fn send(&self, value: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(value);
        
        // 喚醒一個等待的接收者
        let mut wakers = self.wakers.lock().unwrap();
        if let Some(waker) = wakers.pop() {
            waker.wake();
        }
    }
    
    fn try_receive(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop_front()
    }
}

// 接收Future
struct ReceiveFuture<T> {
    channel: Arc<AsyncChannel<T>>,
}

impl<T> ReceiveFuture<T> {
    fn new(channel: &AsyncChannel<T>) -> Self {
        Self {
            channel: Arc::clone(&channel.queue),
        }
    }
}

impl<T> Future for ReceiveFuture<T> {
    type Output = T;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 嘗試立即接收
        if let Some(value) = self.channel.try_receive() {
            return Poll::Ready(value);
        }
        
        // 沒有數據可用,註冊waker
        let mut wakers = self.channel.wakers.lock().unwrap();
        wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

// 使用自定義通道
async fn use_async_channel() {
    let channel = Arc::new(AsyncChannel::new());
    let channel_clone = Arc::clone(&channel);
    
    // 生產者任務
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            channel_clone.send(i);
            println!("Sent: {}", i);
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    // 消費者任務
    let consumer = tokio::spawn(async move {
        for _ in 0..5 {
            let receive_future = ReceiveFuture::new(&channel);
            let value = receive_future.await;
            println!("Received: {}", value);
        }
    });
    
    let _ = tokio::join!(producer, consumer);
}

#[tokio::main]
async fn main() {
    use_async_channel().await;
}

20.3 異步運行時選擇

Rust的標準庫只提供了Future trait,實際的異步執行需要依賴第三方運行時。目前最流行的運行時是tokio和async-std。

Tokio運行時

Tokio是功能最全面的異步運行時,提供了完整的異步I/O、定時器、同步原語等。

use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex, RwLock};
use std::sync::Arc;

// Tokio TCP服務器示例
async fn tcp_server() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server listening on 127.0.0.1:8080");
    
    // 使用共享狀態
    let connection_count = Arc::new(Mutex::new(0u32));
    
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("Accepted connection from: {}", addr);
        
        let count = Arc::clone(&connection_count);
        tokio::spawn(async move {
            // 更新連接計數
            let mut guard = count.lock().await;
            *guard += 1;
            println!("Active connections: {}", *guard);
            drop(guard); // 儘早釋放鎖
            
            if let Err(e) = handle_client(socket).await {
                eprintln!("Error handling client: {}", e);
            }
            
            // 連接關閉,減少計數
            let mut guard = count.lock().await;
            *guard -= 1;
            println!("Active connections: {}", *guard);
        });
    }
}

async fn handle_client(mut socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buf = [0; 1024];
    
    loop {
        let n = socket.read(&mut buf).await?;
        if n == 0 {
            return Ok(());
        }
        
        // 回顯接收到的數據
        socket.write_all(&buf[0..n]).await?;
    }
}

// Tokio通道示例
async fn channel_example() {
    let (tx, mut rx) = mpsc::channel(32);
    
    // 生產者任務
    let producer = tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.expect("Failed to send");
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    // 消費者任務
    let consumer = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("Received: {}", value);
        }
    });
    
    tokio::join!(producer, consumer);
}

// Tokio同步原語
async fn synchronization_examples() {
    // Mutex示例
    let mutex = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for i in 0..10 {
        let mutex_clone = Arc::clone(&mutex);
        let handle = tokio::spawn(async move {
            let mut guard = mutex_clone.lock().await;
            *guard += i;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
    
    let value = mutex.lock().await;
    println!("Final mutex value: {}", *value);
    
    // RwLock示例
    let rwlock = Arc::new(RwLock::new(String::new()));
    let reader_lock = Arc::clone(&rwlock);
    
    let writer = tokio::spawn(async move {
        let mut guard = rwlock.write().await;
        *guard = "Hello, World!".to_string();
    });
    
    let reader = tokio::spawn(async move {
        // 等待寫入完成
        sleep(Duration::from_millis(10)).await;
        let guard = reader_lock.read().await;
        println!("Read: {}", *guard);
    });
    
    tokio::join!(writer, reader);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== Tokio Channel Example ===");
    channel_example().await;
    
    println!("\n=== Tokio Synchronization Example ===");
    synchronization_examples().await;
    
    // 注意:TCP服務器示例被註釋掉了,以免阻塞測試
    // println!("\n=== Starting TCP Server ===");
    // tcp_server().await?;
    
    Ok(())
}

Async-std運行時

Async-std提供了與標準庫類似的API,但所有阻塞操作都是異步的。

// 需要在Cargo.toml中添加async-std依賴
// async-std = "1.12"

use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;

// Async-std TCP服務器
async fn async_std_server() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8081").await?;
    println!("Async-std server listening on 127.0.0.1:8081");
    
    let mut incoming = listener.incoming();
    
    while let Some(stream) = incoming.next().await {
        let mut stream = stream?;
        task::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                let n = stream.read(&mut buf).await.unwrap_or(0);
                if n == 0 {
                    break;
                }
                stream.write_all(&buf[0..n]).await.unwrap();
            }
        });
    }
    
    Ok(())
}

// Async-std文件操作
async fn async_file_operations() -> Result<(), Box<dyn std::error::Error>> {
    use async_std::fs;
    
    // 異步寫入文件
    let mut file = fs::File::create("test.txt").await?;
    file.write_all(b"Hello, async-std!").await?;
    
    // 異步讀取文件
    let content = fs::read_to_string("test.txt").await?;
    println!("File content: {}", content);
    
    // 清理
    fs::remove_file("test.txt").await?;
    
    Ok(())
}

// 手動運行async-std
fn run_async_std_examples() -> Result<(), Box<dyn std::error::Error>> {
    // 使用async_std::main屬性宏更簡單
    // 這裏展示手動運行的方式
    task::block_on(async {
        println!("=== Async-std File Operations ===");
        async_file_operations().await?;
        
        // 注意:服務器示例被註釋掉了
        // println!("\n=== Starting Async-std TCP Server ===");
        // async_std_server().await?;
        
        Ok(())
    })
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_async_std_examples()
}

選擇運行時和性能考慮

在選擇運行時和設計異步應用時,需要考慮多個因素。

use std::time::Instant;

// 性能基準測試
async fn benchmark_async_operations() {
    const OPERATIONS: usize = 10000;
    
    // 測試生成大量任務的開銷
    let start = Instant::now();
    let mut handles = Vec::with_capacity(OPERATIONS);
    
    for i in 0..OPERATIONS {
        let handle = tokio::spawn(async move {
            i * 2
        });
        handles.push(handle);
    }
    
    for handle in handles {
        let _ = handle.await;
    }
    
    let duration = start.elapsed();
    println!("Spawned {} tasks in {:?}", OPERATIONS, duration);
    
    // 測試異步通道性能
    let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
    
    let producer = tokio::spawn(async move {
        for i in 0..OPERATIONS {
            tx.send(i).await.unwrap();
        }
    });
    
    let consumer = tokio::spawn(async move {
        let mut count = 0;
        while let Some(_) = rx.recv().await {
            count += 1;
        }
        count
    });
    
    let start = Instant::now();
    let (_, received_count) = tokio::join!(producer, consumer);
    let duration = start.elapsed();
    
    println!("Processed {} messages in {:?}", received_count, duration);
}

// 運行時配置
fn configure_runtime() {
    // Tokio運行時配置
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4) // 配置工作線程數
        .enable_io()       // 啓用I/O
        .enable_time()     // 啓用定時器
        .thread_name("my-tokio-worker")
        .build()
        .unwrap();
    
    rt.block_on(async {
        println!("Running on custom configured runtime");
        benchmark_async_operations().await;
    });
}

// 選擇策略
fn runtime_selection_guidance() {
    println!("\n=== Runtime Selection Guidance ===");
    println!("Choose Tokio when you need:");
    println!("- Full-featured async ecosystem");
    println!("- High-performance networking");
    println!("- Advanced features like tracing, metrics");
    println!("- Production-grade reliability");
    
    println!("\nChoose async-std when you need:");
    println!("- Standard library-like API");
    println!("- Simplicity and ease of use");
    println!("- Good performance for general use cases");
    
    println!("\nConsider smol for:");
    println!("- Minimal dependencies");
    println!("- Embedded systems");
    println!("- Custom runtime needs");
}

#[tokio::main]
async fn main() {
    println!("=== Runtime Performance Benchmark ===");
    benchmark_async_operations().await;
    
    println!("\n=== Custom Runtime Configuration ===");
    configure_runtime();
    
    runtime_selection_guidance();
}

20.4 構建高性能異步應用

構建高性能異步應用需要仔細考慮任務調度、資源管理和錯誤處理等方面。

任務調度和負載均衡

use tokio::sync::mpsc;
use std::collections::VecDeque;

// 簡單的負載均衡器
struct LoadBalancer<T> {
    workers: Vec<mpsc::Sender<T>>,
    next_worker: usize,
}

impl<T> LoadBalancer<T>
where
    T: Send + 'static,
{
    fn new(worker_count: usize, buffer_size: usize) -> Self {
        let mut workers = Vec::with_capacity(worker_count);
        
        for i in 0..worker_count {
            let (tx, mut rx) = mpsc::channel(buffer_size);
            
            // 啓動工作線程
            tokio::spawn(async move {
                while let Some(task) = rx.recv().await {
                    println!("Worker {} processing task: {:?}", i, task);
                    // 模擬工作負載
                    sleep(Duration::from_millis(100)).await;
                }
            });
            
            workers.push(tx);
        }
        
        Self {
            workers,
            next_worker: 0,
        }
    }
    
    async fn dispatch(&mut self, task: T) -> Result<(), mpsc::error::SendError<T>> {
        let worker_index = self.next_worker;
        self.next_worker = (self.next_worker + 1) % self.workers.len();
        
        self.workers[worker_index].send(task).await
    }
}

// 工作竊取調度器
struct WorkStealingScheduler<T> {
    queues: Vec<Mutex<VecDeque<T>>>,
}

impl<T> WorkStealingScheduler<T>
where
    T: Send + 'static,
{
    fn new(worker_count: usize) -> Self {
        let mut queues = Vec::with_capacity(worker_count);
        
        for _ in 0..worker_count {
            queues.push(Mutex::new(VecDeque::new()));
        }
        
        Self { queues }
    }
    
    async fn push(&self, worker_id: usize, task: T) {
        let mut queue = self.queues[worker_id].lock().await;
        queue.push_back(task);
    }
    
    async fn pop(&self, worker_id: usize) -> Option<T> {
        // 首先嚐試自己的隊列
        if let Some(task) = self.queues[worker_id].lock().await.pop_front() {
            return Some(task);
        }
        
        // 工作竊取:嘗試其他隊列
        for i in 0..self.queues.len() {
            if i != worker_id {
                if let Some(task) = self.queues[i].lock().await.pop_back() { // 從尾部竊取
                    return Some(task);
                }
            }
        }
        
        None
    }
}

async fn demonstrate_scheduling() {
    println!("=== Load Balancer Example ===");
    
    let mut load_balancer = LoadBalancer::new(4, 10);
    
    for i in 0..20 {
        load_balancer.dispatch(format!("Task {}", i)).await.unwrap();
    }
    
    // 等待任務完成
    sleep(Duration::from_secs(3)).await;
    
    println!("\n=== Work Stealing Example ===");
    
    let scheduler = Arc::new(WorkStealingScheduler::new(4));
    
    // 啓動工作線程
    let mut handles = Vec::new();
    for worker_id in 0..4 {
        let scheduler = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            for task_id in 0..5 {
                scheduler.push(worker_id, format!("Worker {} Task {}", worker_id, task_id)).await;
            }
            
            // 處理任務
            while let Some(task) = scheduler.pop(worker_id).await {
                println!("Worker {} got: {}", worker_id, task);
                sleep(Duration::from_millis(50)).await;
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    demonstrate_scheduling().await;
}

連接池和資源管理

use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;

// 簡單的連接池
struct ConnectionPool<T> {
    connections: Mutex<Vec<T>>,
    semaphore: Semaphore,
    max_size: usize,
    current_size: AtomicUsize,
}

impl<T> ConnectionPool<T>
where
    T: Default + Send + 'static,
{
    fn new(max_size: usize) -> Self {
        Self {
            connections: Mutex::new(Vec::new()),
            semaphore: Semaphore::new(max_size),
            max_size,
            current_size: AtomicUsize::new(0),
        }
    }
    
    async fn get_connection(&self) -> PooledConnection<T> {
        // 等待可用許可
        let permit = self.semaphore.acquire().await.unwrap();
        
        // 嘗試從池中獲取連接
        if let Some(connection) = self.connections.lock().await.pop() {
            return PooledConnection {
                connection: Some(connection),
                pool: self,
                _permit: permit,
            };
        }
        
        // 池為空,創建新連接
        let current = self.current_size.fetch_add(1, Ordering::SeqCst);
        if current < self.max_size {
            PooledConnection {
                connection: Some(T::default()),
                pool: self,
                _permit: permit,
            }
        } else {
            self.current_size.fetch_sub(1, Ordering::SeqCst);
            panic!("Connection pool exhausted");
        }
    }
    
    fn return_connection(&self, connection: T) {
        self.connections.lock().blocking_lock().push(connection);
    }
}

// 池化連接
struct PooledConnection<'a, T> {
    connection: Option<T>,
    pool: &'a ConnectionPool<T>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl<'a, T> std::ops::Deref for PooledConnection<'a, T> {
    type Target = T;
    
    fn deref(&self) -> &Self::Target {
        self.connection.as_ref().unwrap()
    }
}

impl<'a, T> std::ops::DerefMut for PooledConnection<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.connection.as_mut().unwrap()
    }
}

impl<'a, T> Drop for PooledConnection<'a, T> {
    fn drop(&mut self) {
        if let Some(connection) = self.connection.take() {
            self.pool.return_connection(connection);
        }
    }
}

// 模擬數據庫連接
#[derive(Default, Debug)]
struct DatabaseConnection {
    id: u32,
}

static CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);

impl Default for DatabaseConnection {
    fn default() -> Self {
        let id = CONNECTION_ID.fetch_add(1, Ordering::SeqCst) as u32;
        Self { id }
    }
}

impl DatabaseConnection {
    async fn query(&self, sql: &str) -> String {
        sleep(Duration::from_millis(50)).await;
        format!("Result from connection {}: {}", self.id, sql)
    }
}

async fn demonstrate_connection_pool() {
    println!("=== Connection Pool Example ===");
    
    let pool = Arc::new(ConnectionPool::<DatabaseConnection>::new(3));
    
    let mut handles = Vec::new();
    
    // 啓動多個任務同時使用連接池
    for i in 0..10 {
        let pool = Arc::clone(&pool);
        let handle = tokio::spawn(async move {
            let connection = pool.get_connection().await;
            let result = connection.query(&format!("SELECT * FROM table_{}", i)).await;
            println!("Task {}: {}", i, result);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
    
    println!("All tasks completed");
}

#[tokio::main]
async fn main() {
    demonstrate_connection_pool().await;
}

背壓和流量控制

use tokio::sync::broadcast;

// 帶背壓的生產者-消費者模式
struct BoundedProcessor<T> {
    processor_tx: mpsc::Sender<T>,
    capacity: usize,
}

impl<T> BoundedProcessor<T>
where
    T: Send + 'static,
{
    fn new<F>(capacity: usize, processor: F) -> Self
    where
        F: Fn(T) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static,
    {
        let (tx, mut rx) = mpsc::channel(capacity);
        
        // 啓動處理器任務
        tokio::spawn(async move {
            while let Some(item) = rx.recv().await {
                processor(item).await;
            }
        });
        
        Self {
            processor_tx: tx,
            capacity,
        }
    }
    
    async fn process(&self, item: T) -> Result<(), mpsc::error::SendError<T>> {
        self.processor_tx.send(item).await
    }
    
    fn capacity(&self) -> usize {
        self.capacity
    }
}

// 自適應速率限制器
struct AdaptiveRateLimiter {
    semaphore: Semaphore,
    max_permits: usize,
    current_permits: AtomicUsize,
}

impl AdaptiveRateLimiter {
    fn new(initial_permits: usize) -> Self {
        Self {
            semaphore: Semaphore::new(initial_permits),
            max_permits: initial_permits,
            current_permits: AtomicUsize::new(initial_permits),
        }
    }
    
    async fn acquire(&self) -> tokio::sync::SemaphorePermit {
        self.semaphore.acquire().await.unwrap()
    }
    
    fn adjust_capacity(&self, new_capacity: usize) {
        let current = self.current_permits.load(Ordering::SeqCst);
        
        if new_capacity > current {
            // 增加許可
            let additional = new_capacity - current;
            self.semaphore.add_permits(additional);
            self.current_permits.store(new_capacity, Ordering::SeqCst);
        } else if new_capacity < current {
            // 減少許可(更復雜,通常需要更精細的實現)
            println!("Reducing capacity from {} to {}", current, new_capacity);
        }
        
        self.max_permits = new_capacity;
    }
}

// 監控和指標收集
struct MetricsCollector {
    request_count: AtomicUsize,
    error_count: AtomicUsize,
    latency_sum: AtomicUsize,
    request_times: Mutex<Vec<Instant>>,
}

impl MetricsCollector {
    fn new() -> Self {
        Self {
            request_count: AtomicUsize::new(0),
            error_count: AtomicUsize::new(0),
            latency_sum: AtomicUsize::new(0),
            request_times: Mutex::new(Vec::new()),
        }
    }
    
    fn record_request(&self) -> Instant {
        self.request_count.fetch_add(1, Ordering::SeqCst);
        Instant::now()
    }
    
    fn record_completion(&self, start: Instant, success: bool) {
        let latency = start.elapsed().as_millis() as usize;
        self.latency_sum.fetch_add(latency, Ordering::SeqCst);
        
        if !success {
            self.error_count.fetch_add(1, Ordering::SeqCst);
        }
    }
    
    fn get_metrics(&self) -> Metrics {
        let total_requests = self.request_count.load(Ordering::SeqCst);
        let errors = self.error_count.load(Ordering::SeqCst);
        let total_latency = self.latency_sum.load(Ordering::SeqCst);
        
        let avg_latency = if total_requests > 0 {
            total_latency / total_requests
        } else {
            0
        };
        
        Metrics {
            total_requests,
            errors,
            avg_latency,
            error_rate: if total_requests > 0 {
                errors as f64 / total_requests as f64
            } else {
                0.0
            },
        }
    }
}

#[derive(Debug)]
struct Metrics {
    total_requests: usize,
    errors: usize,
    avg_latency: usize,
    error_rate: f64,
}

async fn demonstrate_backpressure() {
    println!("=== Backpressure and Flow Control ===");
    
    // 創建有界處理器
    let processor = Arc::new(BoundedProcessor::new(5, |item: usize| {
        Box::pin(async move {
            println!("Processing item: {}", item);
            sleep(Duration::from_millis(100)).await;
        })
    }));
    
    // 創建速率限制器
    let rate_limiter = Arc::new(AdaptiveRateLimiter::new(3));
    
    // 創建指標收集器
    let metrics = Arc::new(MetricsCollector::new());
    
    let mut handles = Vec::new();
    
    // 生產者任務
    for i in 0..20 {
        let processor = Arc::clone(&processor);
        let rate_limiter = Arc::clone(&rate_limiter);
        let metrics = Arc::clone(&metrics);
        
        let handle = tokio::spawn(async move {
            let start = metrics.record_request();
            
            // 獲取速率限制許可
            let _permit = rate_limiter.acquire().await;
            
            match processor.process(i).await {
                Ok(()) => {
                    metrics.record_completion(start, true);
                }
                Err(e) => {
                    metrics.record_completion(start, false);
                    eprintln!("Failed to process item {}: {}", i, e);
                }
            }
        });
        
        handles.push(handle);
    }
    
    // 動態調整容量
    let rate_limiter_clone = Arc::clone(&rate_limiter);
    let adjust_handle = tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        println!("Increasing capacity to 10");
        rate_limiter_clone.adjust_capacity(10);
    });
    
    // 等待所有任務完成
    for handle in handles {
        handle.await.unwrap();
    }
    adjust_handle.await.unwrap();
    
    // 輸出指標
    println!("Final metrics: {:?}", metrics.get_metrics());
}

#[tokio::main]
async fn main() {
    demonstrate_backpressure().await;
}

總結

本章深入探討了Rust異步編程的各個方面:

  1. async/await語法:直觀的異步編程接口,讓開發者能夠以同步風格編寫異步代碼
  2. Future trait執行:理解Future的工作原理、Pin的內存安全保證以及手動實現複雜Future
  3. 異步運行時選擇:比較Tokio和async-std等主流運行時,瞭解各自的優勢和適用場景
  4. 高性能異步應用構建:任務調度、連接池、背壓控制等高級技術

Rust的異步編程模型提供了零成本抽象的承諾,意味着異步代碼在性能上可以與手動編寫的回調代碼相媲美,同時保持了更高的可讀性和可維護性。

掌握異步編程是構建高性能網絡服務、併發數據處理系統和其他I/O密集型應用的關鍵。通過合理運用本章介紹的技術和模式,你可以構建出既高效又可靠的異步Rust應用。