前言
在物聯網(IoT)蓬勃發展的今天,MQTT 協議已經成為設備通信的事實標準。無論是智能家居、工業自動化還是車聯網,MQTT 都扮演着至關重要的角色。今天,我要為大家介紹一個完全使用 C# 實現的高性能 MQTT 庫
這個庫不僅提供了完整的 MQTT 客户端實現,還包含了一個功能齊全的 Broker 服務器,支持橋接、集羣等企業級特性。
核心特性
協議支持
- MQTT 3.1.1 - 完整支持
- MQTT 5.0 - 完整支持(包括用户屬性、消息過期、主題別名等新特性)
- MQTT-SN - 基於 UDP 的輕量級 MQTT 變體,適合受限設備
- CoAP - 約束應用協議網關支持
性能特性
- 高性能異步實現
- 零不必要的內存分配
- 緩衝區池技術
- 支持 10000+ 併發連接
企業級功能
- Broker 橋接(多 Broker 消息同步)
- 集羣支持(去中心化 P2P 架構)
- 靈活的認證與授權機制
- TLS/SSL 加密傳輸
- 持久會話與離線消息存儲
框架支持
- .NET 6.0
- .NET 8.0
- .NET 10.0
技術實現
本項目採用了大量現代 .NET 高性能技術,下面詳細介紹核心技術點。
內存管理技術
Span<T> 和 Memory<T> - 零拷貝處理
項目使用 ref struct 實現的二進制讀寫器,完全在棧上分配,避免堆內存壓力:
// 零拷貝的二進制讀取器
public ref struct MqttBinaryReader
{
private readonly ReadOnlySpan<byte> _buffer;
private int _position;
// 零拷貝切片操作
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ReadOnlySpan<byte> ReadBytes(int count)
{
var span = _buffer.Slice(_position, count);
_position += count;
return span;
}
}
技術優勢:
ref struct只能在棧上分配,無 GC 壓力ReadOnlySpan<byte>支持零拷貝切片- 避免大量字節數組複製操作
ArrayPool<T> - 緩衝區複用
使用共享內存池減少頻繁的內存分配:
// 從共享池租借緩衝區
var buffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken);
// 處理數據...
}
finally
{
ArrayPool<byte>.Shared.Return(buffer); // 歸還緩衝區
}
stackalloc - 小緩衝區棧分配
對於小型臨時緩衝區,直接在棧上分配:
// 4 字節的可變長度編碼緩衝區,棧分配
Span<byte> remainingLengthBytes = stackalloc byte[4];
var size = EncodeRemainingLength(length, remainingLengthBytes);
異步編程模型
async/await + ConfigureAwait
所有 IO 操作均採用異步模式,並使用 ConfigureAwait(false) 優化:
public async Task<MqttConnectResult> ConnectAsync(CancellationToken cancellationToken = default)
{
// 建立 TCP 連接
await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);
// TLS 握手
if (Options.UseTls)
{
await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false);
}
// 發送 CONNECT 報文
await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false);
}
Channel<T> - 高性能事件隊列
Broker 使用有界通道實現非阻塞的事件分發:
public sealed class MqttBrokerEventDispatcher
{
private readonly Channel<BrokerEvent> _eventChannel;
public MqttBrokerEventDispatcher(int capacity = 10000)
{
// 有界通道,隊列滿時丟棄最舊事件
_eventChannel = Channel.CreateBounded<BrokerEvent>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false
});
}
// 非阻塞事件發送
public void Dispatch<TEventArgs>(BrokerEventType type, TEventArgs args, EventHandler<TEventArgs>? handler)
{
_eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler));
}
}
TaskCompletionSource - 請求/響應模式
實現 QoS 1/2 的確認等待機制:
private readonly Dictionary<ushort, TaskCompletionSource<object?>> _pendingPackets = new();
private async Task<object?> WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingPackets[packetId] = tcs;
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30)); // 30 秒超時
using var registration = cts.Token.Register(() => tcs.TrySetCanceled());
return await tcs.Task.ConfigureAwait(false);
}
SemaphoreSlim - 發送同步
確保報文發送的串行化:
private readonly SemaphoreSlim _sendLock = new(1, 1);
private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken)
{
await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false);
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_sendLock.Release();
}
}
編譯器優化
MethodImpl 特性
針對不同場景使用合適的編譯器優化指令:
// 強制內聯 - 用於頻繁調用的短方法
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ushort ReadUInt16()
{
var value = (ushort)((_buffer[_position] << 8) | _buffer[_position + 1]);
_position += 2;
return value;
}
// 最積極的優化 - 用於熱路徑
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
private static async Task<int> DecodeRemainingLengthAsync(Stream stream, CancellationToken ct)
{
// 可變長度解碼實現...
}
// 禁止內聯 - 避免異常處理代碼膨脹熱路徑
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowIfDisposed()
{
if (_disposed) throw new ObjectDisposedException(nameof(MqttClient));
}
網絡編程
多層傳輸抽象
支持 TCP、UDP 等多種傳輸方式:
public interface ITransportConnection : IAsyncDisposable
{
string ConnectionId { get; }
TransportType TransportType { get; }
EndPoint? RemoteEndPoint { get; }
bool IsConnected { get; }
ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);
ValueTask FlushAsync(CancellationToken cancellationToken = default);
}
TLS/SSL 支持
使用 SslStream 實現加密傳輸,支持 TLS 1.2 和 TLS 1.3:
var sslOptions = new SslClientAuthenticationOptions
{
TargetHost = Options.Host,
EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13,
ClientCertificates = Options.ClientCertificate != null
? new X509CertificateCollection { Options.ClientCertificate }
: null
};
await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken);
協議序列化
工廠模式 + 延遲初始化
協議處理器採用單例 + 延遲初始化模式:
public static class MqttProtocolHandlerFactory
{
private static readonly Lazy<IMqttProtocolHandler> _v311Handler =
new(() => new V311ProtocolHandler());
private static readonly Lazy<IMqttProtocolHandler> _v500Handler =
new(() => new V500ProtocolHandler());
public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version)
{
return version switch
{
MqttProtocolVersion.V311 => _v311Handler.Value,
MqttProtocolVersion.V500 => _v500Handler.Value,
_ => throw new NotSupportedException()
};
}
}
可變長度整數編碼
MQTT 協議特有的可變長度編碼,1-4 字節可表示 0 到 268,435,455:
public uint ReadVariableByteInteger()
{
uint value = 0;
int multiplier = 1;
byte encodedByte;
do
{
encodedByte = _buffer[_position++];
value += (uint)((encodedByte & 0x7F) * multiplier);
multiplier *= 128;
} while ((encodedByte & 0x80) != 0);
return value;
}
併發數據結構
ConcurrentDictionary - 線程安全集合
用於管理客户端會話和訂閲:
private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new();
private readonly ConcurrentDictionary<string, MqttApplicationMessage> _retainedMessages = new();
public int ConnectedClients => _sessions.Count;
public IEnumerable<MqttClientSession> Sessions => _sessions.Values;
設計模式應用
| 模式 | 應用場景 | 示例 |
|---|---|---|
| 工廠模式 | 協議處理器創建 | MqttProtocolHandlerFactory |
| 策略模式 | 不同協議版本實現 | V311ProtocolHandler / V500ProtocolHandler |
| 建造者模式 | 報文構建 | IPublishPacketBuilder / IConnectPacketBuilder |
| 觀察者模式 | 事件系統 | MessageReceived / ClientConnected |
| 裝飾器模式 | 傳輸層 TLS | SslStream 裝飾 NetworkStream |
| 單例模式 | 協議處理器緩存 | 全局共享的處理器實例 |
技術棧總結
| 類別 | 技術 | 作用 |
|---|---|---|
| 內存管理 | Span<T>, Memory<T>, ref struct, ArrayPool<T>, stackalloc |
零拷貝、棧分配、緩衝區複用 |
| 異步編程 | async/await, Channel<T>, TaskCompletionSource, SemaphoreSlim |
高效併發、非阻塞事件處理 |
| 編譯優化 | AggressiveInlining, AggressiveOptimization, NoInlining |
JIT 編譯器優化提示 |
| 網絡層 | TcpClient, TcpListener, SslStream, 傳輸抽象 |
多協議支持、安全傳輸 |
| 併發集合 | ConcurrentDictionary, ConcurrentQueue |
線程安全的數據結構 |
| 序列化 | 自定義二進制讀寫器、可變長度編碼 | 高效的協議解析 |
性能優化建議
客户端優化
- 選擇合適的 QoS:大多數場景 QoS 1 就足夠了,QoS 2 開銷較大
- 批量發送:如果有大量消息,考慮合併後發送
- 合理設置 KeepAlive:根據網絡環境調整,一般 60 秒即可
- 使用持久會話:如果需要接收離線消息,設置
CleanSession = false
Broker 優化
- 調整最大連接數:根據服務器性能設置
MaxConnections - 限制消息大小:設置
MaxMessageSize防止惡意大消息 - 離線消息限制:設置
MaxOfflineMessagesPerClient防止內存溢出 - 使用集羣:高可用場景使用集羣部署
客户端使用指南
基礎連接
using System.Net.MQTT;
// 配置客户端選項
var options = new MqttClientOptions
{
Host = "localhost",
Port = 1883,
ClientId = "my-iot-device",
CleanSession = true
};
// 創建客户端
using var client = new MqttClient(options);
// 連接到 Broker
var result = await client.ConnectAsync();
if (result.IsSuccess)
{
Console.WriteLine("連接成功!");
}
訂閲主題
// 訂閲單個主題
await client.SubscribeAsync("sensors/temperature", MqttQualityOfService.AtLeastOnce);
// 使用通配符訂閲多個主題
await client.SubscribeAsync("sensors/#", MqttQualityOfService.AtLeastOnce); // 多級通配符
await client.SubscribeAsync("sensors/+/status", MqttQualityOfService.AtMostOnce); // 單級通配符
接收消息
client.MessageReceived += (sender, e) =>
{
Console.WriteLine($"收到消息:");
Console.WriteLine($" 主題: {e.Message.Topic}");
Console.WriteLine($" 內容: {e.Message.PayloadAsString}");
Console.WriteLine($" QoS: {e.Message.QualityOfService}");
};
發佈消息
// 簡單發佈
await client.PublishAsync("sensors/temperature", "25.5");
// 指定 QoS 發佈
await client.PublishAsync("sensors/humidity", "60%", MqttQualityOfService.AtLeastOnce);
// 發佈保留消息
await client.PublishAsync("device/status", "online", MqttQualityOfService.AtLeastOnce, retain: true);
// 使用完整的消息對象
var message = MqttApplicationMessage.Create(
topic: "sensors/data",
payload: "{\"temp\": 25.5, \"humidity\": 60}",
qos: MqttQualityOfService.ExactlyOnce,
retain: false
);
await client.PublishAsync(message);
遺囑消息(Last Will)
遺囑消息會在客户端異常斷開時自動發佈:
var options = new MqttClientOptions
{
Host = "localhost",
ClientId = "my-device",
WillMessage = MqttApplicationMessage.Create(
topic: "devices/my-device/status",
payload: "offline",
qos: MqttQualityOfService.AtLeastOnce,
retain: true
)
};
TLS 加密連接
var options = new MqttClientOptions
{
Host = "secure-broker.example.com",
Port = 8883,
UseTls = true,
// 可選:客户端證書
ClientCertificate = new X509Certificate2("client.pfx", "password")
};
自動重連
var options = new MqttClientOptions
{
Host = "localhost",
AutoReconnect = true,
ReconnectDelayMs = 5000 // 5秒後重連
};
client.Connected += (s, e) => Console.WriteLine("已連接");
client.Disconnected += (s, e) => Console.WriteLine("連接斷開,正在重連...");
完整客户端示例
using System.Net.MQTT;
var options = new MqttClientOptions
{
Host = "localhost",
Port = 1883,
ClientId = $"client-{Guid.NewGuid():N}",
Username = "user",
Password = "password",
CleanSession = true,
KeepAliveSeconds = 60,
AutoReconnect = true
};
using var client = new MqttClient(options);
// 設置事件處理
client.Connected += (s, e) => Console.WriteLine("[事件] 已連接到 Broker");
client.Disconnected += (s, e) => Console.WriteLine("[事件] 連接已斷開");
client.MessageReceived += (s, e) =>
{
Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
};
// 連接
var result = await client.ConnectAsync();
if (!result.IsSuccess)
{
Console.WriteLine($"連接失敗: {result.ReasonCode}");
return;
}
// 訂閲
await client.SubscribeAsync("test/#", MqttQualityOfService.AtLeastOnce);
// 發佈測試消息
for (int i = 0; i < 10; i++)
{
await client.PublishAsync("test/counter", i.ToString());
await Task.Delay(1000);
}
// 斷開連接
await client.DisconnectAsync();
服務器(Broker)使用指南
啓動基礎 Broker
using System.Net.MQTT.Broker;
var options = new MqttBrokerOptions
{
Port = 1883,
AllowAnonymous = true,
EnableRetainedMessages = true,
MaxConnections = 10000
};
using var broker = new MqttBroker(options);
// 啓動服務器
await broker.StartAsync();
Console.WriteLine("MQTT Broker 已啓動,監聽端口 1883");
// 保持運行
await Task.Delay(Timeout.Infinite);
// 停止服務器
await broker.StopAsync();
配置認證
// 使用簡單認證器
broker.Authenticator = new SimpleAuthenticator()
.AddUser("admin", "admin123")
.AddUser("device1", "device1pass")
.AddUser("device2", "device2pass");
var options = new MqttBrokerOptions
{
Port = 1883,
AllowAnonymous = false // 禁用匿名訪問
};
自定義認證器
public class MyAuthenticator : IMqttAuthenticator
{
public Task<MqttAuthenticationResult> AuthenticateAsync(
MqttAuthenticationContext context,
CancellationToken cancellationToken)
{
// 從數據庫驗證用户
if (ValidateFromDatabase(context.Username, context.Password))
{
return Task.FromResult(MqttAuthenticationResult.Success());
}
return Task.FromResult(MqttAuthenticationResult.Failure(
MqttConnectReasonCode.BadUserNameOrPassword));
}
}
broker.Authenticator = new MyAuthenticator();
Broker 事件處理
// 客户端連接事件
broker.ClientConnected += (s, e) =>
{
Console.WriteLine($"[連接] 客户端 {e.Session.ClientId} 已連接");
Console.WriteLine($" 地址: {e.Session.RemoteEndpoint}");
Console.WriteLine($" 當前連接數: {broker.ConnectedClients}");
};
// 客户端斷開事件
broker.ClientDisconnected += (s, e) =>
{
Console.WriteLine($"[斷開] 客户端 {e.Session.ClientId} 已斷開");
};
// 客户端訂閲事件
broker.ClientSubscribed += (s, e) =>
{
Console.WriteLine($"[訂閲] {e.Session.ClientId} 訂閲了 {e.TopicFilter}");
};
// 消息發佈事件
broker.MessagePublished += (s, e) =>
{
Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
Console.WriteLine($" 來自: {e.SourceClientId}");
};
// 消息發佈前攔截(可以阻止消息發佈)
broker.MessagePublishing += (s, e) =>
{
// 檢查敏感主題
if (e.Message.Topic.StartsWith("admin/") && e.SourceClientId != "admin")
{
e.Cancel = true; // 阻止非管理員發佈到 admin 主題
}
};
TLS 配置
var options = new MqttBrokerOptions
{
// 普通端口
Port = 1883,
// TLS 端口
UseTls = true,
TlsPort = 8883,
ServerCertificate = new X509Certificate2("server.pfx", "password"),
RequireClientCertificate = false
};
高級功能
Broker 橋接
橋接功能允許將多個 Broker 連接起來,實現消息的跨 Broker 同步。
var broker = new MqttBroker(new MqttBrokerOptions { Port = 2883 });
// 添加橋接到父 Broker
var bridge = broker.AddBridge(new MqttBridgeOptions
{
Name = "parent-bridge",
RemoteHost = "parent-broker.example.com",
RemotePort = 1883,
ClientId = "bridge-client-1",
// 上行規則:本地消息 -> 遠程 Broker
UpstreamRules =
{
new MqttBridgeRule { LocalTopicFilter = "sensor/#", Enabled = true },
new MqttBridgeRule { LocalTopicFilter = "device/+/data", Enabled = true }
},
// 下行規則:遠程消息 -> 本地
DownstreamRules =
{
new MqttBridgeRule { LocalTopicFilter = "commands/#", Enabled = true },
new MqttBridgeRule { LocalTopicFilter = "config/#", Enabled = true }
}
});
// 橋接事件
bridge.Connected += (s, e) => Console.WriteLine("橋接已連接");
bridge.MessageForwarded += (s, e) =>
{
var direction = e.Direction == BridgeDirection.Upstream ? "上行" : "下行";
Console.WriteLine($"[橋接-{direction}] {e.OriginalTopic}");
};
// 獲取統計信息
var stats = bridge.GetStatistics();
Console.WriteLine($"上行消息: {stats.UpstreamMessageCount}");
Console.WriteLine($"下行消息: {stats.DownstreamMessageCount}");
集羣部署
集羣功能實現了去中心化的 P2P 架構,任何節點都可以獨立運行,支持自動故障檢測和恢復。
var broker = new MqttBroker(new MqttBrokerOptions { Port = 1883 });
// 啓用集羣
broker.EnableCluster(new MqttClusterOptions
{
NodeId = "node-1",
ClusterName = "my-cluster",
ClusterPort = 11883,
SeedNodes = new List<string>
{
"node2.example.com:11883",
"node3.example.com:11883"
},
HeartbeatIntervalMs = 5000,
NodeTimeoutMs = 15000,
EnableDeduplication = true // 防止消息重複
});
// 集羣事件
broker.Cluster!.PeerJoined += (s, e) =>
Console.WriteLine($"節點加入: {e.Peer.NodeId}");
broker.Cluster!.PeerLeft += (s, e) =>
Console.WriteLine($"節點離開: {e.Peer.NodeId}");
broker.Cluster!.MessageForwarded += (s, e) =>
Console.WriteLine($"消息轉發: {e.Topic}");
await broker.StartAsync();
MQTT-SN 網關
MQTT-SN 是基於 UDP 的輕量級協議,適合資源受限的嵌入式設備:
var options = new MqttBrokerOptions
{
Port = 1883,
EnableMqttSn = true,
MqttSnPort = 1885
};
CoAP 網關
CoAP 網關允許 CoAP 設備與 MQTT 生態系統互通:
var options = new MqttBrokerOptions
{
Port = 1883,
EnableCoAP = true,
CoapPort = 5683,
CoapMqttPrefix = "mqtt"
};
// CoAP 客户端可以通過以下方式訪問 MQTT 主題:
// GET coap://broker:5683/mqtt/sensors/temperature
// PUT coap://broker:5683/mqtt/sensors/temperature (發佈消息)
QoS 服務質量
MQTT 定義了三種服務質量級別:
| QoS | 名稱 | 説明 | 適用場景 |
|---|---|---|---|
| 0 | At Most Once | 最多一次,不保證送達 | 傳感器數據,丟失可接受 |
| 1 | At Least Once | 至少一次,可能重複 | 重要數據,可處理重複 |
| 2 | Exactly Once | 恰好一次,保證送達且不重複 | 計費、訂單等關鍵數據 |
// QoS 0 - 最多一次
await client.PublishAsync("sensor/temp", "25", MqttQualityOfService.AtMostOnce);
// QoS 1 - 至少一次
await client.PublishAsync("alert/fire", "detected", MqttQualityOfService.AtLeastOnce);
// QoS 2 - 恰好一次
await client.PublishAsync("order/create", orderJson, MqttQualityOfService.ExactlyOnce);
主題通配符
| 通配符 | 説明 | 示例 |
|---|---|---|
+ |
匹配單個層級 | sensor/+/temp 匹配 sensor/room1/temp |
# |
匹配多個層級 | sensor/# 匹配 sensor/room1/temp 和 sensor/room1/humidity |
// 訂閲所有房間的温度
await client.SubscribeAsync("sensor/+/temperature", MqttQualityOfService.AtLeastOnce);
// 訂閲所有傳感器數據
await client.SubscribeAsync("sensor/#", MqttQualityOfService.AtLeastOnce);
MQTT 5.0 新特性
如果你使用 MQTT 5.0 協議,可以利用以下新特性:
var options = new MqttClientOptions
{
Host = "localhost",
ProtocolVersion = MqttProtocolVersion.V500 // 使用 MQTT 5.0
};
// 創建帶有 MQTT 5.0 屬性的消息
var message = MqttApplicationMessage.CreateWithProperties(
topic: "request/data",
payload: "{\"query\": \"temperature\"}",
qos: MqttQualityOfService.AtLeastOnce,
retain: false,
// MQTT 5.0 特有屬性
responseTopic: "response/client1", // 響應主題
correlationData: Encoding.UTF8.GetBytes("req-123"), // 關聯數據
messageExpiryInterval: 60, // 消息60秒後過期
contentType: "application/json", // 內容類型
userProperties: new List<MqttUserProperty> // 用户自定義屬性
{
new MqttUserProperty("version", "1.0"),
new MqttUserProperty("source", "sensor-hub")
}
);
await client.PublishAsync(message);
總結
是一個功能完整、性能優秀的 .NET MQTT 庫,具有以下優勢:
- 完整的協議支持:MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆蓋
- 高性能設計:異步 IO、零分配、緩衝區池
- 企業級特性:橋接、集羣、認證授權
- 易於使用:簡潔的 API 設計,豐富的示例代碼
- 現代化:支持最新的 .NET 版本
無論你是構建 IoT 平台、實現設備通信,還是搭建消息中間件,這個庫都能滿足你的需求。
相關鏈接
源碼地址:https://github.com/hnlyf1688/mqtt