MQTT協議在thinkphp項目中的常見用法
一般我們在實際項目中用,都是將相關方法寫到自定義指令控制器中, 後期通過supervisor進程管理工具啓一個進程專門運行自定義指令, 來實現持續監聽處理的邏輯
composer三方擴展用的是 'php-mqtt/client'
帶重連的標準寫法如下所示:
while (true) {
try {
//獲取長連接客户端(單例,用於監聽等長期操作)
$client = MqttUtil::getPersistentClient();
// 阻塞處理事件(直到異常或信號退出)
$client->loop(true);
} catch (Exception $e) {
echo "MQTT 循環出錯: " . $e->getMessage() . "\n";
sleep(1); // 延遲重連,避免頻繁嘗試
}
}
在phpmqtt/client中,$client->loop()的作用
在 php-mqtt/client 庫中,$client->loop() 是 MQTT 客户端處理消息和維護連接的客户端事件循環方法,其主要作用是讓客户端進入 “事件處理循環”,處理與 MQTT 服務器之間的通信。通俗來講, 通過loop()方法可以讓MQTT客户端'活'起來, 持續運行並相應網絡事件。
$client->loop() 的基本功能
- 處理接收的消息:
當客户端訂閲的主題有新消息時,loop() 會接收消息並觸發你註冊的訂閲回調函數(例如前文代碼中處理 device/+/publish 主題的匿名函數)。 - 維持連接活性:
自動處理 MQTT 協議的心跳機制(根據連接時設置的 keepalive 間隔),定期向服務器發送心跳包,防止連接被服務器判定為 “失效” 而斷開。 -
處理網絡事件
包括髮送客户端的消息(如 publish 發佈的內容)、處理服務器的確認信息(如 QoS 1/2 級別的消息確認)等。常用參數與用法
loop() 方法有三個可選參數,用於控制循環行為:
$client->loop((bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, int $queueWaitLimit = null); -
bool $allowSleep = true:
- 是否允許在 “沒有事件” 時短暫休眠,降低 CPU 佔用。
-
bool $exitWhenQueuesEmpty = false):
- 當所有隊列都空了時,是否自動退出循環。
-
int $queueWaitLimit = null :
- 等待隊列變空的最長時間(秒),配合 $exitWhenQueuesEmpty = true 使用。
$client->loop()方法執行流程如下:
記錄開始時間 loopStartedAt
進入 while(true) 死循環:
1. 如果有人調用了 $client->interrupt(),就退出循環
2. 調用 loopOnce() 處理一次事件(收消息、發消息、發心跳)
3. 如果 $exitWhenQueuesEmpty = true:
a. 檢查是否沒有活躍訂閲
b. 如果所有隊列都空了 → 退出
c. 如果設置了 $queueWaitLimit 且已超時 → 退出
4. 回到第1步
使用了$client->loop(true)還需要用while(true)嗎?
即使使用 $client->loop(true)(默認阻塞模式,無超時),仍然需要 while(true) 循環,原因與 loop() 方法的執行機制有關:
loop(true) 是 “單次阻塞”,而非 “永久運行”
$client->loop(true)(不帶超時參數)的行為是:
- 進入阻塞狀態,持續等待並處理 MQTT 事件(接收消息、心跳等)。
-
但它並非無限運行—— 當發生以下情況時會退出阻塞並返回:
- 客户端主動調用 disconnect()(手動斷開連接)。
- 發生不可恢復的錯誤(如連接被強制斷開、網絡異常等)。
- 接收到退出信號(如通過 pcntl_signal 註冊的 SIGINT 終止信號)。
沒有 while(true) 會發生什麼?
如果只調用一次 $client->loop(true):
$client->connect();
$client->subscribe(...);
$client->loop(true); // 僅執行一次阻塞,退出後程序直接結束
$client->disconnect();
當 loop(true) 因上述某一原因退出後(例如臨時網絡波動導致連接斷開),程序會直接執行後續的 disconnect() 並退出,無法再次連接或繼續監聽。
總結
$client->loop(true) 負責單次阻塞處理 MQTT 事件,而 while(true) 負責在事件處理結束後(無論正常還是異常)重新啓動整個流程,兩者缺一不可:
- 沒有 loop(true):無法處理消息和維持連接。
- 沒有 while(true):一次事件處理結束後程序就會退出,無法持續運行或重連。
實際處理案例代碼:
小程序連接MQTT服務器的時候需要配置下nginx代理
map $http_upgrade $connection_upgrade {
default upgrade; # 當客户端發送 Upgrade 頭時,Connection 設為 upgrade
'' close; # 當客户端沒有 Upgrade 頭時,Connection 設為 close
}
server
{
listen 80;
listen 443 ssl;
server_name you-domain.com;
index index.php index.html index.htm default.php default.htm default.html;
root /www/wwwroot/youdomain/public;
# 代理 MQTT over WebSocket(加密)
location /mqtt {
proxy_pass http://127.0.0.1:8083; # 轉發到 EMQX 的 8083 端口
# 啓用 WebSocket 升級(與非加密配置相同)
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 其他基礎配置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 300s;
proxy_send_timeout 300s; # Nginx 連續兩次向後端發送消息的超時時間
}
}
使用的mqtt的client端擴展如下:
composer require php-mqtt/client
mqtt服務端指令對應方法
<?php
namespace app\device\command;
use fast\MqttUtil;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use PhpMqtt\Client\Exceptions\MqttClientException;
use think\Db;
use think\Log;
class MqttListen extends Command
{
private $localCache = []; //本地緩存容器
private $cacheExpire = 5; //緩存過期時間,單位秒
// 配置指令
protected function configure()
{
$this->setName('mqtt:listen')
->setDescription('MQTT消息監聽服務(帶斷線重連和業務處理)');
}
// 執行指令
protected function execute(Input $input, Output $output)
{
Log::info('MQTT監聽服務啓動');
$reconnectInterval = 5; // 重連間隔(秒)
// 主監聽循環
while (true) {
try {
MqttUtil::subscribe('device/+/publish', function ($topic, $message) use ($output) {
//這裏要加一個消息去重也就是近5s已經處理過的消息就不處理了
//消息體重有個time字段記錄發現消息的秒級時間戳,同樣時間戳的消息體只處理一次
$md5 = md5($message);
$now = time();
//清理過期緩存
$this->localCache = array_filter($this->localCache, function ($time) use ($now) {
return $now - $time < $this->cacheExpire;
});
//先判斷本地緩存是否有,有就直接返回,沒有的話就繼續處理
if (isset($this->localCache[$md5])) {
MonologUtil::stream('該消息已處理,不再重複處理');
return;
}
//加入本地緩存
$this->localCache[$md5] = $now;
//消息處理邏輯
$this->handleMessage($topic, $message, $output);
}, 1);
// 2. 啓動監聽循環(1秒超時,允許響應信號)
$client = MqttUtil::getPersistentClient();
$client->loop(true);
} catch (MqttClientException $e) {
$errorMsg = "MQTT連接異常: {$e->getMessage()}";
Log::error($errorMsg);
// 斷開舊連接,等待重連
MqttUtil::disconnectPersistentClient();
sleep($reconnectInterval);
} catch (\Exception $e) {
$errorMsg = "系統異常: {$e->getMessage()}";
Log::error($errorMsg);
sleep(1);
}
}
}
/**
* 處理接收到的MQTT消息
*/
private function handleMessage(string $topic, string $message, Output $output)
{
Log::info('收到MQTT消息: ' . json_encode(['topic' => $topic, 'message' => $message], JSON_UNESCAPED_UNICODE));
try {
// 解析設備SN
if (preg_match('/device\/(.*?)\/publish/', $topic, $matches)) {
$deviceSn = $matches[1];
// 處理消息並與數據庫交互
$result = $this->processMessage($deviceSn, $message);
if ($result['need_publish']) {// 發佈處理結果(如果需要發佈的話)
$responseTopic = "device/{$deviceSn}/subscribe";
//MqttUtil::publishTemporary($responseTopic, $result, 1);//不用這種臨時連接
//因為已經有永久連接了,所以沒必要在這裏面再起個臨時鏈接發送
MqttUtil::publishPersistent($responseTopic, $result['data']);
}
} else {
$error = "無法解析設備SN,主題格式不正確: {$topic}";
Log::warning($error);
}
} catch (\Exception $e) {
$errorMsg = '處理消息時發生錯誤: ' . $e->getMessage();
Log::error($errorMsg);
}
}
/**
* 結合數據庫處理消息(業務邏輯)
*/
private function processMessage(string $deviceSn, string $message)
{
try {
// 解析消息
$data = json_decode($message, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('消息格式錯誤: ' . json_last_error_msg());
}
//邏輯處理
return [
'status' => 'success',
'device_sn' => $deviceSn,
'message' => '數據已處理',
'need_publish' => 0, //是否需要發佈給前台信息
];
} catch (\Exception $e) {
Db::rollback();
throw $e;
}
}
}
上面服務指令方法中主要用來處理相關的邏輯,並使用了MqttUtil提供的各種mqtt相關的連接以及常用方法, 因為在業務系統中也需要給頻道發佈一些內容, 所以通過fast\MqttUtil 將基礎的連接和常用方法做一下封裝,方便其他地方調用
<?php
namespace fast;
use PhpMqtt\Client\MqttClient;
use PhpMqtt\Client\ConnectionSettings;
use PhpMqtt\Client\Exceptions\MqttClientException;
use think\Env;
class MqttUtil
{
// 單例實例(用於長連接,如監聽)
public static $persistentClient = null;
// 配置信息
private static $config = [];
/**
* 初始化配置
*/
private static function initConfig(): void
{
if (empty(self::$config)) {
self::$config = [
'server' => 'mqtt.yourdomain.com',//localhost
'port' => 1883,
'username' => '',
'password' => '',
'keepalive' => 60,
'client_id' => 'daoheng',
'qos' => 1,
];
}
}
/**
* 獲取長連接客户端(單例,用於監聽等長期操作)
* @throws MqttClientException
*/
public static function getPersistentClient(): MqttClient
{
self::initConfig();
// 若客户端未初始化或已斷開,重新連接
if (!self::$persistentClient || !self::$persistentClient->isConnected()) {
$clientId = self::$config['client_id'];
//引入環境變量,避免測試環境與正式環境衝突
$env = Env::get('app.env', '');
$clientId .= $env;
self::$persistentClient = new MqttClient(
self::$config['server'],
self::$config['port'],
$clientId
);
// 配置連接參數
$connectionSettings = (new ConnectionSettings)
// ->setUsername(self::$config['username'])
// ->setPassword(self::$config['password'])
->setKeepAliveInterval(self::$config['keepalive']) //設置60秒發送一次心跳
->setConnectTimeout(10) //連接超時時間
->setSocketTimeout(80)// 80秒無響應則超時
->setReconnectAutomatically(true);// 自動重連
// 連接服務器
self::$persistentClient->connect($connectionSettings, true);
Log::info('MQTT長連接客户端已初始化並連接');
}
return self::$persistentClient;
}
/**
* 獲取臨時客户端(用於單次發佈等短期操作)
* @throws MqttClientException
*/
public static function getTemporaryClient(): MqttClient
{
self::initConfig();
//臨時連接的clientId不能重複,否則後面的連接會頂掉前面的連接
$clientId = self::$config['client_id'] . uniqid();
$client = new MqttClient(
self::$config['server'],
self::$config['port'],
$clientId
);
$connectionSettings = (new ConnectionSettings)
// ->setUsername(self::$config['username'])
// ->setPassword(self::$config['password'])
->setKeepAliveInterval(5); // 臨時連接縮短心跳間隔
$client->connect($connectionSettings, true);
return $client;
}
/**
* 訂閲主題(基於長連接客户端)
* @param string $topic 主題
* @param callable $callback 消息回調函數
* @param int $qos QoS級別
* @throws MqttClientException
*/
public static function subscribe(string $topic, callable $callback, int $qos = null): void
{
$qos = is_null($qos) ? (self::$config['qos'] ?? 1) : $qos;
$client = self::getPersistentClient();
$client->subscribe($topic, $callback, $qos);
Log::info("已訂閲主題: {$topic} (QoS: {$qos})");
}
//使用永久連接發佈消息
public static function publishPersistent(string $topic, array $message, int $qos = 1): bool
{
$client = self::getPersistentClient();
$message['time'] = time();//統一加個時間戳
$messageJson = json_encode($message, JSON_UNESCAPED_UNICODE);
$client->publish($topic, $messageJson, $qos);
$event = $message['event'] ?? '';
if ($event != 'heartbeat') {
MonologUtil::stream('php[長連接client]發佈mqtt消息:', [
'topic' => $topic,
'message' => $message,
]);
}
return true;
}
/**
* 發佈消息(默認使用臨時客户端,避免長連接阻塞)
* @param string $topic 主題
* @param array $message 消息內容
* @param int $qos QoS級別
* @return bool 發佈是否成功
*/
public static function publishTemporary(string $topic, array $message, int $qos = null): bool
{
$qos = is_null($qos) ? (self::$config['qos'] ?? 1) : $qos;
$client = null;
try {
// 使用臨時客户端發佈,避免影響長連接
$client = self::getTemporaryClient();
$message = json_encode($message, JSON_UNESCAPED_UNICODE);
$client->publish($topic, $message, $qos);
Log::info("已發佈消息到主題: {$topic},內容: {$message}");
return true;
} catch (MqttClientException $e) {
Log::error("消息發佈失敗 ({$topic}): {$e->getMessage()}");
return false;
} finally {
// 臨時客户端使用後斷開連接
if ($client && $client->isConnected()) {
try {
$client->disconnect();
} catch (MqttClientException $e) {
Log::warning("發佈後斷開連接失敗: {$e->getMessage()}");
}
}
}
}
/**
* 斷開長連接客户端
*/
public static function disconnectPersistentClient(): void
{
if (self::$persistentClient && self::$persistentClient->isConnected()) {
try {
self::$persistentClient->disconnect();
Log::info('MQTT長連接客户端已斷開');
} catch (MqttClientException $e) {
Log::warning("長連接斷開失敗: {$e->getMessage()}");
}
self::$persistentClient = null;
}
}
}