topthink/think-queue 是 ThinkPHP 框架的官方隊列擴展,用於處理異步任務(如訂單通知、郵件發送、數據同步等),避免因耗時操作阻塞主流程。以下是其常見用法和核心功能説明:
一. 安裝與配置
-
安裝擴展
composer require topthink/think-queue -
配置隊列驅動
在 config/queue.php 中配置隊列驅動(支持 sync 同步、redis、database 等):return [ 'default' => 'redis', // 默認隊列驅動 'connections' => [ 'sync' => [ 'type' => 'sync', // 同步執行(用於調試) ], 'redis' => [ 'type' => 'redis', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 12, // 數據庫編號 'timeout' => 0, // redis連接的超時時間 'persistent' => false, //'prefix' => 'think_queue:', // 隊列鍵前綴 ], ], ];
二. 創建任務類
任務類需藉助 think\queue\Job,並實現 fire 方法(任務執行邏輯)和 failed 方法(任務失敗處理)。
示例: app\device\jobs\DeviceJob(設備訂單執行任務)
namespace app\device\jobs;
use think\Log;
use think\queue\Job;
class DeviceJob
{
/**
* 任務執行邏輯
* @param Job $job 任務對象
* @param array $data 傳遞的參數(如訂單ID、手機號等)
*/
public function fire(Job $job, array $data)
{
try {
// 實際業務邏輯處理
$result = $this->doBusiness($data);
if ($result) {
// 任務執行成功,刪除任務
$job->delete();
echo "任務執行成功\n";
} else {
// 執行失敗,判斷是否需要重試
if ($job->attempts() < 3) { // 最多重試3次
$job->release(5); // 5秒後重試
echo "任務執行失敗,將在5秒後重試\n";
} else {
// 超過重試次數,標記失敗
$job->delete();
$this->failed($data); // 調用失敗處理
echo "任務多次失敗,已放棄\n";
}
}
} catch (\Exception $e) {
// 捕獲異常,按失敗處理
$job->release(10); // 10秒後重試
echo "執行異常:{$e->getMessage()}\n";
}
}
//執行業務邏輯
public function doBusiness($data)
{
// 業務邏輯處理
return true;
}
/**
* 任務失敗處理(如記錄日誌、人工干預)
* @param array $data 任務參數
*/
public function failed($data)
{
// 記錄失敗日誌(可寫入數據庫或文件)
Log::error("任務失敗:" . json_encode($data));
}
}
三. 推送任務到隊列
在控制器或服務中推送任務到隊列。
public function test(Request $request)
{
//在測試方法中推送任務到隊列
\queue(app\device\jobs\DeviceJob::class, ['device_sn' => 'A8:4F:A4:CA:B9:AB']);
}
上面的test()方法執行完成後在redis隊列中就會多一條記錄:
四. 啓動隊列worker進程
任務推送後需啓動 worker 進程消費隊列,通過命令行執行:
-
基本命令(消費默認隊列)
php think queue:work -
消費指定隊列(推薦)
php think queue:work --queue order_notify,order_check # 同時消費 order_notify 和 order_check 隊列,優先級按順序排列 -
守護進程模式(生產環境)
php think queue:work --daemon --queue order_notify # --daemon:以守護進程運行(進程常駐,自動重啓) # 退出需用 kill 命令終止進程