背景
最近在开发实践中,接触到了需要多进程开发的场景。众所周知,进程和线程最大的区别就在于:进程是资源分配的最小单位,线程是cpu调度的最小单位。对于多进程开发来说,每一个进程都占据一块独立的虚拟内存空间,想要在多进程之间交互难道只能选择管道(PIPE)、套接字(sokect)、远程过程调用(RPC)这种麻烦的东西吗?
实际上在本地进行多进程开发的时候,只要本进程处于你的系统之下,你可以在自己的父进程中设定共享内存区域,使得子进程可以访问该共享内存变量。
共享内存
在Python中,进程间共享变量最常用的方法之一是通过共享内存。Python的multiprocessing
模块提供了对共享内存的支持,主要通过Value、Array、Manager
等类来实现。
使用multiprocessing.Value
共享单个值
multiprocessing.Value
用于在多个进程之间共享一个基本数据类型(如int, float, string等)的值。
import multiprocessing
import time def writer(shared_value): for i in range(5): shared_value.value = i print(f"Writer: {shared_value.value}") time.sleep(1) def reader(shared_value): while True: if shared_value.value != -1: # 使用-1作为结束信号 print(f"Reader: {shared_value.value}") time.sleep(0.5) else: break if __name__ == "__main__": # 创建一个共享的int值 shared_value = multiprocessing.Value('i', 0) # 'i'表示int类型 # 创建进程 p1 = multiprocessing.Process(target=writer, args=(shared_value,)) p2 = multiprocessing.Process(target=reader, args=(shared_value,)) # 启动进程 p1.start() p2.start() # 等待writer进程完成 p1.join() # 发送结束信号给reader shared_value.value = -1 # 等待reader进程完成 p2.join()
需要注意的是multiprocessing.Value
的使用方式:
对于共享整数或者单个字符,初始化比较简单,参照下图映射关系即可。如:
i = multiprocessing.Value('i', 1)
c = multiprocessing.Value('c', '0')
注意,如果我们使用的code在上表不存在,则会抛出错误:
TypeError: this type has no size
如果共享的是字符串,则在上表是找不到映射关系的,就是没有code可用。所以我们需要使用原始的ctype类型,如:
from ctypes import c_char_p
ss =multiprocessing.
Value(c_char_p, 'ss')ctype类型可从下表查阅:
使用multiprocessing.Array
共享复杂数据结构
当需要共享更复杂的数据结构(如数组或列表)时,可以使用multiprocessing.Array
。
注意:Array
只能存储基本数据类型的数组。和上上图一致
import multiprocessing
import ctypes def writer(shared_array): for i in range(5): shared_array[i] = i * i print(f"Writer: {list(shared_array[:i+1])}") time.sleep(1) def reader(shared_array): while True: if shared_array[0] != -1: # 假设我们使用第一个元素作为结束信号 print(f"Reader: {list(shared_array)}") time.sleep(0.5) else: break if __name__ == "__main__": import time # 创建一个共享的整数数组 shared_array = multiprocessing.Array(ctypes.c_int, 5) # 创建一个长度为5的整数数组 # 创建进程 p1 = multiprocessing.Process(target=writer, args=(shared_array,)) p2 = multiprocessing.Process(target=reader, args=(shared_array,)) # 启动进程 p1.start() p2.start() # 等待writer进程完成 p1.join() # 发送结束信号给reader(这里简单地将第一个元素设为-1) shared_array[0] = -1 # 等待reader进程完成 p2.join()
使用multiprocessing.Manager
共享更复杂数据结构
Manager提供了一种方法创建数据,数据能够在不同进程之间共享,包括跨网络的运行在不同机器上的进程。manager对象控制有共享对象的服务进程。其他进程通过代理后也能操作共享对象。
import multiprocessing def worker(d, key, value): d[key] = value print(f'Worker: Setting {key} to {value}') if __name__ == '__main__': with multiprocessing.Manager() as manager: # 创建一个共享字典 d = manager.dict() # 创建并启动进程 p1 = multiprocessing.Process(target=worker, args=(d, 'foo', 1)) p2 = multiprocessing.Process(target=worker, args=(d, 'bar', 2)) p1.start() p2.start() # 等待进程完成 p1.join() p2.join() # 打印最终的共享字典内容 print('Main: Dictionary contents:', d)
参考文献
Python Multiprocessing 多进程并行计算 - Jeremy Feng (fengchao.pro)
python多进程共享变量Value使用tips - 简书 (jianshu.com)
Python多进程并行操作-multiprocessing-Managers - 知乎 (zhihu.com)