先説一下遇到的問題,使用的是beanstalk隊列,有兩個tube, 使用 supervisor 監控 beanstalk 消費隊列(主進程A),主進程A產生兩個子進程(子進程B,子進程C),每個子進程處理一個tube的數據。
supervisor配置如下:
[program:queue-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
處理消費隊列的代碼如下:
/**
* 啓動消費隊列
*
* @command php console.php queue worker
*/
public function wokerAction()
{
echo "------ worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
$beanstalk->addWorker(
'main',
function (BeanstalkJob $job) use ($config, $logger) {
$taskId = $job->getBody();
try {
$manager = new MainQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {
$logger->error("tube:main, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
exit(0);
}
);
$beanstalk->addWorker(
'notice',
function (BeanstalkJob $job) use ($config, $logger) {
$taskId = $job->getBody();
try {
$manager = new NoticeQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {
$logger->error("tube:notice, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
exit(0);
}
);
$beanstalk->doWork();
}
經常會出現下面的報錯,子進程B或者C就退出了,但是主進程沒事。
shmop_open(): unable to attach or create shared memory segment 'No such file or directory'
也查閲了supervisor的文檔,裏面有個針對組的配置,但是試了不起作用,這裏的組配置應該是針對主進程的,主進程沒事,子進程死活就不管了。
stopasgroup=true
killasgroup=true
這個事情折騰了好幾天,子進程不定時的會死,一直找不到出現共享內存錯誤的原因,進程的創建使用的是現成的類庫,如果總找不到原因程序就不穩定了。
後來轉變思路,反正只有兩個tube,乾脆搞兩個獨立進程算了,不糾結什麼子進程了,簡單的事情搞複雜了,把監控的事情託付給 supervisor,讓它幹擅長的事情。此次事件不是 supervisor 的問題,是我們使用的姿勢不對。
supervisor 配置如下:
[program:queue-main-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue main_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
[program:queue-notice-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue notice_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
處理消費隊列的代碼如下:
(1)處理 main tube 隊列
/**
* 啓動main消費隊列
*
* @command php console.php queue main_worker
*/
public function mainWorkerAction()
{
$tube = 'main';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {
$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {
$taskId = $job->getBody();
try {
$manager = new MainQueue();
$manager->handle($taskId);
$job->delete();
} catch (\Throwable $e) {
$logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
} else {
sleep(1);
}
}
}
(2)處理 notice tube 隊列
/**
* 啓動notice消費隊列
*
* @command php console.php queue notice_worker
*/
public function noticeWorkerAction()
{
$tube = 'notice';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {
$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {
$taskId = $job->getBody();
try {
$manager = new NoticeQueue();
$manager->handle($taskId);
$job->delete();
} catch (\Throwable $e) {
$logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
} else {
sleep(1);
}
}
}
經驗總結
至於當初會選擇使用fork子進程的方式,説到底還是圖方便,因為有現成的東西,拿來就用,但是出了問題,又遲遲搞不定。吃過虧才發現,還是用簡單穩定的方案實在,哪怕看上去沒有那麼美。
項目組件
- 後台框架:phalcon 3.4.5
- 前端框架:layui 2.8.2
- 全文檢索:xunsearch 1.4.9
- 即時通訊:workerman 3.5.22
- 基礎依賴:php7.3, mysql5.7, redis5.0
項目文檔
- 運行環境搭建
- 系統服務配置
- 客户終端配置
意見反饋
- 碼雲平台
- 官方社區