背景介紹
舊項目是藉助swoole並通過'php websocketServer.php' 啓動websocket服務的, 設備端想通過websocket推送一些數據給服務器, 服務器將相應的數據單獨保存起來方便設備那邊後期查看. 由於採用的websocket服務的啓動方式導致不能直接使用thinkphp提供的很多方法,我就想借助reids, 當websocket收到設備推送的數據後先存到redis列表中, 然後在thinkphp中創建一個自定義指令專門負責處理對應redis列表中的數據.
具體實現步驟如下所示:
-
websocket中保存相應數據到redis列表中
//監聽WebSocket消息事件。 $ws->on('Message', function ($ws, $frame) use ($redis, $userWsTypeMap) { $ws->push($frame->fd, 'server receive a message!'); $data = json_decode($frame->data, true); //設備websocket連接 if (!empty($data['deviceId'])) { //記錄設備連接 $oldFid = $redis->hGet('device:links', $data['deviceId']); if ($oldFid != $frame->fd) { $redis->hSet('device:links', $data['deviceId'], $frame->fd); } if (isset($data['type'])) {//開關門兒相關 switch ($data['type']) { case 4://設備推送的信息 $logData = $data['data'] ?? '';//日誌數據 if ($logData) { $logDataStr = json_encode($logData, JSON_UNESCAPED_UNICODE); if ($logDataStr === false) { $logDataStr = $logData; } //將設備推送的信息保存的device:log:list這個redis列表中 $redis->lPush('device:log:list', $logDataStr); $ws->push($frame->fd, 'log_upload_response'); //給前台一個回覆 } break; } } } }); -
在thinkphp中構建自定義指令來監聽device:log:list隊列
在thinkphp中構建自定義值了來監聽隊列, 如果隊列中有內容就進行處理
<?php namespace app\device\command; use app\common\service\RedisService; use fast\MonologUtil; use RedisException; use think\console\Command; use think\console\Input; use think\console\Output; class DeviceLog extends Command { public static $deviceLogList = 'device:log:list'; //通過隊列記錄前台傳遞的設備logs //配置命令 protected function configure() { $this->setName(self::$deviceLogList) ->setDescription('記錄設備傳遞的日誌信息'); } protected function execute(Input $input, Output $output) { $output->writeln('開始監聽設備日誌信息隊列:' . self::$deviceLogList); // 設置最大運行時間(3600秒=1小時),避免長期運行導致內存泄漏 $maxExecutionTime = 3600; $startTime = time(); $redis = null; try { while (true) {//循環監聽隊列 if (time() - $startTime >= $maxExecutionTime) {//判斷是否超時 return 1; } // 確保Redis連接有效,無效則重建 if (!$redis || !$redis->isConnected()) { $redis = RedisService::getInstance(); $redis->select(15);//選擇數據庫15 $output->writeln('Redis連接已建立或重建'); } try { $timeout = $maxExecutionTime - (time() - $startTime); // 限制最小超時時間,避免負數 $timeout = max(1, $timeout); $task = $redis->brPop(self::$deviceLogList, $timeout); if ($task) { $startMsg = "開始處理任務:redis列表名:{$task[0]} 數據:{$task[1]}"; $output->writeln($startMsg); // $task 格式:[0 => 隊列名, 1 => 數據] $taskData = json_decode($task[1], true); //記錄日誌 MonologUtil::info('設備日誌', $taskData, 'device/log'); $output->writeln('任務處理結束'); } } catch (RedisException $e) { // 捕獲Redis命令級錯誤(如連接中斷),標記連接無效並重試 $output->writeln('Redis命令執行錯誤: ' . $e->getMessage() . ',將重試連接'); $redis = null; // 標記連接失效 // 短暫延遲避免頻繁重試 sleep(1); } } } catch (\Exception $e) { $errorMsg = '監聽器發生致命錯誤: ' . $e->getMessage(); $output->writeln($errorMsg); MonologUtil::error('監聽器致命錯誤', [ 'error' => $e->getMessage(), 'stack_trace' => $e->getTraceAsString() ], 'device/fatal_error'); return 0; // 返回非1狀態碼錶示異常退出 } } } -
在command.php中添加對應自定義指令
return [ \app\device\command\DeviceLog::class,//記錄設備通過websocket傳遞的日誌 ]; - 在服務器中配配置守護進程執行對應自定義指令
總結
通過上面的方式就可以藉助redis作為中間件實現設備推送數據到websocket服務器, websocket服務器將對應數據先存到redis列表中, 然後藉助thinkphp的自定義指令監聽並處理相應redis列表中的數據. 之所以想借助thinkphp的自定義指令就是想使用thinkphp中已經寫好的方法和類(例如這裏的MonologUtil類,就是之前創建的monelog日誌管理類)
最終實現的效果如下圖所示, 在對應位置就會有保存的設備相關日誌信息: