Flask使用Celery与多进程管理:优雅处理长时间任务与子进程终止技巧(multiprocessing)(subprocess)

在许多任务处理系统中,我们需要使用异步任务队列来处理繁重的计算或长时间运行的任务,如模型训练。Celery是一个广泛使用的分布式任务队列,而在某些任务中,尤其是涉及到调用独立脚本的场景中,我们需要混合使用multiprocessingsubprocess模块来启动和管理这些任务进程。然而,这种组合有时会带来一些挑战,如进程冲突和子进程无法正确终止的问题。

本文将讨论如何使用Celery、Multiprocessing和Subprocess来处理这些问题,并在需要时正确关闭子进程,实现完美的进程管理与切换。

问题描述

当通过celery.control.revoke来终止Celery任务时,如果任务启动了多个子进程,例如使用multiprocessingsubprocess模块,这些子进程不会被立即终止。在某些情况下,子进程会继续运行,导致任务无法彻底停止,并可能造成系统资源浪费。

解决方案

我们可以通过组合使用psutil库来实现对子进程的监控和终止,从而确保所有相关的进程都能正确关闭。以下是具体实现步骤。

安装必要的库

确保你已经安装psutil库:

pip install psutil

修改代码实现

1. Celery任务与Multiprocessing结合Subprocess

首先,我们创建一个Celery任务。当任务启动时,它会使用multiprocessing模块启动一个新的进程,该进程将执行独立的Python脚本。代码如下:

import json
import os
import psutil
import multiprocessing
import subprocess
from celery_app import celeryimport torch.multiprocessing as mpmp.set_start_method('spawn', True)def run_script(json_test_path, uid):command = f"python training.py {json_test_path}"process = subprocess.Popen(command, shell=True)print("===========================================PID:", process.pid)print("===========================================uid:", uid)process.wait()return process.pid@celery.task(bind=True)
def lora_train_task(self, json_test_demo):# 将json_test保存到临时文件中json_test_path = f"training_config_{json_test_demo['uid']}.json"json_test_path = os.path.abspath(json_test_path)with open(json_test_path, 'w') as f:json.dump(json_test_demo, f)# 使用多进程调用独立脚本p = multiprocessing.Process(target=run_script, args=(json_test_path, json_test_demo['uid']))p.start()p.join()return 0
2. 使用psutil关闭子进程

我们通过调用psutil库来监控并关闭所有相关的子进程。以下是实现终止任务和子进程的代码示例:

import psutildef terminate_process_tree(pid):try:parent = psutil.Process(pid)for child in parent.children(recursive=True):  # This will recursively find all child processeschild.terminate()parent.terminate()except psutil.NoSuchProcess:pass# 示例:终止任务时调用终止子进程函数
celery.control.revoke(args['task_id'], terminate=True, signal='SIGKILL')
if args['pid_id']:terminate_process_tree(int(args['pid_id']))

完整示例

将上述代码组合起来,我们得到完成的实现。如下所示:

import json
import os
import psutil
import multiprocessing
import subprocess
from celery_app import celery
import torch.multiprocessing as mpmp.set_start_method('spawn', True)def run_script(json_test_path, uid):command = f"python training.py {json_test_path}"process = subprocess.Popen(command, shell=True)print("===========================================PID:", process.pid)print("===========================================uid:", uid)process.wait()return process.pid@celery.task(bind=True)
def lora_train_task(self, json_test_demo):# 将json_test保存到临时文件中json_test_path = f"training_config_{json_test_demo['uid']}.json"json_test_path = os.path.abspath(json_test_path)with open(json_test_path, 'w') as f:json.dump(json_test_demo, f)# 使用多进程调用独立脚本process = multiprocessing.Process(target=run_script, args=(json_test_path, json_test_demo['uid']))process.start()process.join()return 0def terminate_process_tree(pid):try:parent = psutil.Process(pid)for child in parent.children(recursive=True):  # 递归找到所有子进程child.terminate()parent.terminate()except psutil.NoSuchProcess:pass# 示例:终止任务时调用终止子进程函数
celery.control.revoke(args['task_id'], terminate=True, signal='SIGKILL')
if args['pid_id']:terminate_process_tree(int(args['pid_id']))

结论

本文示范了如何通过混合使用Celery、Multiprocessing与Subprocess来处理复杂的任务执行场景,同时介绍了通过psutil库来正确管理和终止子进程。这种方法能够确保系统资源的合理使用,并避免出现僵尸进程问题。希望本文对你在实际项目中处理类似问题时有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/35491.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

BGP路径属性与路由反射器

BGP路径属性 路径属性: 任何一条BGP路由都拥有多个路径属性 当路由器将BGP路由通告给它的对等体时,一并被通告的还有路由所携带的各各路径属性 BGP的路径属性将影响路由优选 路径四个属性分类: 公认必遵:必须包括在每个upda…

C语言期末考试——重点考点

目录 1.C语言的结构 2.三种循环结构 3.逻辑真假判断 4. printf函数 5. 强制类型转化 6. 多分支选择结构 7. 标识符的定义 8. 三目运算符 1.C语言的结构 选择结构、顺序结构、循环结构 2.三种循环结构 for、while、do-while 3.逻辑真假判断 C语言用0表示false,用非0(不…

ci/cd配置任务超时时间

有两个地方决定了任务超时时间: 1. 2.gitlab-runner

JUC:Synchronized和锁升级

1. 面试题 谈谈你对Synchronized的理解Sychronized的锁升级你聊聊Synchronized实现原理,monitor对象什么时候生成的?知道monitor的monitorenter和monitorexit这两个是怎么保证同步的嘛?或者说这两个操作计算机底层是如何执行的偏向锁和轻量级…

梯度下降法以及 Python 实现

文章目录 1. 引言2. 梯度法3. 例子4. 代码实现5. 讨论 — 学习率 η \eta η5.1 当 η \eta η 设置过大5.2 当 η \eta η 设置过小 参考 1. 引言 梯度下降法,可以根据微分求出的斜率计算函数的最小值。 在人工智能中,经常被应用于学习算法。 2. 梯…

OpenCV-图像阈值

简单阈值法 此方法是直截了当的。如果像素值大于阈值,则会被赋为一个值(可能为白色),否则会赋为另一个值(可能为黑色)。使用的函数是 cv.threshold。第一个参数是源图像,它应该是灰度图像。第二…

详细了解IO分类

按照数据流方向如何划分? 输入流(Input Stream):从源(如文件、网络等)读取数据到程序。 输出流(Output Stream):将数据从程序写出到目的地(如文件、网络、控…

Appium 安装问题汇总

好生气好生气,装了几天了, opencv4nodejs 和 mjpeg-consumer 就是装不了,气死我了不管了,等后面会装的时候再来完善,气死了气死了。 目录 前言 1、apkanalyzer.bat 2、opencv4nodejs 3、ffmpeg 4、mjpeg-consume…

目标检测知识点总结

1、数据增强 2、指标 3、vit 、swint ViT算法,创新性地将图像划分成一个个patch,并将每个patch展平为一个向量,使得图像数据转化为序列化数据,之后输入到Transformer模型中,实现了Transformer在图像分类任务中的应用。…

Lecture 11 - List,Set,Map

List, Set and Map are all interfaces: they define how these respective types work, but they don’t provide implementation code overview 1. List(列表): (1) 创建、访问和操作列表:ArrayList …

ElfBoard开源项目|基于百度智能云平台的车牌识别项目

本项目基于百度智能云平台,旨在利用其强大的OCR服务实现车牌号码的自动识别。选择百度智能云的原因是其高效的API接口和稳定的服务质量,能够帮助开发者快速实现车牌识别应用。 本项目使用摄像头捕捉图像后,通过集成百度OCR服务的API&#xf…

应对超声波测试挑战,如何选择合适的数字化仪?

一、数字化仪的超声波应用 超声波是频率大于人类听觉范围上限的声学声压(声学)波。超声波设备的工作频率为20 kHz至几千MHz。表1总结了一些更常见的超声波应用的特征。 每个应用中使用的频率范围都反映了实际情况下的平衡。提高工作频率可以通过提高分…

product/admin/list?page=0size=10field=jancodevalue=4562249292272

文章目录 1、ProductController2、AdminCommonService3、ProductApiService4、ProductCommonService5、ProductSqlService https://api.crossbiog.com/product/admin/list?page0&size10&fieldjancode&value45622492922721、ProductController GetMapping("ad…

博物馆导览系统方案(一)背景需求分析与核心技术实现

维小帮提供多个场所的室内外导航导览方案,如需获取博物馆导览系统解决方案可前往文章最下方获取,如有项目合作及技术交流欢迎私信我们哦~撒花! 一、博物馆导览系统的背景与市场需求 在数字化转型的浪潮中,博物馆作为文化传承和知…

Flink学习连载文章11--双流Join

双流 Join 和两个流合并是不一样的 两个流合并:两个流变为 1 个流 union connect 双流 join: 两个流 join,其实这两个流还是原来的,只是满足条件的数据会变为一个新的流。 可以结合 sql 语句中的 union 和 join 的区别。 在离线 Hive 中&…

vue3+wangeditor富文本编辑器详细教程

一、前言 在这篇教程中,我将指导如何使用 Vue 3 和 WangEditor 创建一个功能丰富的富文本编辑器。WangEditor 是一个轻量级的富文本编辑器,它非常适合集成到 Vue 项目中。这个例子展示了如何配置富文本编辑器,包括工具栏、编辑器配置以及如何…

Python学习39天

my_tools.py文件提供工具函数 """ 此文件编写工具函数,供程序员使用 my_tools """def read_confirm_select():"""让用户输入:Y/N,不区分大小写,将用户输入值转为小写返回&#xff…

LCA - Lowest Common Ancestor

LCA - Lowest Common Ancestor https://www.luogu.com.cn/problem/SP14932 题目描述 A tree is an undirected graph in which any two vertices are connected by exactly one simple path. In other words, any connected graph without cycles is a tree. - Wikipedia T…

unity打包web,发送post请求,获取地址栏参数,解决TypeError:s.replaceAll is not a function

发送post请求 public string url "http://XXXXXXXXX";// 请求数据public string postData "{\"user_id\": 1}";// Start is called before the first frame updatevoid Start(){// Post();StartCoroutine(PostRequestCoroutine(url, postData…

恒创科技:如何区分网站的域名主机名

如何区分网站的域名主机名?它们都是网址机制的一部分,当你在地址栏输入它们,就能访问互联网上想去的地方。你可曾思考过主机名和域名的区别呢? 简单来说,域名就像网址,而主机名用于标识网络中的设备。不过,这只是表面…