分享两个用php写的队列服务woker代码。
1. 单进程队列服务
<?php include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php'; include_once ROOT_DIR . 'lib/config.php'; include_once ROOT_DIR . 'lib/common.php'; include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php'; include_once ROOT_DIR . 'lib/ActionException.php'; ini_set('memory_limit', '32M');//设置最大内存使用量 /** * @param int $errno * @param string $errstr * @param string $errfile * @param int $errline * @throws QueueException */ function error_handler($errno, $errstr, $errfile, $errline) { $content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr()); wlog($content, true, 5); throw new QueueException($content, $errno); } function fatal_handle() { $error = error_get_last(); if (isset($error['type']) && $error['type'] == E_ERROR) { error_handler($error['type'], $error['message'], $error['file'], $error['line']); } } $wsRedis = getWebSocketRedis(); QueueServer::run($argv, PHP_SCRIPT_USER); /** * Class QueueServer * 单进程 redis 队列服务类 * * @version 0.0.1 * @link https://www.maopiaopiao.com/ * @author yhm.1234@163.com */ class QueueServer { static $param = []; static $masterPid = 0; static $pidFile = '/dev/shm/' . PROJECT . '-qs.pid'; static $statusFile = '/dev/shm/' . PROJECT . '-qs.status'; static $owner = ''; static $processName = PROJECT . '-queue-server'; public static function run($param, $owner = '') { exit("本程序为单进程运行,随时都会因为程序错误而导致进程终止,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务\r\n"); self::$param = $param; self::$owner = $owner; self::checkParam(); self::{$param[1]}(); } /** * 开启 * @throws QueueException */ private static function start() { self::init(); self::daemonize(); self::status(); self::doSignal(); self::work(); } private static function init() { strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行'); set_time_limit(0); set_error_handler('error_handler'); register_shutdown_function('fatal_handle'); } private static function work() { while (true) { #信号分发 pcntl_signal_dispatch(); $redis = getWebSocketRedis(false); while ($value = $redis->blpop(QUEUE_MASTER, 6)) { //self::log($value); try { if (!$value || !is_array($value) || empty($value[1])) { throw new QueueException('消息体格式错误', 1011); } $value = json_decode($value[1], true); if (json_last_error() || !$value || !is_array($value)) { throw new QueueException('消息体格式错误', 1012); } $file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php'; if (!file_exists($file)) { throw new QueueException('不存在的消费者类型', 1404); } require_once $file; $value['type']::run($value); $redis->incr(getQueueCountKey('success'));//队列任务成功处理的数 } catch (QueueException $exception) { $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } catch (ActionException $exception) { $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } catch (Exception $exception) { $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } unset($value); } } } /** * 停止 * @param bool $exit */ private static function stop($exit = true) { /*if (!self::processHasRun()) { exit("服务未运行\r\n"); }*/ if (!file_exists(self::$pidFile)) { echo "警告:关闭服务过程中发现 pid 文件不存在\r\n"; } else { if (!unlink(self::$pidFile)) { echo "警告:关闭服务过程中删除 pid 文件失败\r\n"; } } exec('pkill -f ' . self::$processName); $exit && exit("服务关闭成功\r\n"); } /** * 重启 */ private static function restart() { self::stop(false); self::start(); } /** * 输出服务当前的状态信息 */ private static function status() { if (!self::processHasRun() || !file_exists(self::$pidFile)) { exit("服务未运行\r\n"); } $status = file_get_contents(self::$statusFile); if (!$status) { exit("状态文件不存在\r\n"); } $status = explode('|', $status); $redis = getWebSocketRedis(); $start_time = fileatime($status[1]); $str = sprintf( "系统当前时间:%s\r\n进程名称:%s\r\n主进程id:%d\r\n进程所属用户:%s\r\n启动时间:%s\r\n已运行:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n" . "处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d\r\n", date('Y-m-d H:i:s', getServerTimeNow()), self::$processName, $status[0], self::$owner, date('Y-m-d H:i:s', $start_time), secToTime(getServerTimeNow() - $start_time), getSizeDesc(memory_get_usage(true)), getSizeDesc(memory_get_peak_usage(true)), $redis->get(getQueueCountKey('failIncrTotal')), $redis->get(getQueueCountKey('total')), $redis->get(getQueueCountKey('stotal')), $redis->get(getQueueCountKey('ftotal')), $redis->get(getQueueCountKey('success')), $redis->get(getQueueCountKey('fail')), $redis->lLen(QUEUE_MASTER) ); echo $str; if ($GLOBALS['argv'] == 'status') { exit(); } } /** * 检查是否以守护进程的模式运行 * @throws QueueException */ private static function daemonize() { if (!empty(self::$param[2]) && self::$param[2] == '-d') { $pid = pcntl_fork(); if (-1 === $pid) { exit("守护进程创建失败\r\n"); } elseif ($pid > 0) { //主进程会在这里退出,下面的代码还会继续执行,不过是子进程接手继续执行的 exit(0); } posix_setsid();//设置新会话组长,脱离终端 //关闭打开的文件描述符 fclose(STDIN); //fclose(STDOUT); //fclose(STDERR); } self::single(); //设置进程名称 cli_set_process_title(self::$processName); self::setServerProcessOwner(); self::$masterPid = posix_getpid(); file_exists(self::$pidFile) && unlink(self::$pidFile); if (false === file_put_contents(self::$pidFile, self::$masterPid)) { throw new QueueException('保存 pid 信息到 pid 文件:' . self::$pidFile . '失败'); } chmod(self::$pidFile, 0777); $status = sprintf('%d|%s', self::$masterPid, self::$pidFile); if (false === file_put_contents(self::$statusFile, $status)) { throw new QueueException('保存状态信息到文件:' . self::$statusFile . '失败'); } chmod(self::$statusFile, 0777); } /** * 记录日志 * @param $content */ private static function log($content) { wlog($content, true, 5); } /** * 单实例运行 */ private static function single() { if (self::processHasRun()) { exit("程序已经在运行\r\n"); } } /** * 检查执行时候传的参数 */ private static function checkParam() { if (empty(self::$param[1]) || !in_array(self::$param[1], ['start', 'stop', 'restart', 'status'])) { exit('必须加参数,如:start|stop|restart|status'); } } /** * 安装信号处理器 */ private static function doSignal() { pcntl_signal(SIGINT, function ($signo) { //fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo); if ($signo == '2') { self::stop(); } }); } /** * 检验进程已经存在 * @return bool */ private static function processHasRun(): bool { return (int)exec('ps -ef|grep ' . self::$processName . '|grep -v grep|wc -l') ? true : false; } /** * 获取当前运行进程的执行用户名 * @return string mixed */ private static function getCurrentProcessUser() { $user_info = posix_getpwuid(posix_getuid()); if (!$user_info) { exit('获取当前运行进程的用户信息失败' . "\r\n"); } return $user_info['name']; } /** * 设置当前脚本所属的用户名和用户组 */ private static function setServerProcessOwner() { self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser(); $info = posix_getpwnam(self::$owner); if (!$info) { exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n"); } if (!posix_setgid($info['gid'])) { echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n"; } if (!posix_setuid($info['uid'])) { echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n"; } } }
2. 多进程,master worker 模式的队列服务
<?php include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php'; include_once ROOT_DIR . 'lib/config.php'; include_once ROOT_DIR . 'lib/common.php'; include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php'; include_once ROOT_DIR . 'lib/ActionException.php'; ini_set('memory_limit', '32M');//设置最大内存使用量 $system_set = getSystemSet(); /** * @param int $errno * @param string $errstr * @param string $errfile * @param int $errline * @throws QueueException */ function error_handler($errno, $errstr, $errfile, $errline) { $content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr()); wlog($content, true, 5, $GLOBALS['system_set'], ['action' => '', 'uid' => 0]); throw new QueueException($content, $errno); } function fatal_handle() { $error = error_get_last(); if (isset($error['type']) && $error['type'] == E_ERROR) { error_handler($error['type'], $error['message'], $error['file'], $error['line']); } } $wsRedis = getWebSocketRedis(); MultiProcessQueueServer::run(); /** * Class MultiProcessQueueServer * 多进程 redis 多队列服务类,支持多队列处理,不同的队列名可以设置不同的处理进程数 * * @version 0.0.1 * @link https://www.maopiaopiao.com/ * @author yhm.1234@163.com */ class MultiProcessQueueServer { public static $param = []; public static $statusFile = '/dev/shm/' . PROJECT . '-qs-multi.status'; public static $owner = PHP_SCRIPT_USER; public static $processName = PROJECT . '-queue-server'; public static $workerNum = 2; public static $masterPid = 0; public static $childProcessList = []; // ['pid' => ['worker_id' => '', 'queue' => '']] public static $queueList = [QUEUE_MASTER => ['worker_num' => 2]/*, QUEUE_TEST => ['worker_num' => 2]*/]; public static function start() { self::single(); self::runMaster(); self::installSignal(); self::writeStatus(); self::status(); self::monitorWorkerProcess(); } public static function stop($exit = true) { exec('pkill -f \'' . self::getMasterProcessName() . '\'', $output); foreach (self::$queueList as $key => $queue) { for ($i = 1; $i <= $queue['worker_num']; $i++) { exec('pkill -f \'' . self::getWorkerProcessName($key, $i) . '\'', $output); } } if (file_exists(self::$statusFile) && !unlink(self::$statusFile)) { echo "状态文件删除失败\r\n"; } $exit && exit("服务关闭成功\r\n"); } public static function restart() { self::stop(false); self::start(); } public static function status() { if (!self::masterProcessHasRun()) { exit("服务未运行\r\n"); } $status = file_get_contents(self::$statusFile); if (!$status) { exit("状态文件不存在\r\n"); } $status = explode('|', $status); $redis = getWebSocketRedis(); $start_time = fileatime(self::$statusFile); $now = getServerTimeNow(); $str = sprintf( "系统当前时间:%s\r\n启动时间:%s\r\n已运行:%s\r\n主进程名称:%s\r\n主进程id:%d\r\nworker进程id:%s\r\n进程所属用户:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n当前内存占用:%s\r\n" . "处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d 未知 %d\r\n处理任务统计时间段:%s - %s \r\n", date('Y-m-d H:i:s', $now), date('Y-m-d H:i:s', $start_time), secToTime($now - $start_time), self::getMasterProcessName(), $status[0], implode(' ', array_slice($status, 1)), self::$owner, getSizeDesc(memory_get_usage(true)), getSizeDesc(memory_get_peak_usage(true)), getSizeDesc(self::getMemUsedTotal() * 1024), $redis->get(getQueueCountKey('failIncrTotal')), $redis->get(getQueueCountKey('total')), $redis->get(getQueueCountKey('stotal')), $redis->get(getQueueCountKey('ftotal')), $redis->get(getQueueCountKey('success')), $redis->get(getQueueCountKey('fail')), $redis->lLen(QUEUE_MASTER), $redis->get(getQueueCountKey('stotal')) - $redis->get(getQueueCountKey('success')) - $redis->get(getQueueCountKey('fail')), date('Y-m-d H:i:s', $redis->get(getQueueCountKey('stime'))), date('Y-m-d H:i:s', $now) ); echo $str; } public static function initData() { $redis = getWebSocketRedis(false); $redis->set(getQueueCountKey('stime'), getServerTimeNow());//统计的开始时间的时间戳 $redis->set(getQueueCountKey('total'), 0);//发送数据的总数 $redis->set(getQueueCountKey('failIncrTotal'), 0);//数据压入队列时增长总数失败的个数 $redis->set(getQueueCountKey('stotal'), 0);//成功压入到队列的总数 $redis->set(getQueueCountKey('ftotal'), 0);//压入到队列失败的总数 $redis->set(getQueueCountKey('success'), 0);//任务处理成功的总数 $redis->set(getQueueCountKey('fail'), 0);//任务处理失败的总数 } public static function writeStatus() { $str = self::$masterPid . '|' . implode('|', array_keys(self::$childProcessList)); if (!file_put_contents(self::$statusFile, $str, LOCK_EX)) { exit("写入信息到状态文件失败\r\n"); } } public static function run() { exit("本程序仅能启动 worker 进程,没有定时任务进程,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务,该服务同时支持 worker 定时任务 子进程!\r\n"); self::init(); self::checkParams(); self::{$GLOBALS['argv'][1]}(); } /** * 单实例运行 */ private static function single() { if (self::masterProcessHasRun()) { exit("程序已经在运行\r\n"); } } private static function installSignal() { pcntl_signal(SIGINT, function ($signo) { //fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo); if ($signo == '2') { //echo self::$masterPid ."\r\n";print_r(self::$childProcessList); self::stop(); } }); } private static function init() { strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行'); set_time_limit(0); set_error_handler('error_handler'); register_shutdown_function('fatal_handle'); } private static function checkParams() { if (empty($GLOBALS['argv'][1])) { exit("必须输入参数,必须参数如:start|stop|restart|status\r\n"); } if (!in_array($GLOBALS['argv'][1], ['start', 'stop', 'restart', 'status', 'initData'])) { exit("参数输入错误,正确参数如:start|stop|restart|status\r\n"); } if (self::$workerNum < 1 || self::$workerNum > 9) { exit("worker数设置不正确\r\n"); } } //开启主进程 private static function runMaster() { //确保进程有最大操作权限 umask(0); if (!empty($GLOBALS['argv'][2]) && $GLOBALS['argv'][2] == '-d') { $pid = pcntl_fork(); if ($pid > 0) { exit(); } } self::$masterPid = getmypid(); foreach (self::$queueList as $key => $queue) { for ($i = 1; $i <= $queue['worker_num']; $i++) { self::runWorker($key, $i); } } cli_set_process_title(self::getMasterProcessName()); self::setServerProcessOwner(); } //开启子进程 private static function runWorker($queue_name, $worker_id) { umask(0); $pid = pcntl_fork(); if ($pid > 0) {//父进程执行空间 self::$childProcessList[$pid] = ['worker_id' => $worker_id, 'queue' => $queue_name]; } else if ($pid == 0) {//子进程执行空间 cli_set_process_title(self::getWorkerProcessName($queue_name, $worker_id)); self::setServerProcessOwner(); self::work($queue_name, $worker_id); } else { exit("创建子进程失败\r\n"); } } //监控worker进程 private static function monitorWorkerProcess() { while ($pid = pcntl_wait($status)) { $worker = self::$childProcessList[$pid]; pcntl_signal_dispatch(); if ($pid == -1) { break; } else { unset(self::$childProcessList[$pid]); self::runWorker($worker['queue'], $worker['worker_id']); self::writeStatus(); } } } /** * 业务处理逻辑 */ private static function work($queue_name, $worker_id) { while (true) { try { #信号分发 pcntl_signal_dispatch(); $redis = getWebSocketRedis(false); while ($value = $redis->blpop($queue_name, 5 + $worker_id)) { //self::log($value); if (!$value || !is_array($value) || empty($value[1])) { throw new QueueException('消息体格式错误', 1011); } $value = json_decode($value[1], true); if (json_last_error() || !$value || !is_array($value)) { throw new QueueException('消息体格式错误', 1012); } $file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php'; if (!file_exists($file)) { throw new QueueException('不存在的消费者类型:' . $value['type'], 1404); } require_once $file; $value['type']::run($value); $redis->incr(getQueueCountKey('success'));//队列任务成功处理的数 unset($value); } } catch (QueueException $exception) { saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '队列处理失败:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value); $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } catch (ActionException $exception) { saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', 'action 异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value); $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } catch (Exception $exception) { saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '未知异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value); $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数 self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode()); } } } private static function getMasterProcessName(): string { return self::$processName . ': master process'; } private static function getWorkerProcessName($queue_name, $worker_id): string { return self::$processName . ': worker process ' . $queue_name . ' ' . $worker_id; } /** * 检验进主程已经存在 * @return bool */ private static function masterProcessHasRun(): bool { return (int)exec('ps -ef|grep \'' . self::getMasterProcessName() . '\'|grep -v grep|wc -l') ? true : false; } /** * 获取当前运行进程的执行用户名 * @return string mixed */ private static function getCurrentProcessUser() { $user_info = posix_getpwuid(posix_getuid()); if (!$user_info) { exit('获取当前运行进程的用户信息失败' . "\r\n"); } return $user_info['name']; } /** * 设置当前脚本所属的用户名和用户组 */ private static function setServerProcessOwner() { self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser(); $info = posix_getpwnam(self::$owner); if (!$info) { exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n"); } if (!posix_setgid($info['gid'])) { echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n"; } if (!posix_setuid($info['uid'])) { echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n"; } } /** * 记录日志 * @param $content */ private static function log($content) { wlog($content, true, 5); } /** * 获取所有进程的内存当前实际使用总量 * @return int */ private static function getMemUsedTotal(): int { $used = 0; $exec = 'ps -aux|grep \'' . self::getMasterProcessName() . '\'|grep -v \'grep\'|awk \'{print $6}\''; exec($exec, $output); $used += $output[0]; foreach (self::$queueList as $key => $queue) { for ($i = 1; $i <= $queue['worker_num']; $i++) { $exec = 'ps -aux|grep \'' . self::getWorkerProcessName($key, $i) . '\'|grep -v \'grep\'|awk \'{print $6}\''; exec($exec, $output); $used += $output[0]; } } return $used; } }