Python多进程间通讯(包含共享内存方式)

文章目录

  • 1 通过非共享内存配合队列方式
  • 2 通过共享内存配合队列方式


注:本博文测试环境为Linux系统。


1 通过非共享内存配合队列方式

下面是一个常见的生产者与消费者的模式示例,这里分别启动了两个子进程,一个为生产者(producer)一个为消费者(consumer)。生产者负责生产Numpy的NDArray数据(这里为了体现进程间传递数据会耗时故创建的NDArray的shape比较大),然后将数据放入队列Queue。消费者监控队列Queue一旦有数据就取出并简单打印下shape信息和填充的Value信息。

import time
import multiprocessing as mp
from multiprocessing import Process, Queueimport numpy as npdef producer_task(queue: Queue):for i in range(10):data = np.full(shape=(1, 3, 2048, 2048), fill_value=i, dtype=np.float32)queue.put(data)time.sleep(0.1)# send exit signalqueue.put(None)print("producer exit.")def consumer_task(queue: Queue):while True:data = queue.get()if data is None:breakprint(f"get data shape:{data.shape}, fill value:{data[0][0][0][0]}")print("consumer exit.")def main():queue = Queue()producer = Process(target=producer_task, args=(queue,), name="producer")consumer = Process(target=consumer_task, args=(queue,), name="consumer")producer.start()consumer.start()producer.join()consumer.join()if __name__ == '__main__':mp.set_start_method("spawn")main()

执行以上代码终端输出以下内容:

get data shape:(1, 3, 2048, 2048), fill value:0.0
get data shape:(1, 3, 2048, 2048), fill value:1.0
get data shape:(1, 3, 2048, 2048), fill value:2.0
get data shape:(1, 3, 2048, 2048), fill value:3.0
get data shape:(1, 3, 2048, 2048), fill value:4.0
get data shape:(1, 3, 2048, 2048), fill value:5.0
get data shape:(1, 3, 2048, 2048), fill value:6.0
get data shape:(1, 3, 2048, 2048), fill value:7.0
get data shape:(1, 3, 2048, 2048), fill value:8.0
get data shape:(1, 3, 2048, 2048), fill value:9.0
producer exit.
consumer exit.

为了进一步看清进程之间传递数据的过程,这里使用viztracer工具进一步分析(直接通过pip install viztraver即可安装)。使用指令如下,其中main.py就是上面的代码内容。跑完后会在当前目录下生成一个result.json文件。

viztracer main.py

通过如下指令可视化result.json文件:

vizviewer result.json

在终端输入上述指令后,终端会提示你打开网页并进入http://localhost:9001,如果使用的是VSCODE IDE在右下角也会提示你打开浏览器。
在这里插入图片描述

在这里插入图片描述
可以看到生产者进程在将数据放入队列后会先进行ForkingPickler.dump即数据序列化的过程,大概耗时12ms。然后开始posix.write即开始将数据从一个进程传递到另一个进程,大概耗时34ms。最后在消费者进程进行_pickle.loads即数据的反序列化,大概耗时6ms。从生产者进程将数据放入队列到消费者进程拿到数据总耗时约53ms。从这个示例中可以看到,当在进程间传递的数据量很大时会很耗时。


2 通过共享内存配合队列方式

下面示例代码将传递的数据改为了共享内存的方式,这样可以大幅减小进程间数据传递的成本。这里主要是使用multiprocessing库中的shared_memory.SharedMemory对象。创建新的共享内存时需要将create参数设置为True(如果是复用已有的共享内存时设置为False),然后指定具体的size大小,该参数为数据的字节大小,比如要申请一块存放数据类型为float32shape(1, 3, 2048, 2048)的空间所需字节数为1 * 3 * 2048 * 2048 * 4float32为4个字节)。根据Python官方文档介绍,当一个进程不在使用该共享内存时应关闭指向共享内存的文件描述符/句柄,具体做法是调用共享内存对象的close方法。当某块共享内存不在需要时,需在最后一个使用到的进程中调用unlink方法显示释放掉(如果不调用该方法,共享内存会一直存在,如果后续再不断申请新的共享内存则会出现共享内存泄露的问题,或者当程序未正常退出时该共享内存块会成为僵尸共享内存?)。例如在当前示例中,生产者进程创建了共享内存并放入队列里后可调用close方法关闭当前进程指向共享内存的文件描述符/句柄,在消费者进程中拿到数据并消费完后除了调用close方法外还会调用unlink方法删除该共享内存。有关共享内存的详细介绍看查看Python官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html#multiprocessing.shared_memory.SharedMemory

import time
import multiprocessing as mp
from multiprocessing import Process, Queue, shared_memoryimport numpy as npdef producer_task(queue: Queue):for i in range(10):shm = shared_memory.SharedMemory(name=f"data_{i}",create=True,size=1 * 3 * 2048 * 2048 * 4)np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)np_data.fill(i)queue.put(shm.name)shm.close()time.sleep(0.1)# send exit signalqueue.put(None)print("producer exit.")def consumer_task(queue: Queue):while True:shm_name = queue.get()if shm_name is None:breakshm = shared_memory.SharedMemory(name=shm_name, create=False)np_data = np.ndarray(shape=(1, 3, 2048, 2048), dtype=np.float32, buffer=shm.buf)print(f"get data shape:{np_data.shape}, fill value:{np_data[0][0][0][0]}")shm.close()shm.unlink()print("consumer exit.")def main():queue = Queue()producer = Process(target=producer_task, args=(queue,), name="producer")consumer = Process(target=consumer_task, args=(queue,), name="consumer")producer.start()consumer.start()producer.join()consumer.join()if __name__ == '__main__':mp.set_start_method("spawn")main()

同样我们使用viztracer来看看进程间的通讯情况:
在这里插入图片描述

数据从生产者进程传递到消费者进程耗时为245us相比之前不使用共享内存方法的53ms,速度比值为53000/245≈216X,提升还是非常明显的。但是这有个很奇怪的现象我无法理解,就是在生产者进程中调用close方法用了1.8ms,而在消费者进程里调用close方法只用了15us,unlink用了8us,如果有知道的大神希望能帮忙解释下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/14511.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

djang5 官网_polls_app_05( 关于代码测试)

这篇教程从 教程4 结束的地方开始。已经构建了一个网络投票应用程序,现在将为其创建一些自动化测试。 1. 原因: 雅各布卡普兰-莫斯(Jacob Kaplan-Moss),Django的原始开发者之一,说过:“没有测…

准双向/弱上拉(标准8051输出模式)、仅为输入(高阻)、开漏输出、推挽输出、上拉电阻、下拉电阻都是什么?

准双向/弱上拉(标准8051输出模式): 弱上拉:即输出的1驱动能力是有限的 准双向:可以输入也可以输出 为什么是弱上拉呢? 当三极管断开的时候,“内部输入”处应该是高电平(前提的后端…

Linux高阶——1110—死锁问题原子访问线程控制与调度线程同步

目录 1、旋转锁 2、死锁问题 死锁问题举例: 1、双线程死锁 代码 成功截图 2、单线程死锁 死锁问题处理: 死锁问题预防: 有向图 3、原子访问 1、原子访问概念 2、原子访问可用函数 原代码 未加锁代码输出 修改后代码 修改后截…

python入门3

IDE的概念 IDE(Integrated Development Environment)又被称为集成开发环境。说白了,就是有一款图形化界面的软件,它集成了编辑代码,编译代码,分析代码,执行代码以及调试代码等功能。在我们Python开发中,最常…

Ollama—87.4k star 的开源大模型服务框架!!

这一年来,AI 发展的越来越快,大模型使用的门槛也越来越低,每个人都可以在自己的本地运行大模型。今天再给大家介绍一个最厉害的开源大模型服务框架——ollama。 项目介绍 Ollama 是一个开源的大语言模型(LLM)服务工具…

mysql中的EXISTS和NOT EXISTS使用详解

本文来编写一个实例说下mysql中的EXISTS和NOT EXISTS使用详解 文章目录 exists用法SQL中in, not in, exists, not exists的区别使用实例本文小结 exists用法 exists: 如果括号内子查询语句返回结果不为空,说明where条件成立,就会执行主SQL语句。如果括号…

海量数据去重的哈希与布尔过滤器

目录 散列表 hash与平衡二叉树比较: 散列表组成: hash函数 作用&#xff1a; 怎么选择hash&#xff1a; 选择标准: 常用hash: hash的操作: hash冲突 产生原因 如何描述冲突程度: 解决冲突: 在合理范围内:used < size: 不在合理范围内&#xff08;used > s…

快速掌握——python类 封装[私有属性方法]、继承【python进阶】(内附代码)

1.类的定义 与 实例化对象 在python中使用class关键字创建一个类。 举例子 class Stu(object):id 1001name 张三def __init__(self):passdef fun1(self):pass# 实例化对象 s1 Stu() s2 Stu() print(s1.name) print(s2.name) 第一个方法 __init__是一种特殊的方法&#x…

PO 证书链

提到服务器间证书交换会不会头大,这两天遇到一个B2B接口的通讯证书问题,借机涨姿势,分享之 通常服务器之间通讯证书使用有两种方式: 如果不是生产机,可以简单的使用自签名证书,自签名证书就是下面这两个信息相同,都是自己,工具这里就不介绍了,多的是。双方互换证书,你…

HarmonyOS App 购物助手工具的开发与设计

文章目录 摘要引言功能需求分析技术方案与设计架构设计技术选型 代码示例Demo数据抓取模块数据存储模块历史价格查询和数据可视化模块完整界面布局和调用示例代码详解 QA环节总结参考资料 摘要 随着促销活动的增多&#xff0c;用户面临真假折扣的困惑&#xff0c;特别是在一些…

MPTCP协议

介绍 多路径TCP或 MPTCP协议是标准的扩展传输控制协议并在中进行了描述 RFC 8684号文件它允许设备同时使用多个接口通过单个MPTCP连接发送和接收TCP数据包。MPTCP可以聚合多个接口的带宽&#xff0c;也可以选择延迟最低的接口。它还允许在一条路径断开时进行故障切换&#xff…

1. 初始认识 Spring Cloud

1. 初始认识 Spring Cloud 文章目录 1. 初始认识 Spring Cloud前言2. Spring Cloud 基本介绍3. 系统架构的演变过程3.1 单机架构3.2 动静分离架构&#xff1a;静态缓存 文件存储3.3 分布式架构&#xff1a;业务拆分 负载均衡3.4 微服务架构&#xff1a;使用 Spring Cloud 4. …

网络学习第四篇

引言&#xff1a; 我们在第三篇的时候出现了错误&#xff0c;我们要就行排错&#xff0c;那么我们要知道一下怎么配置静态路由实现ping通&#xff0c;这样子我们才知道下一跳到底是什么&#xff0c;为什么这样子做。 实验目的 理解和掌握静态路由的基本概念和配置方法。 实…

【rf】robotframework自动化测试环境搭建

robotframework自动化测试环境搭建 前言&#xff1a; 1、在2019年之前&#xff0c;robotframework-ride的版本一直是1.5.2.1&#xff0c;是2016年1月份的版本&#xff0c;只能安装在python2.7的环境上&#xff0c;导致如果想同时使用robotframework做测试且又需要python3环境…

opencv入门学习总结

opencv学习总结 不多bb&#xff0c;直接上代码&#xff01;&#xff01;&#xff01; 案例一&#xff1a; import cv2 # 返回当前安装的 OpenCV 库的版本信息 并且是字符串格式 print(cv2.getVersionString()) """ 作用&#xff1a;它可以读取不同格式的图像文…

《DiffusionDet: Diffusion Model for Object Detection》ICCV2023

摘要 本文提出了一种新的框架DiffusionDet&#xff0c;它将目标检测任务表述为从带噪声的边界框到目标边界框的去噪扩散过程&#xff08;如图一所示&#xff09;。在训练阶段&#xff0c;目标边界框逐渐扩散到随机分布&#xff0c;模型学习逆转这一加噪过程。在推理阶段&#…

加深深度学习矩阵计算理解--用人类直觉 走进线性代数(非应试)

文章目录 前言一、向量二、线性组合、空间与基三、矩阵和线性变换四、矩阵乘法与线性变化复合1、矩阵乘法代表线性变换的复合2、实例说明 五、三维空间的线性变换1、基本性质2、直觉理解3、矩阵表示 六、行列式一、行列式的定义2、行列式在空间中的抽象理解 七、逆矩阵 列空间秩…

AIGC学习笔记(5)——AI大模型开发工程师

文章目录 AI大模型开发工程师004 垂直领域的智能在线搜索平台1 智能在线搜索平台需求分析大模型不够“聪明”增强大模型的方式需求分析2 智能在线搜索平台方案设计方案设计技术选型大模型版本GLM-4大模型注册使用Google Cloud平台注册创建可编程的搜索引擎3 智能在线搜索平台代…

【C++滑动窗口】1234. 替换子串得到平衡字符串|1877

本文涉及的基础知识点 C算法&#xff1a;滑动窗口及双指针总结 LeetCode1234. 替换子串得到平衡字符串 有一个只含有 ‘Q’, ‘W’, ‘E’, ‘R’ 四种字符&#xff0c;且长度为 n 的字符串。 假如在该字符串中&#xff0c;这四个字符都恰好出现 n/4 次&#xff0c;那么它就…

源码分享-Springboot+Vue大学生社团活动平台附源码,sql文件,配套论文

源码获取: 复制链接到浏览器打开即可领取 夸克网盘领取链接&#xff1a;https://pan.quark.cn/s/187d2ca0e3ec 百度网盘领取链接&#xff1a;https://pan.baidu.com/s/1apbO6k1cEqFXV-USf0I2IA?pwdccaj 提取码: ccaj 1.1课题背景及意义 随着现代网络技术发展&#xff0…