记一次hyperf框架封装swoole自定义进程

背景

公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。

自定义启动服务封装
<?php
/*** 进程启动服务【manager】*/
declare(strict_types=1);namespace App\Command;use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;/*** @Command*/
#[Command]
class TaskProcessCommand extends HyperfCommand
{const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';/*** @var ContainerInterface*/protected $container;protected $coroutine = false;public function __construct(ContainerInterface $container){$this->container = $container;parent::__construct('task');}public function configure(){parent::configure();$this->setDescription('自定义进程任务');$this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');$this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');}public function handle(){$action = $this->input->getArgument('action');if ($action === 'start') {$this->start();} elseif ($action === 'stop') {$this->stop();} elseif ($action === 'restart') {$this->restart();} else {echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;}}/*** 重启:php bin/hyperf.php task restart*/protected function restart(){$this->stop();$this->start();}/*** 停止:php bin/hyperf.php task stop*/protected function stop(){if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {//后期可以写入数据表,根据状态进行重启$managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);echo "stopping...\n";echo "kill pid $managerPid \n";$managerPid = intval($managerPid);$startTime = time();$timeout = config('server.settings.max_wait_time', 10);@Process::kill($managerPid);//等待主进程结束while (@Process::kill($managerPid, 0)) {//waiting process stopecho "waiting...\r";usleep(100000);echo "              \r";echo "waiting.\r";usleep(100000);echo "              \r";//超时 强杀所有子进程if ($managerPid > 0 && time() - $startTime >= $timeout) {echo "wait timeout, kill -9 child process, pid: $managerPid \n";echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;}}unlink(self::MANAGER_PROCESS_PID_PATH);echo "stopped. \n";} else {echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;}}/*** 启动:php bin/hyperf.php task start* 守护进程启动:php bin/hyperf.php task start -d*/protected function start(){$processConfig = config('processes');if ($processConfig) {echo "start now.\n";$daemonize = $this->input->getOption('daemonize');if ($daemonize) {//重定向标准输出到指定日志文件fclose(STDOUT);fclose(STDERR);$STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');$STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');Process::daemon(true, true);}//save pidfile_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());//TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程BaseProcess::setProcessName('manager');//主进程$startFuncMap = [];foreach ($processConfig as $processClass) {$processObj = new $processClass;if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {for ($i = 0; $i < $processObj->nums; $i++) {$startFuncMap[] = [[$processObj, 'handle'],$processObj->enableCoroutine ?? false,$i,];}}}$pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);$pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {[$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];if ($enableCoroutine) {run(function () use ($func, $pool, $workerId, $idx) {$pm = $func[0];//process下的类$idx += 1;BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程call_user_func($func, $pool, $workerId);});} else {$func($pool, $workerId);//baseProcess下的handle}});$pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {echo 'process Message,data=' .json_encode($data). PHP_EOL;});//进程关闭$pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {echo "process WorkerId={$workerId} is stopped". PHP_EOL;});$pool->start();} else {printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);}}/*** 查看运行状态:php bin/hyperf.php task status*/protected function status(){//TODO 查看任务执行状态}public function getProcess($pid = -1){if ($pid === -1) {$pid = getmypid();}return static::$process[$pid] ?? null;}public function getAllProcess(){return static::$process;}
}
基础process封装

此处可以用hyperf框架自带的,也可以自己封装

<?phpdeclare (strict_types = 1);namespace App\Process;use Swoole;
use Swoole\Process\Pool;abstract class BaseProcess {/*** 进程数* @var integer*/public $nums = 0;/*** 进程名称* @var string*/public $name = '';/*** 是否启用协程* @var bool*/public $enableCoroutine = true;/*** 是否随进程启动服务* @var bool*/public $isEnable = true;protected $isRunning = true;protected $process;static $signal = 0;function __construct() {//进程自动命名if (empty($this->name)) {$this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');}}final public function handle(Pool $pool, int $workerId): void {try {$this->processInit($pool->getProcess());$this->beforeRun();while (true) {//进程结束信号if (BaseProcess::$signal === SIGTERM) {$this->onProcessExit();break;}$this->run();}} catch (\Throwable $e) {throw $e;}}protected function onProcessExit() {$this->isRunning = false;}protected function processInit($process) {$this->process = $process;echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;//注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)pcntl_signal(SIGTERM, function () {BaseProcess::$signal = SIGTERM;$maxWaitTime = config('server.settings.max_wait_time', 5);$sTime = time();//检查进程任务状态Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {$coStat = \Swoole\Coroutine::stats();//如果主循环结束,且其它协程任务执行完,清理定时器以退出进程if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {Swoole\Timer::clearAll();$this->process->exit();}//等待超时,强制结束进程elseif (time() - $sTime >= $maxWaitTime) {Swoole\Timer::clearAll();if ($this->isRunning) {$this->onProcessExit();}$this->process->exit();}});});}public static function setProcessName(string $name) {swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);}/*** 事件循环前调用* @return [type] [description]*/abstract function beforeRun();/*** 事件循环,注意这里不能使用死循环* @return [type] [description]*/abstract function run();}
使用demo

demo1

<?phpdeclare (strict_types = 1);namespace App\Process;/*** test*/
class TestProcess extends BaseProcess {/*** 进程数* @var integer*/public $nums = 5;public $enableCoroutine = true;/*** 不随服务启动进程* @var bool */public $isEnable = false;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体echo date('Y-m-d H:i:s').PHP_EOL;usleep(1000);}}

demo2

<?phpnamespace App\Process;use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;class UpdateZeroStock extends BaseProcess
{const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';/*** 进程数* @var integer*/public $nums = 1;public $enableCoroutine = true;/*** 随服务启动进程* @var bool */public $isEnable=true;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体$this->updateZeroStock();echo date('Y-m-d H:i:s').PHP_EOL;sleep(300);}public function updateZeroStock(){// 1.全量更新$list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();$container = ApplicationContext::getContainer();$redis = $container->get(Redis::class);$producer = ApplicationContext::getContainer()->get(Producer::class);$today = date('Y-m-d');if($list_hq){foreach ($list_hq as $item){$spu = trim($item['spu']);$zeroStockKey =  $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){$sendData = $item;$sendData['appKey'] = $this->appSecretKey();$sendData['platform'] = 'hqchip';$message = new UpdateZeroStockProducer($sendData);$res = $producer->produce($message);echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;}}}}/*** 零库存缓存KEY* @param $brand* @param $sku* @return string*/private function getZeroStockKey($day,$platfrom,$brand){return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;}/*** 密钥生产* @return string*/private function appSecretKey(){$a = 'chipmall-spider&V2&' . date('Y-m-d');$appKey = base64_encode(md5($a)   .'||'. base64_encode(time() . '|' . $a));return $appKey;}
}
在配置中进程需要执行的服务

在这里插入图片描述

以守护进程方式启动服务
php bin/hyperf.php task start -d

在这里插入图片描述
查看进程命令

ps -ef|grep taskProcess

在这里插入图片描述

疑惑

这次封装还存在两个点需要完善!!!
1.重复执行:

php bin/hyperf.php task start -d

会启动多个manager进程

2、没有封装查看进程状态的status方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/143588.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Android 编译插桩操纵字节码

本文讲解如何编译插桩操纵字节码。 就使用 ASM 来实现简单的编译插桩效果&#xff0c;通过插桩实现在每一个 Activity 打开时输出相应的 log 日志。实现思路 过程主要包含两步&#xff1a; 1、遍历项目中所有的 .class 文件​ 如何找到项目中编译生成的所有 .class 文件&#…

pycharm中恢复原始界面布局_常用快捷键_常用设置

文章目录 1 恢复默认布局1 .1直接点击file→Manage IDE Settings→Restore Default Settings&#xff08;如下图所示&#xff09;&#xff1a;1.2 直接点击Restore and Restart&#xff0c; 然后Pycharm就会自动重启&#xff0c;重启之后的界面就是最原始的界面了 2 改变主题2.…

时序预测 | MATLAB实现NGO-GRU北方苍鹰算法优化门控循环单元时间序列预测

时序预测 | MATLAB实现NGO-GRU北方苍鹰算法优化门控循环单元时间序列预测 目录 时序预测 | MATLAB实现NGO-GRU北方苍鹰算法优化门控循环单元时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现NGO-GRU北方苍鹰算法优化门控循环单元时间序列预测&#…

接口测试——接口协议抓包分析与mock_L2

目录&#xff1a; 抓包工具charles抓包工具fiddler抓包工具证书配置app抓包实战练习接口测试实战练习 1.抓包工具charles 工具介绍 支持 SSL 代理支持流量控制支持重发网络请求&#xff0c;方便后端调试支持修改网络请求参数支持网络请求的截获并动态修改可以自动将 json 或…

探索Moonbeam路由流动性的强大功能

Moonbeam的GMP预编译作为MRL的接口&#xff0c;有助于将带有Token的消息从GMP协议&#xff08;通过XCMP&#xff09;传输到与Moonbeam链接的平行链。 为何是个重磅消息&#xff1f;因为这项技术使得将流动性从外部区块链转移到其他波卡平行链成为可能&#xff01; 这里补充一…

Python 3.12.0 正式版即将发布!

导读Python 3.12.0 发布了第 2 个 RC 版本&#xff0c;也是最后一个 RC。正式版将于 2023 年 10 月 2 日星期一发布。 开发团队表示&#xff0c;进入候选版本阶段后&#xff0c;只接受经过 review 且修复明确错误的代码。RC2 是发现并修复重要问题的最后机会。 从该版本开始&a…

Mini Linux嵌入式设备服务器

Digi International推出了具有Digi Embedded Linux的Digi Connect ME 9210。Digi Embedded Linux是为在Digi嵌入式模块和微控制器上开发而优化的最新版本。高性能嵌入式开发服务器大约只有一对骰子大小&#xff0c;是嵌入式Linux上最小的。这使OEM可以在空间受限的设备中使用Li…

【产品运营】如何做好B端产品规划

产品规划是基于当下掌握的多维度信息&#xff0c;为追求特定目的&#xff0c;而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息&#xff08;客户需求、市场趋势、竞争对手、竞争策略等&#xff09;&#xff0c;为追求特定目的&#xff08;商业增长、客户满意等&…

SSM - Springboot - MyBatis-Plus 全栈体系(十三)

第三章 MyBatis 一、MyBatis 简介 1. 简介 MyBatis 最初是 Apache 的一个开源项目 iBatis, 2010 年 6 月这个项目由 Apache Software Foundation 迁移到了 Google Code。随着开发团队转投 Google Code 旗下&#xff0c; iBatis3.x 正式更名为 MyBatis。代码于 2013 年 11 月迁…

一文教你如何配置路由策略

【微|信|公|众|号&#xff1a;厦门微思网络】 微思-课程介绍 组网需求 如图1所示&#xff0c;某公司的部门A和部门B相距较远&#xff0c;Router_1和Router_6分别作为这两个部门的出口设备&#xff0c;AS 100内部使用OSPF作为IGP。现要求&#xff1a; 通过部署BGP&#xff0c;使…

每日一练 | 华为认证真题练习Day115

1、FEC(Forwarding Equivalence Class)转发等价类&#xff0c;是一组具有某些共性的数据流的集合&#xff1b;FEC可以根据地址进行划分&#xff0c;但是不能根据业务类型、QoS等要素进行划分。 A. 对 B. 错 2、关于OSI参考模型中网络层的功能说法正确的是&#xff1f; A. OS…

阿里云服务器共享型和企业级独享有什么区别?

阿里云ECS云服务器共享型和企业级有什么区别&#xff1f;企业级就是独享型&#xff0c;共享型和企业级云的主要区别CPU调度模式&#xff0c;共享型是非绑定CPU调度模式&#xff0c;企业级是固定CPU调度模式&#xff0c;共享型云服务器在高负载时计算性能可能出现波动不稳定&…

PHP自动识别采集何意网址文章正文内容

在做PHP采集内容时&#xff0c;用过querylist采集组件&#xff0c;但是这个插件采集页面内容时&#xff0c;都必须要写个采集选择器。这样比较麻烦&#xff0c;每个文章页面都必须指定一条采集规则 。就开始着手找一个插件可以能自动识别任意文章url正文内容并采集的&#xff0…

Python:Django框架的Hello wrold示例

Django是Python的目前很常用的web框架&#xff0c;遵循MVC设计模式。 以下介绍如何安装Django框架&#xff0c;并生成最简单的项目&#xff0c;输出Hello world。(开发工具VScode) 一、安装Django 在VScode终端控制台执行以下指令安装Django python install django 如果要查…

前端新轮子Nue,号称替代Vue、React和Svelte

新的简约前端开发工具集Nue.js 于周三发布。在 Hacker News 上介绍它时&#xff0c;前端开发者和Nue.js 的创作者Tero Piirainen表示&#xff0c;它是 React、Vue、Next.js、Vite、Svelte 和 Astro 的替代品。他在 Nue.js的 FAQ 中进一步解释说&#xff0c;它是为网站和响应式用…

Chrome获取RequestId

Chrome获取RequestId 参考&#xff1a;https://help.aliyun.com/zh/redis/how-do-i-obtain-the-id-of-a-request 在浏览器页面按下F12键&#xff0c;打开开发者工具页面&#xff1b; 在开发者工具页面&#xff0c;单击Network(网络)&#xff1b; 在playload(载荷)窗口中找到目…

Nginx代理victoriametrics集群配置

1,首先安装nginx yum install -y nginx 2,生成密钥文件 安装htpasswd工具 yum install -y httpd-tools 生成密钥文件,prometheus为用户名 htpasswd -c /etc/nginx/conf.d/passwd prometheus 3,修改nginx配置文件nginx.conf,增加如下内容 upstream vmselect {server 10.…

【新版】系统架构设计师 - 案例分析 - 架构设计<SOA与微服务>

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 案例分析 - 架构设计&#xff1c;SOA与微服务&#xff1e;例题1例题2例题3例题4 架构 - 案例分析 - 架构设计&#xff1c;SOA与微服务&#xff1e; 这里SOA与微服务的例题只对应找寻了几个&#x…

微信小程序 工具使用(HBuilderX)

微信小程序 工具使用:HBuilderX 一 HBuilderX 的下载二 工具的配置2.1 工具 --> 设置 --> 运行配置2.1.1 微信开发者工具路径2.1.2 node 运行配置 2.2 插件 工具 --> 插件安装2.2.1 下载插件 三 微信小程序端四 同步运行五 BUG5.1 nodemon在终端无法识别 一 HBuilderX…

ubuntu中的系统消息中显卡显示llvmpipe (LLVM 10.0.0, 256 bits)

这是我在使用ubuntu系统时出现的问题&#xff0c;网上搜到很多解决的办法&#xff0c;我是一顿操作&#xff0c;后来看到这位老哥的帖子解决了。 集Linux / Ubuntuwin10双系统安装记录(2):AMD核显驱动引发的问题 - 知乎上一篇中我们提到了 astroR2&#xff1a;Linux / Ubuntuw…