文章目錄
- 第20章 異步編程
- 20.1 async/await語法
- 異步函數和異步塊
- await關鍵字和Future執行
- 錯誤處理 in Async Code
- 20.2 Future trait執行
- Future trait詳解
- Pin和內存安全
- 手動實現複雜的Future
- 20.3 異步運行時選擇
- Tokio運行時
- Async-std運行時
- 選擇運行時和性能考慮
- 20.4 構建高性能異步應用
- 任務調度和負載均衡
- 連接池和資源管理
- 背壓和流量控制
- 總結
第20章 異步編程
異步編程是現代軟件開發中的重要範式,它允許程序在等待I/O操作(如網絡請求、文件讀寫等)時執行其他任務,從而大大提高程序的併發性能和資源利用率。Rust的異步編程模型以其零成本抽象和高性能而聞名,本章將深入探討Rust異步編程的各個方面。
20.1 async/await語法
Rust通過async和await關鍵字提供了直觀的異步編程語法,讓開發者能夠以近乎同步的方式編寫異步代碼,同時保持高性能。
異步函數和異步塊
在Rust中,任何函數都可以通過添加async關鍵字轉變為異步函數。異步函數在調用時不會立即執行,而是返回一個實現了Future trait的類型。
// 基本的異步函數
async fn simple_async_function() -> u32 {
println!("Starting async function");
42
}
// 異步方法
struct AsyncProcessor;
impl AsyncProcessor {
async fn process_data(&self, data: &str) -> String {
format!("Processed: {}", data)
}
}
// 異步塊
async fn async_block_example() {
let result = async {
println!("Inside async block");
"Hello from async block"
}.await;
println!("{}", result);
}
// 在trait中使用異步方法(需要async-trait crate或使用nightly特性)
// 使用async-trait crate的示例
// #[async_trait]
// trait DataFetcher {
// async fn fetch(&self) -> Result<String, Box<dyn Error>>;
// }
fn demonstrate_async_basics() {
// 異步函數返回Future,需要執行器來運行
let future = simple_async_function();
println!("Async function returned a Future: {:?}", future);
// 在實際應用中,我們會使用運行時來執行這些future
}
await關鍵字和Future執行
await關鍵字用於等待異步操作的完成。它只能在異步上下文中使用,並且會掛起當前任務直到Future完成。
use std::time::Duration;
use tokio::time::sleep;
// 模擬異步操作
async fn simulated_async_work(name: &str, duration_ms: u64) -> String {
println!("[{}] Starting work", name);
sleep(Duration::from_millis(duration_ms)).await;
println!("[{}] Work completed", name);
format!("Result from {}", name)
}
// 順序執行異步操作
async fn sequential_execution() {
println!("=== Sequential Execution ===");
let start = std::time::Instant::now();
let result1 = simulated_async_work("Task 1", 500).await;
println!("Got: {}", result1);
let result2 = simulated_async_work("Task 2", 300).await;
println!("Got: {}", result2);
let result3 = simulated_async_work("Task 3", 200).await;
println!("Got: {}", result3);
let duration = start.elapsed();
println!("Total time: {:?}", duration);
}
// 使用join併發執行
async fn concurrent_execution() {
println!("\n=== Concurrent Execution ===");
let start = std::time::Instant::now();
// 使用tokio::join!併發執行多個future
let (result1, result2, result3) = tokio::join!(
simulated_async_work("Task A", 500),
simulated_async_work("Task B", 300),
simulated_async_work("Task C", 200)
);
println!("Got: {}, {}, {}", result1, result2, result3);
let duration = start.elapsed();
println!("Total time: {:?}", duration);
}
// 使用select處理多個異步操作
async fn selective_execution() {
println!("\n=== Selective Execution ===");
let mut fast_task = simulated_async_work("Fast Task", 100);
let mut slow_task = simulated_async_work("Slow Task", 500);
tokio::select! {
result = &mut fast_task => {
println!("Fast task finished first: {}", result);
}
result = &mut slow_task => {
println!("Slow task finished first: {}", result);
}
}
// 另一個任務可能還在運行,我們可以選擇等待它
// 或者直接丟棄它(會被取消)
}
#[tokio::main]
async fn main() {
sequential_execution().await;
concurrent_execution().await;
selective_execution().await;
}
錯誤處理 in Async Code
異步代碼中的錯誤處理與同步代碼類似,但有一些特殊的考慮因素。
use std::io;
use tokio::fs;
// 異步錯誤處理
async fn read_file_async(filename: &str) -> Result<String, io::Error> {
let content = fs::read_to_string(filename).await?;
Ok(content)
}
// 組合多個異步操作的錯誤處理
async fn process_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
// 使用?在異步函數中傳播錯誤
let file1 = read_file_async("file1.txt").await?;
let file2 = read_file_async("file2.txt").await?;
println!("File1: {}", file1);
println!("File2: {}", file2);
Ok(())
}
// 使用try_join!處理併發操作的錯誤
async fn process_files_concurrently() -> Result<(), Box<dyn std::error::Error>> {
let (result1, result2) = tokio::try_join!(
read_file_async("file1.txt"),
read_file_async("file2.txt")
)?;
println!("File1: {}", result1);
println!("File2: {}", result2);
Ok(())
}
// 超時處理
async fn operation_with_timeout() -> Result<String, Box<dyn std::error::Error>> {
use tokio::time::timeout;
let slow_operation = async {
sleep(Duration::from_secs(10)).await;
"Operation completed".to_string()
};
// 設置5秒超時
match timeout(Duration::from_secs(5), slow_operation).await {
Ok(result) => Ok(result),
Err(_) => Err("Operation timed out".into()),
}
}
// 重試機制
async fn retry_async_operation<F, T, E>(
mut operation: F,
max_retries: usize,
) -> Result<T, E>
where
F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
E: std::fmt::Debug,
{
for attempt in 0..=max_retries {
match operation().await {
Ok(result) => return Ok(result),
Err(e) if attempt == max_retries => return Err(e),
Err(e) => {
println!("Attempt {} failed: {:?}, retrying...", attempt + 1, e);
sleep(Duration::from_millis(100 * 2u64.pow(attempt as u32))).await;
}
}
}
unreachable!()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 錯誤處理示例
match read_file_async("nonexistent.txt").await {
Ok(content) => println!("File content: {}", content),
Err(e) => println!("Error reading file: {}", e),
}
// 超時示例
match operation_with_timeout().await {
Ok(result) => println!("Success: {}", result),
Err(e) => println!("Error: {}", e),
}
// 重試示例
let mut attempts = 0;
let result = retry_async_operation(
|| {
attempts += 1;
Box::pin(async move {
if attempts < 3 {
Err("Temporary failure")
} else {
Ok("Success")
}
})
},
3,
).await;
println!("Retry result: {:?}", result);
Ok(())
}
20.2 Future trait執行
理解Future trait是掌握Rust異步編程的關鍵。Future代表了一個可能還沒有就緒的值,它構成了Rust異步編程的基礎。
Future trait詳解
Future trait定義了異步計算的基本接口。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
// 自定義Future實現
struct SimpleTimer {
duration: Duration,
elapsed: bool,
}
impl SimpleTimer {
fn new(duration: Duration) -> Self {
Self {
duration,
elapsed: false,
}
}
}
impl Future for SimpleTimer {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.elapsed {
Poll::Ready("Timer finished!")
} else {
// 在實際實現中,這裏會設置一個waker來在時間到時喚醒任務
// 這裏簡化實現,直接標記為就緒
self.elapsed = true;
// 克隆waker以便在定時器完成後喚醒任務
let waker = cx.waker().clone();
// 在實際場景中,我們會在這裏啓動一個定時器
// 當定時器到期時調用waker.wake()
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
waker.wake();
});
Poll::Pending
}
}
}
// 組合Future
struct AndThen<F1, F2> {
first: F1,
second: Option<F2>,
}
impl<F1, F2> AndThen<F1, F2> {
fn new(first: F1, second: F2) -> Self {
Self {
first,
second: Some(second),
}
}
}
impl<F1, F2, A, B, E> Future for AndThen<F1, F2>
where
F1: Future<Output = Result<A, E>>,
F2: Future<Output = Result<B, E>>,
{
type Output = Result<B, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(second) = &mut self.second {
// 輪詢第一個future
match Pin::new(&mut self.first).poll(cx) {
Poll::Ready(Ok(_)) => {
// 第一個future成功完成,開始輪詢第二個
let second_future = self.second.take().unwrap();
Pin::new(&mut Some(second_future)).poll(cx)
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
} else {
// 這種情況不應該發生
panic!("AndThen future in invalid state");
}
}
}
// 使用自定義Future
async fn use_custom_future() {
println!("Starting custom timer...");
let result = SimpleTimer::new(Duration::from_secs(1)).await;
println!("Custom timer: {}", result);
}
#[tokio::main]
async fn main() {
use_custom_future().await;
}
Pin和內存安全
Pin是Rust異步編程中的關鍵概念,它確保了Future在內存中的位置不會改變,這對於自引用結構體至關重要。
use std::marker::PhantomPinned;
use std::pin::Pin;
// 自引用結構體示例
struct SelfReferential {
data: String,
pointer_to_data: *const String,
_pin: PhantomPinned, // 標記為!Unpin
}
impl SelfReferential {
fn new(data: String) -> Self {
Self {
data,
pointer_to_data: std::ptr::null(),
_pin: PhantomPinned,
}
}
fn init(self: Pin<&mut Self>) {
let self_ptr: *const String = &self.data;
unsafe {
let this = self.get_unchecked_mut();
this.pointer_to_data = self_ptr;
}
}
fn get_data(self: Pin<&Self>) -> &str {
unsafe {
&*self.pointer_to_data
}
}
}
// 為SelfReferential實現Future
impl Future for SelfReferential {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready("SelfReferential future completed")
}
}
// Pin的實用示例
async fn pin_examples() {
// 棧上固定
let mut value = SelfReferential::new("hello".to_string());
let mut pinned_value = unsafe { Pin::new_unchecked(&mut value) };
pinned_value.as_mut().init();
println!("Data through pointer: {}", pinned_value.as_ref().get_data());
// 堆上固定
let boxed_value = Box::pin(SelfReferential::new("world".to_string()));
// 使用pin!宏(Rust 1.68+)
// let pinned_value = pin!(SelfReferential::new("hello".to_string()));
}
// 處理Unpin類型
async fn unpin_examples() {
// 大多數類型都是Unpin的,可以自由移動
let mut x = 5;
let pinned_x = Pin::new(&mut x);
// Unpin類型可以被安全地移動
let mut y = 10;
let pinned_y = Pin::new(&mut y);
// 即使被pin住,Unpin類型仍然可以移動
std::mem::swap(&mut x, &mut y);
}
#[tokio::main]
async fn main() {
pin_examples().await;
unpin_examples().await;
}
手動實現複雜的Future
對於高級用例,我們可能需要手動實現複雜的Future。
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
// 異步通道實現
struct AsyncChannel<T> {
queue: Arc<Mutex<VecDeque<T>>>,
wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}
impl<T> AsyncChannel<T> {
fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
wakers: Arc::new(Mutex::new(Vec::new())),
}
}
fn send(&self, value: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(value);
// 喚醒一個等待的接收者
let mut wakers = self.wakers.lock().unwrap();
if let Some(waker) = wakers.pop() {
waker.wake();
}
}
fn try_receive(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop_front()
}
}
// 接收Future
struct ReceiveFuture<T> {
channel: Arc<AsyncChannel<T>>,
}
impl<T> ReceiveFuture<T> {
fn new(channel: &AsyncChannel<T>) -> Self {
Self {
channel: Arc::clone(&channel.queue),
}
}
}
impl<T> Future for ReceiveFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 嘗試立即接收
if let Some(value) = self.channel.try_receive() {
return Poll::Ready(value);
}
// 沒有數據可用,註冊waker
let mut wakers = self.channel.wakers.lock().unwrap();
wakers.push(cx.waker().clone());
Poll::Pending
}
}
// 使用自定義通道
async fn use_async_channel() {
let channel = Arc::new(AsyncChannel::new());
let channel_clone = Arc::clone(&channel);
// 生產者任務
let producer = tokio::spawn(async move {
for i in 0..5 {
channel_clone.send(i);
println!("Sent: {}", i);
sleep(Duration::from_millis(100)).await;
}
});
// 消費者任務
let consumer = tokio::spawn(async move {
for _ in 0..5 {
let receive_future = ReceiveFuture::new(&channel);
let value = receive_future.await;
println!("Received: {}", value);
}
});
let _ = tokio::join!(producer, consumer);
}
#[tokio::main]
async fn main() {
use_async_channel().await;
}
20.3 異步運行時選擇
Rust的標準庫只提供了Future trait,實際的異步執行需要依賴第三方運行時。目前最流行的運行時是tokio和async-std。
Tokio運行時
Tokio是功能最全面的異步運行時,提供了完整的異步I/O、定時器、同步原語等。
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex, RwLock};
use std::sync::Arc;
// Tokio TCP服務器示例
async fn tcp_server() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
// 使用共享狀態
let connection_count = Arc::new(Mutex::new(0u32));
loop {
let (socket, addr) = listener.accept().await?;
println!("Accepted connection from: {}", addr);
let count = Arc::clone(&connection_count);
tokio::spawn(async move {
// 更新連接計數
let mut guard = count.lock().await;
*guard += 1;
println!("Active connections: {}", *guard);
drop(guard); // 儘早釋放鎖
if let Err(e) = handle_client(socket).await {
eprintln!("Error handling client: {}", e);
}
// 連接關閉,減少計數
let mut guard = count.lock().await;
*guard -= 1;
println!("Active connections: {}", *guard);
});
}
}
async fn handle_client(mut socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await?;
if n == 0 {
return Ok(());
}
// 回顯接收到的數據
socket.write_all(&buf[0..n]).await?;
}
}
// Tokio通道示例
async fn channel_example() {
let (tx, mut rx) = mpsc::channel(32);
// 生產者任務
let producer = tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.expect("Failed to send");
sleep(Duration::from_millis(100)).await;
}
});
// 消費者任務
let consumer = tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
});
tokio::join!(producer, consumer);
}
// Tokio同步原語
async fn synchronization_examples() {
// Mutex示例
let mutex = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..10 {
let mutex_clone = Arc::clone(&mutex);
let handle = tokio::spawn(async move {
let mut guard = mutex_clone.lock().await;
*guard += i;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let value = mutex.lock().await;
println!("Final mutex value: {}", *value);
// RwLock示例
let rwlock = Arc::new(RwLock::new(String::new()));
let reader_lock = Arc::clone(&rwlock);
let writer = tokio::spawn(async move {
let mut guard = rwlock.write().await;
*guard = "Hello, World!".to_string();
});
let reader = tokio::spawn(async move {
// 等待寫入完成
sleep(Duration::from_millis(10)).await;
let guard = reader_lock.read().await;
println!("Read: {}", *guard);
});
tokio::join!(writer, reader);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Tokio Channel Example ===");
channel_example().await;
println!("\n=== Tokio Synchronization Example ===");
synchronization_examples().await;
// 注意:TCP服務器示例被註釋掉了,以免阻塞測試
// println!("\n=== Starting TCP Server ===");
// tcp_server().await?;
Ok(())
}
Async-std運行時
Async-std提供了與標準庫類似的API,但所有阻塞操作都是異步的。
// 需要在Cargo.toml中添加async-std依賴
// async-std = "1.12"
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
// Async-std TCP服務器
async fn async_std_server() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8081").await?;
println!("Async-std server listening on 127.0.0.1:8081");
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let mut stream = stream?;
task::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = stream.read(&mut buf).await.unwrap_or(0);
if n == 0 {
break;
}
stream.write_all(&buf[0..n]).await.unwrap();
}
});
}
Ok(())
}
// Async-std文件操作
async fn async_file_operations() -> Result<(), Box<dyn std::error::Error>> {
use async_std::fs;
// 異步寫入文件
let mut file = fs::File::create("test.txt").await?;
file.write_all(b"Hello, async-std!").await?;
// 異步讀取文件
let content = fs::read_to_string("test.txt").await?;
println!("File content: {}", content);
// 清理
fs::remove_file("test.txt").await?;
Ok(())
}
// 手動運行async-std
fn run_async_std_examples() -> Result<(), Box<dyn std::error::Error>> {
// 使用async_std::main屬性宏更簡單
// 這裏展示手動運行的方式
task::block_on(async {
println!("=== Async-std File Operations ===");
async_file_operations().await?;
// 注意:服務器示例被註釋掉了
// println!("\n=== Starting Async-std TCP Server ===");
// async_std_server().await?;
Ok(())
})
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
run_async_std_examples()
}
選擇運行時和性能考慮
在選擇運行時和設計異步應用時,需要考慮多個因素。
use std::time::Instant;
// 性能基準測試
async fn benchmark_async_operations() {
const OPERATIONS: usize = 10000;
// 測試生成大量任務的開銷
let start = Instant::now();
let mut handles = Vec::with_capacity(OPERATIONS);
for i in 0..OPERATIONS {
let handle = tokio::spawn(async move {
i * 2
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let duration = start.elapsed();
println!("Spawned {} tasks in {:?}", OPERATIONS, duration);
// 測試異步通道性能
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
let producer = tokio::spawn(async move {
for i in 0..OPERATIONS {
tx.send(i).await.unwrap();
}
});
let consumer = tokio::spawn(async move {
let mut count = 0;
while let Some(_) = rx.recv().await {
count += 1;
}
count
});
let start = Instant::now();
let (_, received_count) = tokio::join!(producer, consumer);
let duration = start.elapsed();
println!("Processed {} messages in {:?}", received_count, duration);
}
// 運行時配置
fn configure_runtime() {
// Tokio運行時配置
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 配置工作線程數
.enable_io() // 啓用I/O
.enable_time() // 啓用定時器
.thread_name("my-tokio-worker")
.build()
.unwrap();
rt.block_on(async {
println!("Running on custom configured runtime");
benchmark_async_operations().await;
});
}
// 選擇策略
fn runtime_selection_guidance() {
println!("\n=== Runtime Selection Guidance ===");
println!("Choose Tokio when you need:");
println!("- Full-featured async ecosystem");
println!("- High-performance networking");
println!("- Advanced features like tracing, metrics");
println!("- Production-grade reliability");
println!("\nChoose async-std when you need:");
println!("- Standard library-like API");
println!("- Simplicity and ease of use");
println!("- Good performance for general use cases");
println!("\nConsider smol for:");
println!("- Minimal dependencies");
println!("- Embedded systems");
println!("- Custom runtime needs");
}
#[tokio::main]
async fn main() {
println!("=== Runtime Performance Benchmark ===");
benchmark_async_operations().await;
println!("\n=== Custom Runtime Configuration ===");
configure_runtime();
runtime_selection_guidance();
}
20.4 構建高性能異步應用
構建高性能異步應用需要仔細考慮任務調度、資源管理和錯誤處理等方面。
任務調度和負載均衡
use tokio::sync::mpsc;
use std::collections::VecDeque;
// 簡單的負載均衡器
struct LoadBalancer<T> {
workers: Vec<mpsc::Sender<T>>,
next_worker: usize,
}
impl<T> LoadBalancer<T>
where
T: Send + 'static,
{
fn new(worker_count: usize, buffer_size: usize) -> Self {
let mut workers = Vec::with_capacity(worker_count);
for i in 0..worker_count {
let (tx, mut rx) = mpsc::channel(buffer_size);
// 啓動工作線程
tokio::spawn(async move {
while let Some(task) = rx.recv().await {
println!("Worker {} processing task: {:?}", i, task);
// 模擬工作負載
sleep(Duration::from_millis(100)).await;
}
});
workers.push(tx);
}
Self {
workers,
next_worker: 0,
}
}
async fn dispatch(&mut self, task: T) -> Result<(), mpsc::error::SendError<T>> {
let worker_index = self.next_worker;
self.next_worker = (self.next_worker + 1) % self.workers.len();
self.workers[worker_index].send(task).await
}
}
// 工作竊取調度器
struct WorkStealingScheduler<T> {
queues: Vec<Mutex<VecDeque<T>>>,
}
impl<T> WorkStealingScheduler<T>
where
T: Send + 'static,
{
fn new(worker_count: usize) -> Self {
let mut queues = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
queues.push(Mutex::new(VecDeque::new()));
}
Self { queues }
}
async fn push(&self, worker_id: usize, task: T) {
let mut queue = self.queues[worker_id].lock().await;
queue.push_back(task);
}
async fn pop(&self, worker_id: usize) -> Option<T> {
// 首先嚐試自己的隊列
if let Some(task) = self.queues[worker_id].lock().await.pop_front() {
return Some(task);
}
// 工作竊取:嘗試其他隊列
for i in 0..self.queues.len() {
if i != worker_id {
if let Some(task) = self.queues[i].lock().await.pop_back() { // 從尾部竊取
return Some(task);
}
}
}
None
}
}
async fn demonstrate_scheduling() {
println!("=== Load Balancer Example ===");
let mut load_balancer = LoadBalancer::new(4, 10);
for i in 0..20 {
load_balancer.dispatch(format!("Task {}", i)).await.unwrap();
}
// 等待任務完成
sleep(Duration::from_secs(3)).await;
println!("\n=== Work Stealing Example ===");
let scheduler = Arc::new(WorkStealingScheduler::new(4));
// 啓動工作線程
let mut handles = Vec::new();
for worker_id in 0..4 {
let scheduler = Arc::clone(&scheduler);
let handle = tokio::spawn(async move {
for task_id in 0..5 {
scheduler.push(worker_id, format!("Worker {} Task {}", worker_id, task_id)).await;
}
// 處理任務
while let Some(task) = scheduler.pop(worker_id).await {
println!("Worker {} got: {}", worker_id, task);
sleep(Duration::from_millis(50)).await;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::main]
async fn main() {
demonstrate_scheduling().await;
}
連接池和資源管理
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;
// 簡單的連接池
struct ConnectionPool<T> {
connections: Mutex<Vec<T>>,
semaphore: Semaphore,
max_size: usize,
current_size: AtomicUsize,
}
impl<T> ConnectionPool<T>
where
T: Default + Send + 'static,
{
fn new(max_size: usize) -> Self {
Self {
connections: Mutex::new(Vec::new()),
semaphore: Semaphore::new(max_size),
max_size,
current_size: AtomicUsize::new(0),
}
}
async fn get_connection(&self) -> PooledConnection<T> {
// 等待可用許可
let permit = self.semaphore.acquire().await.unwrap();
// 嘗試從池中獲取連接
if let Some(connection) = self.connections.lock().await.pop() {
return PooledConnection {
connection: Some(connection),
pool: self,
_permit: permit,
};
}
// 池為空,創建新連接
let current = self.current_size.fetch_add(1, Ordering::SeqCst);
if current < self.max_size {
PooledConnection {
connection: Some(T::default()),
pool: self,
_permit: permit,
}
} else {
self.current_size.fetch_sub(1, Ordering::SeqCst);
panic!("Connection pool exhausted");
}
}
fn return_connection(&self, connection: T) {
self.connections.lock().blocking_lock().push(connection);
}
}
// 池化連接
struct PooledConnection<'a, T> {
connection: Option<T>,
pool: &'a ConnectionPool<T>,
_permit: tokio::sync::SemaphorePermit<'a>,
}
impl<'a, T> std::ops::Deref for PooledConnection<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.connection.as_ref().unwrap()
}
}
impl<'a, T> std::ops::DerefMut for PooledConnection<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.connection.as_mut().unwrap()
}
}
impl<'a, T> Drop for PooledConnection<'a, T> {
fn drop(&mut self) {
if let Some(connection) = self.connection.take() {
self.pool.return_connection(connection);
}
}
}
// 模擬數據庫連接
#[derive(Default, Debug)]
struct DatabaseConnection {
id: u32,
}
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);
impl Default for DatabaseConnection {
fn default() -> Self {
let id = CONNECTION_ID.fetch_add(1, Ordering::SeqCst) as u32;
Self { id }
}
}
impl DatabaseConnection {
async fn query(&self, sql: &str) -> String {
sleep(Duration::from_millis(50)).await;
format!("Result from connection {}: {}", self.id, sql)
}
}
async fn demonstrate_connection_pool() {
println!("=== Connection Pool Example ===");
let pool = Arc::new(ConnectionPool::<DatabaseConnection>::new(3));
let mut handles = Vec::new();
// 啓動多個任務同時使用連接池
for i in 0..10 {
let pool = Arc::clone(&pool);
let handle = tokio::spawn(async move {
let connection = pool.get_connection().await;
let result = connection.query(&format!("SELECT * FROM table_{}", i)).await;
println!("Task {}: {}", i, result);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("All tasks completed");
}
#[tokio::main]
async fn main() {
demonstrate_connection_pool().await;
}
背壓和流量控制
use tokio::sync::broadcast;
// 帶背壓的生產者-消費者模式
struct BoundedProcessor<T> {
processor_tx: mpsc::Sender<T>,
capacity: usize,
}
impl<T> BoundedProcessor<T>
where
T: Send + 'static,
{
fn new<F>(capacity: usize, processor: F) -> Self
where
F: Fn(T) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(capacity);
// 啓動處理器任務
tokio::spawn(async move {
while let Some(item) = rx.recv().await {
processor(item).await;
}
});
Self {
processor_tx: tx,
capacity,
}
}
async fn process(&self, item: T) -> Result<(), mpsc::error::SendError<T>> {
self.processor_tx.send(item).await
}
fn capacity(&self) -> usize {
self.capacity
}
}
// 自適應速率限制器
struct AdaptiveRateLimiter {
semaphore: Semaphore,
max_permits: usize,
current_permits: AtomicUsize,
}
impl AdaptiveRateLimiter {
fn new(initial_permits: usize) -> Self {
Self {
semaphore: Semaphore::new(initial_permits),
max_permits: initial_permits,
current_permits: AtomicUsize::new(initial_permits),
}
}
async fn acquire(&self) -> tokio::sync::SemaphorePermit {
self.semaphore.acquire().await.unwrap()
}
fn adjust_capacity(&self, new_capacity: usize) {
let current = self.current_permits.load(Ordering::SeqCst);
if new_capacity > current {
// 增加許可
let additional = new_capacity - current;
self.semaphore.add_permits(additional);
self.current_permits.store(new_capacity, Ordering::SeqCst);
} else if new_capacity < current {
// 減少許可(更復雜,通常需要更精細的實現)
println!("Reducing capacity from {} to {}", current, new_capacity);
}
self.max_permits = new_capacity;
}
}
// 監控和指標收集
struct MetricsCollector {
request_count: AtomicUsize,
error_count: AtomicUsize,
latency_sum: AtomicUsize,
request_times: Mutex<Vec<Instant>>,
}
impl MetricsCollector {
fn new() -> Self {
Self {
request_count: AtomicUsize::new(0),
error_count: AtomicUsize::new(0),
latency_sum: AtomicUsize::new(0),
request_times: Mutex::new(Vec::new()),
}
}
fn record_request(&self) -> Instant {
self.request_count.fetch_add(1, Ordering::SeqCst);
Instant::now()
}
fn record_completion(&self, start: Instant, success: bool) {
let latency = start.elapsed().as_millis() as usize;
self.latency_sum.fetch_add(latency, Ordering::SeqCst);
if !success {
self.error_count.fetch_add(1, Ordering::SeqCst);
}
}
fn get_metrics(&self) -> Metrics {
let total_requests = self.request_count.load(Ordering::SeqCst);
let errors = self.error_count.load(Ordering::SeqCst);
let total_latency = self.latency_sum.load(Ordering::SeqCst);
let avg_latency = if total_requests > 0 {
total_latency / total_requests
} else {
0
};
Metrics {
total_requests,
errors,
avg_latency,
error_rate: if total_requests > 0 {
errors as f64 / total_requests as f64
} else {
0.0
},
}
}
}
#[derive(Debug)]
struct Metrics {
total_requests: usize,
errors: usize,
avg_latency: usize,
error_rate: f64,
}
async fn demonstrate_backpressure() {
println!("=== Backpressure and Flow Control ===");
// 創建有界處理器
let processor = Arc::new(BoundedProcessor::new(5, |item: usize| {
Box::pin(async move {
println!("Processing item: {}", item);
sleep(Duration::from_millis(100)).await;
})
}));
// 創建速率限制器
let rate_limiter = Arc::new(AdaptiveRateLimiter::new(3));
// 創建指標收集器
let metrics = Arc::new(MetricsCollector::new());
let mut handles = Vec::new();
// 生產者任務
for i in 0..20 {
let processor = Arc::clone(&processor);
let rate_limiter = Arc::clone(&rate_limiter);
let metrics = Arc::clone(&metrics);
let handle = tokio::spawn(async move {
let start = metrics.record_request();
// 獲取速率限制許可
let _permit = rate_limiter.acquire().await;
match processor.process(i).await {
Ok(()) => {
metrics.record_completion(start, true);
}
Err(e) => {
metrics.record_completion(start, false);
eprintln!("Failed to process item {}: {}", i, e);
}
}
});
handles.push(handle);
}
// 動態調整容量
let rate_limiter_clone = Arc::clone(&rate_limiter);
let adjust_handle = tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
println!("Increasing capacity to 10");
rate_limiter_clone.adjust_capacity(10);
});
// 等待所有任務完成
for handle in handles {
handle.await.unwrap();
}
adjust_handle.await.unwrap();
// 輸出指標
println!("Final metrics: {:?}", metrics.get_metrics());
}
#[tokio::main]
async fn main() {
demonstrate_backpressure().await;
}
總結
本章深入探討了Rust異步編程的各個方面:
- async/await語法:直觀的異步編程接口,讓開發者能夠以同步風格編寫異步代碼
- Future trait執行:理解Future的工作原理、Pin的內存安全保證以及手動實現複雜Future
- 異步運行時選擇:比較Tokio和async-std等主流運行時,瞭解各自的優勢和適用場景
- 高性能異步應用構建:任務調度、連接池、背壓控制等高級技術
Rust的異步編程模型提供了零成本抽象的承諾,意味着異步代碼在性能上可以與手動編寫的回調代碼相媲美,同時保持了更高的可讀性和可維護性。
掌握異步編程是構建高性能網絡服務、併發數據處理系統和其他I/O密集型應用的關鍵。通過合理運用本章介紹的技術和模式,你可以構建出既高效又可靠的異步Rust應用。