博客 / 詳情

返回

一個高性能的 .NET MQTT 客户端與服務器庫

前言

在物聯網(IoT)蓬勃發展的今天,MQTT 協議已經成為設備通信的事實標準。無論是智能家居、工業自動化還是車聯網,MQTT 都扮演着至關重要的角色。今天,我要為大家介紹一個完全使用 C# 實現的高性能 MQTT 庫

這個庫不僅提供了完整的 MQTT 客户端實現,還包含了一個功能齊全的 Broker 服務器,支持橋接、集羣等企業級特性。

核心特性

協議支持

  • MQTT 3.1.1 - 完整支持
  • MQTT 5.0 - 完整支持(包括用户屬性、消息過期、主題別名等新特性)
  • MQTT-SN - 基於 UDP 的輕量級 MQTT 變體,適合受限設備
  • CoAP - 約束應用協議網關支持

性能特性

  • 高性能異步實現
  • 零不必要的內存分配
  • 緩衝區池技術
  • 支持 10000+ 併發連接

企業級功能

  • Broker 橋接(多 Broker 消息同步)
  • 集羣支持(去中心化 P2P 架構)
  • 靈活的認證與授權機制
  • TLS/SSL 加密傳輸
  • 持久會話與離線消息存儲

框架支持

  • .NET 6.0
  • .NET 8.0
  • .NET 10.0

技術實現

本項目採用了大量現代 .NET 高性能技術,下面詳細介紹核心技術點。

內存管理技術

Span<T> 和 Memory<T> - 零拷貝處理

項目使用 ref struct 實現的二進制讀寫器,完全在棧上分配,避免堆內存壓力:

// 零拷貝的二進制讀取器
public ref struct MqttBinaryReader
{
    private readonly ReadOnlySpan<byte> _buffer;
    private int _position;

    // 零拷貝切片操作
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public ReadOnlySpan<byte> ReadBytes(int count)
    {
        var span = _buffer.Slice(_position, count);
        _position += count;
        return span;
    }
}

技術優勢

  • ref struct 只能在棧上分配,無 GC 壓力
  • ReadOnlySpan<byte> 支持零拷貝切片
  • 避免大量字節數組複製操作

ArrayPool<T> - 緩衝區複用

使用共享內存池減少頻繁的內存分配:

// 從共享池租借緩衝區
var buffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
    await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken);
    // 處理數據...
}
finally
{
    ArrayPool<byte>.Shared.Return(buffer);  // 歸還緩衝區
}

stackalloc - 小緩衝區棧分配

對於小型臨時緩衝區,直接在棧上分配:

// 4 字節的可變長度編碼緩衝區,棧分配
Span<byte> remainingLengthBytes = stackalloc byte[4];
var size = EncodeRemainingLength(length, remainingLengthBytes);

異步編程模型

async/await + ConfigureAwait

所有 IO 操作均採用異步模式,並使用 ConfigureAwait(false) 優化:

public async Task<MqttConnectResult> ConnectAsync(CancellationToken cancellationToken = default)
{
    // 建立 TCP 連接
    await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false);

    // TLS 握手
    if (Options.UseTls)
    {
        await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false);
    }

    // 發送 CONNECT 報文
    await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false);
}

Channel<T> - 高性能事件隊列

Broker 使用有界通道實現非阻塞的事件分發:

public sealed class MqttBrokerEventDispatcher
{
    private readonly Channel<BrokerEvent> _eventChannel;

    public MqttBrokerEventDispatcher(int capacity = 10000)
    {
        // 有界通道,隊列滿時丟棄最舊事件
        _eventChannel = Channel.CreateBounded<BrokerEvent>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = false
        });
    }

    // 非阻塞事件發送
    public void Dispatch<TEventArgs>(BrokerEventType type, TEventArgs args, EventHandler<TEventArgs>? handler)
    {
        _eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler));
    }
}

TaskCompletionSource - 請求/響應模式

實現 QoS 1/2 的確認等待機制:

private readonly Dictionary<ushort, TaskCompletionSource<object?>> _pendingPackets = new();

private async Task<object?> WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
    _pendingPackets[packetId] = tcs;

    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromSeconds(30));  // 30 秒超時

    using var registration = cts.Token.Register(() => tcs.TrySetCanceled());
    return await tcs.Task.ConfigureAwait(false);
}

SemaphoreSlim - 發送同步

確保報文發送的串行化:

private readonly SemaphoreSlim _sendLock = new(1, 1);

private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken)
{
    await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false);
        await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
    }
    finally
    {
        _sendLock.Release();
    }
}

編譯器優化

MethodImpl 特性

針對不同場景使用合適的編譯器優化指令:

// 強制內聯 - 用於頻繁調用的短方法
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ushort ReadUInt16()
{
    var value = (ushort)((_buffer[_position] << 8) | _buffer[_position + 1]);
    _position += 2;
    return value;
}

// 最積極的優化 - 用於熱路徑
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
private static async Task<int> DecodeRemainingLengthAsync(Stream stream, CancellationToken ct)
{
    // 可變長度解碼實現...
}

// 禁止內聯 - 避免異常處理代碼膨脹熱路徑
[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowIfDisposed()
{
    if (_disposed) throw new ObjectDisposedException(nameof(MqttClient));
}

網絡編程

多層傳輸抽象

支持 TCP、UDP 等多種傳輸方式:

public interface ITransportConnection : IAsyncDisposable
{
    string ConnectionId { get; }
    TransportType TransportType { get; }
    EndPoint? RemoteEndPoint { get; }
    bool IsConnected { get; }

    ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
    ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);
    ValueTask FlushAsync(CancellationToken cancellationToken = default);
}

TLS/SSL 支持

使用 SslStream 實現加密傳輸,支持 TLS 1.2 和 TLS 1.3:

var sslOptions = new SslClientAuthenticationOptions
{
    TargetHost = Options.Host,
    EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13,
    ClientCertificates = Options.ClientCertificate != null
        ? new X509CertificateCollection { Options.ClientCertificate }
        : null
};

await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken);

協議序列化

工廠模式 + 延遲初始化

協議處理器採用單例 + 延遲初始化模式:

public static class MqttProtocolHandlerFactory
{
    private static readonly Lazy<IMqttProtocolHandler> _v311Handler =
        new(() => new V311ProtocolHandler());
    private static readonly Lazy<IMqttProtocolHandler> _v500Handler =
        new(() => new V500ProtocolHandler());

    public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version)
    {
        return version switch
        {
            MqttProtocolVersion.V311 => _v311Handler.Value,
            MqttProtocolVersion.V500 => _v500Handler.Value,
            _ => throw new NotSupportedException()
        };
    }
}

可變長度整數編碼

MQTT 協議特有的可變長度編碼,1-4 字節可表示 0 到 268,435,455:

public uint ReadVariableByteInteger()
{
    uint value = 0;
    int multiplier = 1;
    byte encodedByte;

    do
    {
        encodedByte = _buffer[_position++];
        value += (uint)((encodedByte & 0x7F) * multiplier);
        multiplier *= 128;
    } while ((encodedByte & 0x80) != 0);

    return value;
}

併發數據結構

ConcurrentDictionary - 線程安全集合

用於管理客户端會話和訂閲:

private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new();
private readonly ConcurrentDictionary<string, MqttApplicationMessage> _retainedMessages = new();

public int ConnectedClients => _sessions.Count;
public IEnumerable<MqttClientSession> Sessions => _sessions.Values;

設計模式應用

模式 應用場景 示例
工廠模式 協議處理器創建 MqttProtocolHandlerFactory
策略模式 不同協議版本實現 V311ProtocolHandler / V500ProtocolHandler
建造者模式 報文構建 IPublishPacketBuilder / IConnectPacketBuilder
觀察者模式 事件系統 MessageReceived / ClientConnected
裝飾器模式 傳輸層 TLS SslStream 裝飾 NetworkStream
單例模式 協議處理器緩存 全局共享的處理器實例

技術棧總結

類別 技術 作用
內存管理 Span<T>, Memory<T>, ref struct, ArrayPool<T>, stackalloc 零拷貝、棧分配、緩衝區複用
異步編程 async/await, Channel<T>, TaskCompletionSource, SemaphoreSlim 高效併發、非阻塞事件處理
編譯優化 AggressiveInlining, AggressiveOptimization, NoInlining JIT 編譯器優化提示
網絡層 TcpClient, TcpListener, SslStream, 傳輸抽象 多協議支持、安全傳輸
併發集合 ConcurrentDictionary, ConcurrentQueue 線程安全的數據結構
序列化 自定義二進制讀寫器、可變長度編碼 高效的協議解析

性能優化建議

客户端優化

  1. 選擇合適的 QoS:大多數場景 QoS 1 就足夠了,QoS 2 開銷較大
  2. 批量發送:如果有大量消息,考慮合併後發送
  3. 合理設置 KeepAlive:根據網絡環境調整,一般 60 秒即可
  4. 使用持久會話:如果需要接收離線消息,設置 CleanSession = false

Broker 優化

  1. 調整最大連接數:根據服務器性能設置 MaxConnections
  2. 限制消息大小:設置 MaxMessageSize 防止惡意大消息
  3. 離線消息限制:設置 MaxOfflineMessagesPerClient 防止內存溢出
  4. 使用集羣:高可用場景使用集羣部署

客户端使用指南

基礎連接

using System.Net.MQTT;

// 配置客户端選項
var options = new MqttClientOptions
{
    Host = "localhost",
    Port = 1883,
    ClientId = "my-iot-device",
    CleanSession = true
};

// 創建客户端
using var client = new MqttClient(options);

// 連接到 Broker
var result = await client.ConnectAsync();

if (result.IsSuccess)
{
    Console.WriteLine("連接成功!");
}

訂閲主題

// 訂閲單個主題
await client.SubscribeAsync("sensors/temperature", MqttQualityOfService.AtLeastOnce);

// 使用通配符訂閲多個主題
await client.SubscribeAsync("sensors/#", MqttQualityOfService.AtLeastOnce);  // 多級通配符
await client.SubscribeAsync("sensors/+/status", MqttQualityOfService.AtMostOnce);  // 單級通配符

接收消息

client.MessageReceived += (sender, e) =>
{
    Console.WriteLine($"收到消息:");
    Console.WriteLine($"  主題: {e.Message.Topic}");
    Console.WriteLine($"  內容: {e.Message.PayloadAsString}");
    Console.WriteLine($"  QoS: {e.Message.QualityOfService}");
};

發佈消息

// 簡單發佈
await client.PublishAsync("sensors/temperature", "25.5");

// 指定 QoS 發佈
await client.PublishAsync("sensors/humidity", "60%", MqttQualityOfService.AtLeastOnce);

// 發佈保留消息
await client.PublishAsync("device/status", "online", MqttQualityOfService.AtLeastOnce, retain: true);

// 使用完整的消息對象
var message = MqttApplicationMessage.Create(
    topic: "sensors/data",
    payload: "{\"temp\": 25.5, \"humidity\": 60}",
    qos: MqttQualityOfService.ExactlyOnce,
    retain: false
);
await client.PublishAsync(message);

遺囑消息(Last Will)

遺囑消息會在客户端異常斷開時自動發佈:

var options = new MqttClientOptions
{
    Host = "localhost",
    ClientId = "my-device",
    WillMessage = MqttApplicationMessage.Create(
        topic: "devices/my-device/status",
        payload: "offline",
        qos: MqttQualityOfService.AtLeastOnce,
        retain: true
    )
};

TLS 加密連接

var options = new MqttClientOptions
{
    Host = "secure-broker.example.com",
    Port = 8883,
    UseTls = true,
    // 可選:客户端證書
    ClientCertificate = new X509Certificate2("client.pfx", "password")
};

自動重連

var options = new MqttClientOptions
{
    Host = "localhost",
    AutoReconnect = true,
    ReconnectDelayMs = 5000  // 5秒後重連
};

client.Connected += (s, e) => Console.WriteLine("已連接");
client.Disconnected += (s, e) => Console.WriteLine("連接斷開,正在重連...");

完整客户端示例

using System.Net.MQTT;

var options = new MqttClientOptions
{
    Host = "localhost",
    Port = 1883,
    ClientId = $"client-{Guid.NewGuid():N}",
    Username = "user",
    Password = "password",
    CleanSession = true,
    KeepAliveSeconds = 60,
    AutoReconnect = true
};

using var client = new MqttClient(options);

// 設置事件處理
client.Connected += (s, e) => Console.WriteLine("[事件] 已連接到 Broker");
client.Disconnected += (s, e) => Console.WriteLine("[事件] 連接已斷開");
client.MessageReceived += (s, e) =>
{
    Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
};

// 連接
var result = await client.ConnectAsync();
if (!result.IsSuccess)
{
    Console.WriteLine($"連接失敗: {result.ReasonCode}");
    return;
}

// 訂閲
await client.SubscribeAsync("test/#", MqttQualityOfService.AtLeastOnce);

// 發佈測試消息
for (int i = 0; i < 10; i++)
{
    await client.PublishAsync("test/counter", i.ToString());
    await Task.Delay(1000);
}

// 斷開連接
await client.DisconnectAsync();

服務器(Broker)使用指南

啓動基礎 Broker

using System.Net.MQTT.Broker;

var options = new MqttBrokerOptions
{
    Port = 1883,
    AllowAnonymous = true,
    EnableRetainedMessages = true,
    MaxConnections = 10000
};

using var broker = new MqttBroker(options);

// 啓動服務器
await broker.StartAsync();
Console.WriteLine("MQTT Broker 已啓動,監聽端口 1883");

// 保持運行
await Task.Delay(Timeout.Infinite);

// 停止服務器
await broker.StopAsync();

配置認證

// 使用簡單認證器
broker.Authenticator = new SimpleAuthenticator()
    .AddUser("admin", "admin123")
    .AddUser("device1", "device1pass")
    .AddUser("device2", "device2pass");

var options = new MqttBrokerOptions
{
    Port = 1883,
    AllowAnonymous = false  // 禁用匿名訪問
};

自定義認證器

public class MyAuthenticator : IMqttAuthenticator
{
    public Task<MqttAuthenticationResult> AuthenticateAsync(
        MqttAuthenticationContext context,
        CancellationToken cancellationToken)
    {
        // 從數據庫驗證用户
        if (ValidateFromDatabase(context.Username, context.Password))
        {
            return Task.FromResult(MqttAuthenticationResult.Success());
        }

        return Task.FromResult(MqttAuthenticationResult.Failure(
            MqttConnectReasonCode.BadUserNameOrPassword));
    }
}

broker.Authenticator = new MyAuthenticator();

Broker 事件處理

// 客户端連接事件
broker.ClientConnected += (s, e) =>
{
    Console.WriteLine($"[連接] 客户端 {e.Session.ClientId} 已連接");
    Console.WriteLine($"  地址: {e.Session.RemoteEndpoint}");
    Console.WriteLine($"  當前連接數: {broker.ConnectedClients}");
};

// 客户端斷開事件
broker.ClientDisconnected += (s, e) =>
{
    Console.WriteLine($"[斷開] 客户端 {e.Session.ClientId} 已斷開");
};

// 客户端訂閲事件
broker.ClientSubscribed += (s, e) =>
{
    Console.WriteLine($"[訂閲] {e.Session.ClientId} 訂閲了 {e.TopicFilter}");
};

// 消息發佈事件
broker.MessagePublished += (s, e) =>
{
    Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}");
    Console.WriteLine($"  來自: {e.SourceClientId}");
};

// 消息發佈前攔截(可以阻止消息發佈)
broker.MessagePublishing += (s, e) =>
{
    // 檢查敏感主題
    if (e.Message.Topic.StartsWith("admin/") && e.SourceClientId != "admin")
    {
        e.Cancel = true;  // 阻止非管理員發佈到 admin 主題
    }
};

TLS 配置

var options = new MqttBrokerOptions
{
    // 普通端口
    Port = 1883,

    // TLS 端口
    UseTls = true,
    TlsPort = 8883,
    ServerCertificate = new X509Certificate2("server.pfx", "password"),
    RequireClientCertificate = false
};

高級功能

Broker 橋接

橋接功能允許將多個 Broker 連接起來,實現消息的跨 Broker 同步。

var broker = new MqttBroker(new MqttBrokerOptions { Port = 2883 });

// 添加橋接到父 Broker
var bridge = broker.AddBridge(new MqttBridgeOptions
{
    Name = "parent-bridge",
    RemoteHost = "parent-broker.example.com",
    RemotePort = 1883,
    ClientId = "bridge-client-1",

    // 上行規則:本地消息 -> 遠程 Broker
    UpstreamRules =
    {
        new MqttBridgeRule { LocalTopicFilter = "sensor/#", Enabled = true },
        new MqttBridgeRule { LocalTopicFilter = "device/+/data", Enabled = true }
    },

    // 下行規則:遠程消息 -> 本地
    DownstreamRules =
    {
        new MqttBridgeRule { LocalTopicFilter = "commands/#", Enabled = true },
        new MqttBridgeRule { LocalTopicFilter = "config/#", Enabled = true }
    }
});

// 橋接事件
bridge.Connected += (s, e) => Console.WriteLine("橋接已連接");
bridge.MessageForwarded += (s, e) =>
{
    var direction = e.Direction == BridgeDirection.Upstream ? "上行" : "下行";
    Console.WriteLine($"[橋接-{direction}] {e.OriginalTopic}");
};

// 獲取統計信息
var stats = bridge.GetStatistics();
Console.WriteLine($"上行消息: {stats.UpstreamMessageCount}");
Console.WriteLine($"下行消息: {stats.DownstreamMessageCount}");

集羣部署

集羣功能實現了去中心化的 P2P 架構,任何節點都可以獨立運行,支持自動故障檢測和恢復。

var broker = new MqttBroker(new MqttBrokerOptions { Port = 1883 });

// 啓用集羣
broker.EnableCluster(new MqttClusterOptions
{
    NodeId = "node-1",
    ClusterName = "my-cluster",
    ClusterPort = 11883,
    SeedNodes = new List<string>
    {
        "node2.example.com:11883",
        "node3.example.com:11883"
    },
    HeartbeatIntervalMs = 5000,
    NodeTimeoutMs = 15000,
    EnableDeduplication = true  // 防止消息重複
});

// 集羣事件
broker.Cluster!.PeerJoined += (s, e) =>
    Console.WriteLine($"節點加入: {e.Peer.NodeId}");

broker.Cluster!.PeerLeft += (s, e) =>
    Console.WriteLine($"節點離開: {e.Peer.NodeId}");

broker.Cluster!.MessageForwarded += (s, e) =>
    Console.WriteLine($"消息轉發: {e.Topic}");

await broker.StartAsync();

MQTT-SN 網關

MQTT-SN 是基於 UDP 的輕量級協議,適合資源受限的嵌入式設備:

var options = new MqttBrokerOptions
{
    Port = 1883,
    EnableMqttSn = true,
    MqttSnPort = 1885
};

CoAP 網關

CoAP 網關允許 CoAP 設備與 MQTT 生態系統互通:

var options = new MqttBrokerOptions
{
    Port = 1883,
    EnableCoAP = true,
    CoapPort = 5683,
    CoapMqttPrefix = "mqtt"
};

// CoAP 客户端可以通過以下方式訪問 MQTT 主題:
// GET coap://broker:5683/mqtt/sensors/temperature
// PUT coap://broker:5683/mqtt/sensors/temperature (發佈消息)

QoS 服務質量

MQTT 定義了三種服務質量級別:

QoS 名稱 説明 適用場景
0 At Most Once 最多一次,不保證送達 傳感器數據,丟失可接受
1 At Least Once 至少一次,可能重複 重要數據,可處理重複
2 Exactly Once 恰好一次,保證送達且不重複 計費、訂單等關鍵數據
// QoS 0 - 最多一次
await client.PublishAsync("sensor/temp", "25", MqttQualityOfService.AtMostOnce);

// QoS 1 - 至少一次
await client.PublishAsync("alert/fire", "detected", MqttQualityOfService.AtLeastOnce);

// QoS 2 - 恰好一次
await client.PublishAsync("order/create", orderJson, MqttQualityOfService.ExactlyOnce);

主題通配符

通配符 説明 示例
+ 匹配單個層級 sensor/+/temp 匹配 sensor/room1/temp
# 匹配多個層級 sensor/# 匹配 sensor/room1/tempsensor/room1/humidity
// 訂閲所有房間的温度
await client.SubscribeAsync("sensor/+/temperature", MqttQualityOfService.AtLeastOnce);

// 訂閲所有傳感器數據
await client.SubscribeAsync("sensor/#", MqttQualityOfService.AtLeastOnce);

MQTT 5.0 新特性

如果你使用 MQTT 5.0 協議,可以利用以下新特性:

var options = new MqttClientOptions
{
    Host = "localhost",
    ProtocolVersion = MqttProtocolVersion.V500  // 使用 MQTT 5.0
};

// 創建帶有 MQTT 5.0 屬性的消息
var message = MqttApplicationMessage.CreateWithProperties(
    topic: "request/data",
    payload: "{\"query\": \"temperature\"}",
    qos: MqttQualityOfService.AtLeastOnce,
    retain: false,

    // MQTT 5.0 特有屬性
    responseTopic: "response/client1",           // 響應主題
    correlationData: Encoding.UTF8.GetBytes("req-123"),  // 關聯數據
    messageExpiryInterval: 60,                   // 消息60秒後過期
    contentType: "application/json",             // 內容類型
    userProperties: new List<MqttUserProperty>   // 用户自定義屬性
    {
        new MqttUserProperty("version", "1.0"),
        new MqttUserProperty("source", "sensor-hub")
    }
);

await client.PublishAsync(message);

總結

是一個功能完整、性能優秀的 .NET MQTT 庫,具有以下優勢:

  • 完整的協議支持:MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆蓋
  • 高性能設計:異步 IO、零分配、緩衝區池
  • 企業級特性:橋接、集羣、認證授權
  • 易於使用:簡潔的 API 設計,豐富的示例代碼
  • 現代化:支持最新的 .NET 版本

無論你是構建 IoT 平台、實現設備通信,還是搭建消息中間件,這個庫都能滿足你的需求。

相關鏈接

源碼地址:https://github.com/hnlyf1688/mqtt

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.