Stories

Detail Return Return

mqtt在php項目中的常見用法 - Stories Detail

MQTT協議在thinkphp項目中的常見用法

一般我們在實際項目中用,都是將相關方法寫到自定義指令控制器中, 後期通過supervisor進程管理工具啓一個進程專門運行自定義指令, 來實現持續監聽處理的邏輯

composer三方擴展用的是 'php-mqtt/client'

帶重連的標準寫法如下所示:

while (true) {
    try {
        //獲取長連接客户端(單例,用於監聽等長期操作)
        $client = MqttUtil::getPersistentClient();
        // 阻塞處理事件(直到異常或信號退出)
        $client->loop(true);
    } catch (Exception $e) {
         echo "MQTT 循環出錯: " . $e->getMessage() . "\n";
        sleep(1); // 延遲重連,避免頻繁嘗試
    }
}

在phpmqtt/client中,$client->loop()的作用

在 php-mqtt/client 庫中,$client->loop() 是 MQTT 客户端處理消息和維護連接的客户端事件循環方法,其主要作用是讓客户端進入 “事件處理循環”,處理與 MQTT 服務器之間的通信。通俗來講, 通過loop()方法可以讓MQTT客户端'活'起來, 持續運行並相應網絡事件。

$client->loop() 的基本功能

  1. 處理接收的消息:
    當客户端訂閲的主題有新消息時,loop() 會接收消息並觸發你註冊的訂閲回調函數(例如前文代碼中處理 device/+/publish 主題的匿名函數)。
  2. 維持連接活性:
    自動處理 MQTT 協議的心跳機制(根據連接時設置的 keepalive 間隔),定期向服務器發送心跳包,防止連接被服務器判定為 “失效” 而斷開。
  3. 處理網絡事件
    包括髮送客户端的消息(如 publish 發佈的內容)、處理服務器的確認信息(如 QoS 1/2 級別的消息確認)等。

    常用參數與用法

    loop() 方法有三個可選參數,用於控制循環行為:

    $client->loop((bool $allowSleep = true, 
    bool $exitWhenQueuesEmpty = false, 
    int $queueWaitLimit = null);
  4. bool $allowSleep = true:

    • 是否允許在 “沒有事件” 時短暫休眠,降低 CPU 佔用。
  5. bool $exitWhenQueuesEmpty = false):

    • 所有隊列都空了時,是否自動退出循環。
  6. int $queueWaitLimit = null :

    • 等待隊列變空的最長時間(秒),配合 $exitWhenQueuesEmpty = true 使用。

$client->loop()方法執行流程如下:

記錄開始時間 loopStartedAt
進入 while(true) 死循環:

1. 如果有人調用了 $client->interrupt(),就退出循環
2. 調用 loopOnce() 處理一次事件(收消息、發消息、發心跳)
3. 如果 $exitWhenQueuesEmpty = true:
   a. 檢查是否沒有活躍訂閲
   b. 如果所有隊列都空了 → 退出
   c. 如果設置了 $queueWaitLimit 且已超時 → 退出
4. 回到第1步

使用了$client->loop(true)還需要用while(true)嗎?

即使使用 $client->loop(true)(默認阻塞模式,無超時),仍然需要 while(true) 循環,原因與 loop() 方法的執行機制有關:
loop(true) 是 “單次阻塞”,而非 “永久運行”
$client->loop(true)(不帶超時參數)的行為是:

  • 進入阻塞狀態,持續等待並處理 MQTT 事件(接收消息、心跳等)。
  • 但它並非無限運行—— 當發生以下情況時會退出阻塞並返回:

    1. 客户端主動調用 disconnect()(手動斷開連接)。
    2. 發生不可恢復的錯誤(如連接被強制斷開、網絡異常等)。
    3. 接收到退出信號(如通過 pcntl_signal 註冊的 SIGINT 終止信號)。

沒有 while(true) 會發生什麼?
如果只調用一次 $client->loop(true):

$client->connect();
$client->subscribe(...);
$client->loop(true); // 僅執行一次阻塞,退出後程序直接結束
$client->disconnect();

當 loop(true) 因上述某一原因退出後(例如臨時網絡波動導致連接斷開),程序會直接執行後續的 disconnect() 並退出,無法再次連接或繼續監聽。

總結

$client->loop(true) 負責單次阻塞處理 MQTT 事件,而 while(true) 負責在事件處理結束後(無論正常還是異常)重新啓動整個流程,兩者缺一不可:

  • 沒有 loop(true):無法處理消息和維持連接。
  • 沒有 while(true):一次事件處理結束後程序就會退出,無法持續運行或重連。

實際處理案例代碼:

小程序連接MQTT服務器的時候需要配置下nginx代理
map $http_upgrade $connection_upgrade {
    default upgrade;  # 當客户端發送 Upgrade 頭時,Connection 設為 upgrade
    ''      close;    # 當客户端沒有 Upgrade 頭時,Connection 設為 close
}
server
{
    listen 80;
    listen 443 ssl;
    server_name you-domain.com;
    index index.php index.html index.htm default.php default.htm default.html;
    root /www/wwwroot/youdomain/public;
     # 代理 MQTT over WebSocket(加密)
    location /mqtt {
        proxy_pass http://127.0.0.1:8083;  # 轉發到 EMQX 的 8083 端口
        
        # 啓用 WebSocket 升級(與非加密配置相同)
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        
        # 其他基礎配置
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;  # Nginx 連續兩次向後端發送消息的超時時間
    }

}

使用的mqtt的client端擴展如下:

composer require php-mqtt/client
mqtt服務端指令對應方法
<?php

namespace app\device\command;

use fast\MqttUtil;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use PhpMqtt\Client\Exceptions\MqttClientException;
use think\Db;
use think\Log;

class MqttListen extends Command
{
    private $localCache = []; //本地緩存容器
    private $cacheExpire = 5; //緩存過期時間,單位秒
    
    // 配置指令
    protected function configure()
    {
        $this->setName('mqtt:listen')
            ->setDescription('MQTT消息監聽服務(帶斷線重連和業務處理)');
    }

    // 執行指令
    protected function execute(Input $input, Output $output)
    {
        Log::info('MQTT監聽服務啓動');
        $reconnectInterval = 5; // 重連間隔(秒)
        // 主監聽循環
        while (true) {
            try {
                MqttUtil::subscribe('device/+/publish', function ($topic, $message) use ($output) {
                    //這裏要加一個消息去重也就是近5s已經處理過的消息就不處理了
                    //消息體重有個time字段記錄發現消息的秒級時間戳,同樣時間戳的消息體只處理一次
                    $md5 = md5($message);
                    $now = time();
                    //清理過期緩存
                    $this->localCache = array_filter($this->localCache, function ($time) use ($now) {
                        return $now - $time < $this->cacheExpire;
                    });
                    //先判斷本地緩存是否有,有就直接返回,沒有的話就繼續處理
                    if (isset($this->localCache[$md5])) {
                        MonologUtil::stream('該消息已處理,不再重複處理');
                        return;
                    }
                    //加入本地緩存
                    $this->localCache[$md5] = $now;
                    //消息處理邏輯
                    $this->handleMessage($topic, $message, $output);
                }, 1);
                // 2. 啓動監聽循環(1秒超時,允許響應信號)
                $client = MqttUtil::getPersistentClient();
                $client->loop(true);

            } catch (MqttClientException $e) {
                $errorMsg = "MQTT連接異常: {$e->getMessage()}";
                Log::error($errorMsg);

                // 斷開舊連接,等待重連
                MqttUtil::disconnectPersistentClient();
                sleep($reconnectInterval);
            } catch (\Exception $e) {
                $errorMsg = "系統異常: {$e->getMessage()}";
                Log::error($errorMsg);
                sleep(1);
            }
        }
    }

    /**
     * 處理接收到的MQTT消息
     */
    private function handleMessage(string $topic, string $message, Output $output)
    {
        Log::info('收到MQTT消息: ' . json_encode(['topic' => $topic, 'message' => $message], JSON_UNESCAPED_UNICODE));

        try {
            // 解析設備SN
            if (preg_match('/device\/(.*?)\/publish/', $topic, $matches)) {
                $deviceSn = $matches[1];

                // 處理消息並與數據庫交互
                $result = $this->processMessage($deviceSn, $message);
                if ($result['need_publish']) {// 發佈處理結果(如果需要發佈的話)
                    $responseTopic = "device/{$deviceSn}/subscribe";
                    //MqttUtil::publishTemporary($responseTopic, $result, 1);//不用這種臨時連接
                    //因為已經有永久連接了,所以沒必要在這裏面再起個臨時鏈接發送
                    MqttUtil::publishPersistent($responseTopic, $result['data']);
                }
            } else {
                $error = "無法解析設備SN,主題格式不正確: {$topic}";
                Log::warning($error);
            }
        } catch (\Exception $e) {
            $errorMsg = '處理消息時發生錯誤: ' . $e->getMessage();
            Log::error($errorMsg);
        }
    }

    /**
     * 結合數據庫處理消息(業務邏輯)
     */
    private function processMessage(string $deviceSn, string $message)
    {
        try {
            // 解析消息
            $data = json_decode($message, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                throw new \Exception('消息格式錯誤: ' . json_last_error_msg());
            }

            //邏輯處理
            return [
                'status' => 'success',
                'device_sn' => $deviceSn,
                'message' => '數據已處理',
                'need_publish' => 0, //是否需要發佈給前台信息
            ];
        } catch (\Exception $e) {
            Db::rollback();
            throw $e;
        }
    }
}

上面服務指令方法中主要用來處理相關的邏輯,並使用了MqttUtil提供的各種mqtt相關的連接以及常用方法, 因為在業務系統中也需要給頻道發佈一些內容, 所以通過fast\MqttUtil 將基礎的連接和常用方法做一下封裝,方便其他地方調用

<?php

namespace fast;

use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\Exceptions\MqttClientException;
use think\Env;


class MqttUtil
{
    // 單例實例(用於長連接,如監聽)
    public static $persistentClient = null;

    // 配置信息
    private static $config = [];

    /**
     * 初始化配置
     */
    private static function initConfig(): void
    {
        if (empty(self::$config)) {
            self::$config = [
                'server' => 'mqtt.yourdomain.com',//localhost
                'port' => 1883,
                'username' => '',
                'password' => '',
                'keepalive' => 60,
                'client_id' => 'daoheng',
                'qos' => 1,
            ];
        }
    }

    /**
     * 獲取長連接客户端(單例,用於監聽等長期操作)
     * @throws MqttClientException
     */
    public static function getPersistentClient(): MqttClient
    {
        self::initConfig();

        // 若客户端未初始化或已斷開,重新連接
        if (!self::$persistentClient || !self::$persistentClient->isConnected()) {
            $clientId = self::$config['client_id'];
            //引入環境變量,避免測試環境與正式環境衝突
            $env = Env::get('app.env', '');
            $clientId .= $env;
            self::$persistentClient = new MqttClient(
                self::$config['server'],
                self::$config['port'],
                $clientId
            );

            // 配置連接參數
            $connectionSettings = (new ConnectionSettings)
//                ->setUsername(self::$config['username'])
//                ->setPassword(self::$config['password'])
                ->setKeepAliveInterval(self::$config['keepalive']) //設置60秒發送一次心跳
                ->setConnectTimeout(10) //連接超時時間
                ->setSocketTimeout(80)// 80秒無響應則超時
                ->setReconnectAutomatically(true);// 自動重連

            // 連接服務器
            self::$persistentClient->connect($connectionSettings, true);
            Log::info('MQTT長連接客户端已初始化並連接');
        }

        return self::$persistentClient;
    }

    /**
     * 獲取臨時客户端(用於單次發佈等短期操作)
     * @throws MqttClientException
     */
    public static function getTemporaryClient(): MqttClient
    {
        self::initConfig();

        //臨時連接的clientId不能重複,否則後面的連接會頂掉前面的連接
        $clientId = self::$config['client_id'] . uniqid();
        $client = new MqttClient(
            self::$config['server'],
            self::$config['port'],
            $clientId
        );

        $connectionSettings = (new ConnectionSettings)
//            ->setUsername(self::$config['username'])
//            ->setPassword(self::$config['password'])
            ->setKeepAliveInterval(5); // 臨時連接縮短心跳間隔

        $client->connect($connectionSettings, true);
        return $client;
    }

    /**
     * 訂閲主題(基於長連接客户端)
     * @param string $topic 主題
     * @param callable $callback 消息回調函數
     * @param int $qos QoS級別
     * @throws MqttClientException
     */
    public static function subscribe(string $topic, callable $callback, int $qos = null): void
    {
        $qos = is_null($qos) ? (self::$config['qos'] ?? 1) : $qos;
        $client = self::getPersistentClient();
        $client->subscribe($topic, $callback, $qos);
        Log::info("已訂閲主題: {$topic} (QoS: {$qos})");
    }
    
    //使用永久連接發佈消息
    public static function publishPersistent(string $topic, array $message, int $qos = 1): bool
    {
        $client = self::getPersistentClient();
        $message['time'] = time();//統一加個時間戳
        $messageJson = json_encode($message, JSON_UNESCAPED_UNICODE);
        $client->publish($topic, $messageJson, $qos);
        $event = $message['event'] ?? '';
        if ($event != 'heartbeat') {
            MonologUtil::stream('php[長連接client]發佈mqtt消息:', [
                'topic' => $topic,
                'message' => $message,
            ]);
        }
        return true;
    }
    
    /**
     * 發佈消息(默認使用臨時客户端,避免長連接阻塞)
     * @param string $topic 主題
     * @param array $message 消息內容
     * @param int $qos QoS級別
     * @return bool 發佈是否成功
     */
    public static function publishTemporary(string $topic, array $message, int $qos = null): bool
    {
        $qos = is_null($qos) ? (self::$config['qos'] ?? 1) : $qos;
        $client = null;

        try {
            // 使用臨時客户端發佈,避免影響長連接
            $client = self::getTemporaryClient();
            $message = json_encode($message, JSON_UNESCAPED_UNICODE);
            $client->publish($topic, $message, $qos);
            Log::info("已發佈消息到主題: {$topic},內容: {$message}");
            return true;
        } catch (MqttClientException $e) {
            Log::error("消息發佈失敗 ({$topic}): {$e->getMessage()}");
            return false;
        } finally {
            // 臨時客户端使用後斷開連接
            if ($client && $client->isConnected()) {
                try {
                    $client->disconnect();
                } catch (MqttClientException $e) {
                    Log::warning("發佈後斷開連接失敗: {$e->getMessage()}");
                }
            }
        }
    }

    /**
     * 斷開長連接客户端
     */
    public static function disconnectPersistentClient(): void
    {
        if (self::$persistentClient && self::$persistentClient->isConnected()) {
            try {
                self::$persistentClient->disconnect();
                Log::info('MQTT長連接客户端已斷開');
            } catch (MqttClientException $e) {
                Log::warning("長連接斷開失敗: {$e->getMessage()}");
            }
            self::$persistentClient = null;
        }
    }
}
user avatar _kysou Avatar ailvyoudemaojin Avatar youqingyouyideqingjiao Avatar
Favorites 3 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.