«

分享两个用php写的队列服务woker代码

时间:2024-2-22 14:02     作者:韩俊     分类: PHP


分享两个用php写的队列服务woker代码。

1. 单进程队列服务

<?php

include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php';
include_once ROOT_DIR . 'lib/config.php';
include_once ROOT_DIR . 'lib/common.php';
include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php';
include_once ROOT_DIR . 'lib/ActionException.php';

ini_set('memory_limit', '32M');//设置最大内存使用量

/**
 * @param int $errno
 * @param string $errstr
 * @param string $errfile
 * @param int $errline
 * @throws QueueException
 */
function error_handler($errno, $errstr, $errfile, $errline) {
    $content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr());
    wlog($content, true, 5);
    throw new QueueException($content, $errno);
}

function fatal_handle() {
    $error = error_get_last();
    if (isset($error['type']) && $error['type'] == E_ERROR) {
        error_handler($error['type'], $error['message'], $error['file'], $error['line']);
    }
}

$wsRedis = getWebSocketRedis();
QueueServer::run($argv, PHP_SCRIPT_USER);

/**
 * Class QueueServer
 * 单进程 redis 队列服务类
 *
 * @version 0.0.1
 * @link https://www.maopiaopiao.com/
 * @author yhm.1234@163.com
 */
class QueueServer {

    static $param = [];
    static $masterPid = 0;
    static $pidFile = '/dev/shm/' . PROJECT . '-qs.pid';
    static $statusFile = '/dev/shm/' . PROJECT . '-qs.status';
    static $owner = '';
    static $processName = PROJECT . '-queue-server';

    public static function run($param, $owner = '') {
        exit("本程序为单进程运行,随时都会因为程序错误而导致进程终止,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务\r\n");
        self::$param = $param;
        self::$owner = $owner;
        self::checkParam();
        self::{$param[1]}();
    }

    /**
     * 开启
     * @throws QueueException
     */
    private static function start() {
        self::init();
        self::daemonize();
        self::status();
        self::doSignal();
        self::work();
    }

    private static function init() {
        strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行');
        set_time_limit(0);
        set_error_handler('error_handler');
        register_shutdown_function('fatal_handle');
    }

    private static function work() {
        while (true) {
            #信号分发
            pcntl_signal_dispatch();
            $redis = getWebSocketRedis(false);
            while ($value = $redis->blpop(QUEUE_MASTER, 6)) {
                //self::log($value);
                try {
                    if (!$value || !is_array($value) || empty($value[1])) {
                        throw new QueueException('消息体格式错误', 1011);
                    }
                    $value = json_decode($value[1], true);
                    if (json_last_error() || !$value || !is_array($value)) {
                        throw new QueueException('消息体格式错误', 1012);
                    }
                    $file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php';
                    if (!file_exists($file)) {
                        throw new QueueException('不存在的消费者类型', 1404);
                    }
                    require_once $file;
                    $value['type']::run($value);
                    $redis->incr(getQueueCountKey('success'));//队列任务成功处理的数
                } catch (QueueException $exception) {
                    $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                    self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
                } catch (ActionException $exception) {
                    $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                    self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
                } catch (Exception $exception) {
                    $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                    self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
                }

                unset($value);
            }
        }
    }

    /**
     * 停止
     * @param bool $exit
     */
    private static function stop($exit = true) {
        /*if (!self::processHasRun()) {
            exit("服务未运行\r\n");
        }*/
        if (!file_exists(self::$pidFile)) {
            echo "警告:关闭服务过程中发现 pid 文件不存在\r\n";
        } else {
            if (!unlink(self::$pidFile)) {
                echo "警告:关闭服务过程中删除 pid 文件失败\r\n";
            }
        }
        exec('pkill -f ' . self::$processName);
        $exit && exit("服务关闭成功\r\n");
    }

    /**
     * 重启
     */
    private static function restart() {
        self::stop(false);
        self::start();
    }

    /**
     * 输出服务当前的状态信息
     */
    private static function status() {
        if (!self::processHasRun() || !file_exists(self::$pidFile)) {
            exit("服务未运行\r\n");
        }

        $status = file_get_contents(self::$statusFile);
        if (!$status) {
            exit("状态文件不存在\r\n");
        }

        $status = explode('|', $status);
        $redis = getWebSocketRedis();

        $start_time = fileatime($status[1]);
        $str = sprintf(
            "系统当前时间:%s\r\n进程名称:%s\r\n主进程id:%d\r\n进程所属用户:%s\r\n启动时间:%s\r\n已运行:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n" .
            "处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d\r\n",
            date('Y-m-d H:i:s', getServerTimeNow()),
            self::$processName,
            $status[0],
            self::$owner,
            date('Y-m-d H:i:s', $start_time),
            secToTime(getServerTimeNow() - $start_time),
            getSizeDesc(memory_get_usage(true)),
            getSizeDesc(memory_get_peak_usage(true)),
            $redis->get(getQueueCountKey('failIncrTotal')),
            $redis->get(getQueueCountKey('total')),
            $redis->get(getQueueCountKey('stotal')),
            $redis->get(getQueueCountKey('ftotal')),
            $redis->get(getQueueCountKey('success')),
            $redis->get(getQueueCountKey('fail')),
            $redis->lLen(QUEUE_MASTER)
        );

        echo $str;

        if ($GLOBALS['argv'] == 'status') {
            exit();
        }
    }

    /**
     * 检查是否以守护进程的模式运行
     * @throws QueueException
     */
    private static function daemonize() {
        if (!empty(self::$param[2]) && self::$param[2] == '-d') {
            $pid = pcntl_fork();
            if (-1 === $pid) {
                exit("守护进程创建失败\r\n");
            } elseif ($pid > 0) {
                //主进程会在这里退出,下面的代码还会继续执行,不过是子进程接手继续执行的
                exit(0);
            }

            posix_setsid();//设置新会话组长,脱离终端
            //关闭打开的文件描述符
            fclose(STDIN);
            //fclose(STDOUT);
            //fclose(STDERR);
        }

        self::single();

        //设置进程名称
        cli_set_process_title(self::$processName);
        self::setServerProcessOwner();

        self::$masterPid = posix_getpid();

        file_exists(self::$pidFile) && unlink(self::$pidFile);

        if (false === file_put_contents(self::$pidFile, self::$masterPid)) {
            throw new QueueException('保存 pid 信息到 pid 文件:' . self::$pidFile . '失败');
        }

        chmod(self::$pidFile, 0777);

        $status = sprintf('%d|%s', self::$masterPid, self::$pidFile);

        if (false === file_put_contents(self::$statusFile, $status)) {
            throw new QueueException('保存状态信息到文件:' . self::$statusFile . '失败');
        }

        chmod(self::$statusFile, 0777);
    }

    /**
     * 记录日志
     * @param $content
     */
    private static function log($content) {
        wlog($content, true, 5);
    }

    /**
     * 单实例运行
     */
    private static function single() {
        if (self::processHasRun()) {
            exit("程序已经在运行\r\n");
        }
    }

    /**
     * 检查执行时候传的参数
     */
    private static function checkParam() {
        if (empty(self::$param[1]) || !in_array(self::$param[1], ['start', 'stop', 'restart', 'status'])) {
            exit('必须加参数,如:start|stop|restart|status');
        }
    }

    /**
     * 安装信号处理器
     */
    private static function doSignal() {
        pcntl_signal(SIGINT, function ($signo) {
            //fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo);
            if ($signo == '2') {
                self::stop();
            }
        });
    }

    /**
     * 检验进程已经存在
     * @return bool
     */
    private static function processHasRun(): bool {
        return (int)exec('ps -ef|grep ' . self::$processName . '|grep -v grep|wc -l') ? true : false;
    }

    /**
     * 获取当前运行进程的执行用户名
     * @return string mixed
     */
    private static function getCurrentProcessUser() {
        $user_info = posix_getpwuid(posix_getuid());

        if (!$user_info) {
            exit('获取当前运行进程的用户信息失败' . "\r\n");
        }

        return $user_info['name'];
    }

    /**
     * 设置当前脚本所属的用户名和用户组
     */
    private static function setServerProcessOwner() {
        self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser();
        $info = posix_getpwnam(self::$owner);

        if (!$info) {
            exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n");
        }

        if (!posix_setgid($info['gid'])) {
            echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n";
        }

        if (!posix_setuid($info['uid'])) {
            echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n";
        }
    }
}

2. 多进程,master worker 模式的队列服务

<?php

include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php';
include_once ROOT_DIR . 'lib/config.php';
include_once ROOT_DIR . 'lib/common.php';
include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php';
include_once ROOT_DIR . 'lib/ActionException.php';

ini_set('memory_limit', '32M');//设置最大内存使用量

$system_set = getSystemSet();

/**
 * @param int $errno
 * @param string $errstr
 * @param string $errfile
 * @param int $errline
 * @throws QueueException
 */
function error_handler($errno, $errstr, $errfile, $errline) {
    $content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr());
    wlog($content, true, 5, $GLOBALS['system_set'], ['action' => '', 'uid' => 0]);
    throw new QueueException($content, $errno);
}

function fatal_handle() {
    $error = error_get_last();
    if (isset($error['type']) && $error['type'] == E_ERROR) {
        error_handler($error['type'], $error['message'], $error['file'], $error['line']);
    }
}

$wsRedis = getWebSocketRedis();
MultiProcessQueueServer::run();

/**
 * Class MultiProcessQueueServer
 * 多进程 redis 多队列服务类,支持多队列处理,不同的队列名可以设置不同的处理进程数
 *
 * @version 0.0.1
 * @link https://www.maopiaopiao.com/
 * @author yhm.1234@163.com
 */
class MultiProcessQueueServer {

    public static $param = [];
    public static $statusFile = '/dev/shm/' . PROJECT . '-qs-multi.status';
    public static $owner = PHP_SCRIPT_USER;
    public static $processName = PROJECT . '-queue-server';
    public static $workerNum = 2;
    public static $masterPid = 0;
    public static $childProcessList = []; // ['pid' => ['worker_id' => '', 'queue' => '']]
    public static $queueList = [QUEUE_MASTER => ['worker_num' => 2]/*, QUEUE_TEST => ['worker_num' => 2]*/];

    public static function start() {
        self::single();
        self::runMaster();
        self::installSignal();
        self::writeStatus();
        self::status();
        self::monitorWorkerProcess();
    }

    public static function stop($exit = true) {
        exec('pkill -f \'' . self::getMasterProcessName() . '\'', $output);
        foreach (self::$queueList as $key => $queue) {
            for ($i = 1; $i <= $queue['worker_num']; $i++) {
                exec('pkill -f \'' . self::getWorkerProcessName($key, $i) . '\'', $output);
            }
        }
        if (file_exists(self::$statusFile) && !unlink(self::$statusFile)) {
            echo "状态文件删除失败\r\n";
        }
        $exit && exit("服务关闭成功\r\n");
    }

    public static function restart() {
        self::stop(false);
        self::start();
    }

    public static function status() {
        if (!self::masterProcessHasRun()) {
            exit("服务未运行\r\n");
        }

        $status = file_get_contents(self::$statusFile);
        if (!$status) {
            exit("状态文件不存在\r\n");
        }

        $status = explode('|', $status);
        $redis = getWebSocketRedis();

        $start_time = fileatime(self::$statusFile);
        $now = getServerTimeNow();
        $str = sprintf(
            "系统当前时间:%s\r\n启动时间:%s\r\n已运行:%s\r\n主进程名称:%s\r\n主进程id:%d\r\nworker进程id:%s\r\n进程所属用户:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n当前内存占用:%s\r\n" .
            "处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d 未知 %d\r\n处理任务统计时间段:%s - %s \r\n",
            date('Y-m-d H:i:s', $now),
            date('Y-m-d H:i:s', $start_time),
            secToTime($now - $start_time),
            self::getMasterProcessName(),
            $status[0],
            implode(' ', array_slice($status, 1)),
            self::$owner,
            getSizeDesc(memory_get_usage(true)),
            getSizeDesc(memory_get_peak_usage(true)),
            getSizeDesc(self::getMemUsedTotal() * 1024),
            $redis->get(getQueueCountKey('failIncrTotal')),
            $redis->get(getQueueCountKey('total')),
            $redis->get(getQueueCountKey('stotal')),
            $redis->get(getQueueCountKey('ftotal')),
            $redis->get(getQueueCountKey('success')),
            $redis->get(getQueueCountKey('fail')),
            $redis->lLen(QUEUE_MASTER),
            $redis->get(getQueueCountKey('stotal')) - $redis->get(getQueueCountKey('success')) - $redis->get(getQueueCountKey('fail')),
            date('Y-m-d H:i:s', $redis->get(getQueueCountKey('stime'))),
            date('Y-m-d H:i:s', $now)
        );

        echo $str;
    }

    public static function initData() {
        $redis = getWebSocketRedis(false);
        $redis->set(getQueueCountKey('stime'), getServerTimeNow());//统计的开始时间的时间戳
        $redis->set(getQueueCountKey('total'), 0);//发送数据的总数
        $redis->set(getQueueCountKey('failIncrTotal'), 0);//数据压入队列时增长总数失败的个数
        $redis->set(getQueueCountKey('stotal'), 0);//成功压入到队列的总数
        $redis->set(getQueueCountKey('ftotal'), 0);//压入到队列失败的总数
        $redis->set(getQueueCountKey('success'), 0);//任务处理成功的总数
        $redis->set(getQueueCountKey('fail'), 0);//任务处理失败的总数
    }

    public static function writeStatus() {
        $str = self::$masterPid . '|' . implode('|', array_keys(self::$childProcessList));
        if (!file_put_contents(self::$statusFile, $str, LOCK_EX)) {
            exit("写入信息到状态文件失败\r\n");
        }
    }

    public static function run() {
        exit("本程序仅能启动 worker 进程,没有定时任务进程,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务,该服务同时支持 worker 定时任务 子进程!\r\n");
        self::init();
        self::checkParams();
        self::{$GLOBALS['argv'][1]}();
    }

    /**
     * 单实例运行
     */
    private static function single() {
        if (self::masterProcessHasRun()) {
            exit("程序已经在运行\r\n");
        }
    }

    private static function installSignal() {
        pcntl_signal(SIGINT, function ($signo) {
            //fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo);
            if ($signo == '2') {
                //echo self::$masterPid ."\r\n";print_r(self::$childProcessList);
                self::stop();
            }
        });
    }

    private static function init() {
        strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行');
        set_time_limit(0);
        set_error_handler('error_handler');
        register_shutdown_function('fatal_handle');
    }

    private static function checkParams() {
        if (empty($GLOBALS['argv'][1])) {
            exit("必须输入参数,必须参数如:start|stop|restart|status\r\n");
        }

        if (!in_array($GLOBALS['argv'][1], ['start', 'stop', 'restart', 'status', 'initData'])) {
            exit("参数输入错误,正确参数如:start|stop|restart|status\r\n");
        }

        if (self::$workerNum < 1 || self::$workerNum > 9) {
            exit("worker数设置不正确\r\n");
        }
    }

    //开启主进程
    private static function runMaster() {
        //确保进程有最大操作权限
        umask(0);

        if (!empty($GLOBALS['argv'][2]) && $GLOBALS['argv'][2] == '-d') {
            $pid = pcntl_fork();
            if ($pid > 0) {
                exit();
            }
        }

        self::$masterPid = getmypid();

        foreach (self::$queueList as $key => $queue) {
            for ($i = 1; $i <= $queue['worker_num']; $i++) {
                self::runWorker($key, $i);
            }
        }

        cli_set_process_title(self::getMasterProcessName());
        self::setServerProcessOwner();
    }

    //开启子进程
    private static function runWorker($queue_name, $worker_id) {
        umask(0);
        $pid = pcntl_fork();
        if ($pid > 0) {//父进程执行空间
            self::$childProcessList[$pid] = ['worker_id' => $worker_id, 'queue' => $queue_name];
        } else if ($pid == 0) {//子进程执行空间
            cli_set_process_title(self::getWorkerProcessName($queue_name, $worker_id));
            self::setServerProcessOwner();
            self::work($queue_name, $worker_id);
        } else {
            exit("创建子进程失败\r\n");
        }
    }

    //监控worker进程
    private static function monitorWorkerProcess() {
        while ($pid = pcntl_wait($status)) {
            $worker = self::$childProcessList[$pid];
            pcntl_signal_dispatch();
            if ($pid == -1) {
                break;
            } else {
                unset(self::$childProcessList[$pid]);
                self::runWorker($worker['queue'], $worker['worker_id']);
                self::writeStatus();
            }
        }
    }

    /**
     * 业务处理逻辑
     */
    private static function work($queue_name, $worker_id) {
        while (true) {
            try {
                #信号分发
                pcntl_signal_dispatch();
                $redis = getWebSocketRedis(false);
                while ($value = $redis->blpop($queue_name, 5 + $worker_id)) {
                    //self::log($value);
                    if (!$value || !is_array($value) || empty($value[1])) {
                        throw new QueueException('消息体格式错误', 1011);
                    }
                    $value = json_decode($value[1], true);
                    if (json_last_error() || !$value || !is_array($value)) {
                        throw new QueueException('消息体格式错误', 1012);
                    }
                    $file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php';
                    if (!file_exists($file)) {
                        throw new QueueException('不存在的消费者类型:' . $value['type'], 1404);
                    }
                    require_once $file;
                    $value['type']::run($value);
                    $redis->incr(getQueueCountKey('success'));//队列任务成功处理的数
                    unset($value);
                }
            } catch (QueueException $exception) {
                saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '队列处理失败:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
                $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
            } catch (ActionException $exception) {
                saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', 'action 异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
                $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
            } catch (Exception $exception) {
                saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '未知异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
                $redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
                self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
            }
        }
    }

    private static function getMasterProcessName(): string {
        return self::$processName . ': master process';
    }

    private static function getWorkerProcessName($queue_name, $worker_id): string {
        return self::$processName . ': worker process ' . $queue_name . ' ' . $worker_id;
    }

    /**
     * 检验进主程已经存在
     * @return bool
     */
    private static function masterProcessHasRun(): bool {
        return (int)exec('ps -ef|grep \'' . self::getMasterProcessName() . '\'|grep -v grep|wc -l') ? true : false;
    }

    /**
     * 获取当前运行进程的执行用户名
     * @return string mixed
     */
    private static function getCurrentProcessUser() {
        $user_info = posix_getpwuid(posix_getuid());

        if (!$user_info) {
            exit('获取当前运行进程的用户信息失败' . "\r\n");
        }

        return $user_info['name'];
    }

    /**
     * 设置当前脚本所属的用户名和用户组
     */
    private static function setServerProcessOwner() {
        self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser();
        $info = posix_getpwnam(self::$owner);

        if (!$info) {
            exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n");
        }

        if (!posix_setgid($info['gid'])) {
            echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n";
        }

        if (!posix_setuid($info['uid'])) {
            echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n";
        }
    }

    /**
     * 记录日志
     * @param $content
     */
    private static function log($content) {
        wlog($content, true, 5);
    }

    /**
     * 获取所有进程的内存当前实际使用总量
     * @return int
     */
    private static function getMemUsedTotal(): int {
        $used = 0;

        $exec = 'ps -aux|grep \'' . self::getMasterProcessName() . '\'|grep -v \'grep\'|awk \'{print $6}\'';
        exec($exec, $output);

        $used += $output[0];

        foreach (self::$queueList as $key => $queue) {
            for ($i = 1; $i <= $queue['worker_num']; $i++) {
                $exec = 'ps -aux|grep \'' . self::getWorkerProcessName($key, $i) . '\'|grep -v \'grep\'|awk \'{print $6}\'';
                exec($exec, $output);
                $used += $output[0];
            }
        }

        return $used;
    }
}

标签: php php教程

热门推荐