最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

python中进程间数据通讯模块multiprocessing.Manager的介绍

来源:动视网 责编:小采 时间:2020-11-27 14:20:23
文档

python中进程间数据通讯模块multiprocessing.Manager的介绍

python中进程间数据通讯模块multiprocessing.Manager的介绍:本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Mana
推荐度:
导读python中进程间数据通讯模块multiprocessing.Manager的介绍:本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Mana


本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).

dict使用说明

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
# 3. 创建一个测试程序
def test(idx, test_dict):
 test_dict[idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

too simple.

简单的源码分析

这时我们再看一个例子

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
 test_dict['test'][idx] = idx
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

可以看到输出结果是奇怪的{'test': {}}
如果我们简单修改一下代码

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict):
 row = test_dict['test']
 row[idx] = idx
 test_dict['test'] = row
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)

这时输出结果就符合预期了.

为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

def Manager():
 '''
 Returns a manager associated with a running server process

 The managers methods such as `Lock()`, `Condition()` and `Queue()`
 can be used to create shared objects.
 '''
 from multiprocessing.managers import SyncManager
 m = SyncManager()
 m.start()
 return m
 
...
 def start(self, initializer=None, initargs=()):
 '''
 Spawn a server process for this manager object
 '''
 assert self._state.value == State.INITIAL

 if initializer is not None and not hasattr(initializer, '__call__'):
 raise TypeError('initializer must be a callable')

 # pipe over which we will retrieve address of server
 reader, writer = connection.Pipe(duplex=False)

 # spawn process which runs a server
 self._process = Process(
 target=type(self)._run_server,
 args=(self._registry, self._address, self._authkey,
 self._serializer, writer, initializer, initargs),
 )
 ident = ':'.join(str(i) for i in self._process._identity)
 self._process.name = type(self).__name__ + '-' + ident
 self._process.start()
...

上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.

进程间数据安全

这个时候如果出现一种情况, 两个进程同时请求了一份相同的数据, 分别进行修改, 再提交到server上会怎么样呢? 那当然是数据产生异常. 基于此, 我们需要Manager的另一个对象, Lock(). 这个对象也不难理解, Manager本身就是一个server, dict跟lock都来自于这个server, 所以当你lock住的时候, 其他进程是不能取到数据, 自然也不会出现上面那种异常情况.

代码示例:

import multiprocessing
# 1. 创建一个Manger对象
manager = multiprocessing.Manager()
# 2. 创建一个dict
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}
# 3. 创建一个测试程序
def test(idx, test_dict, lock):
 lock.acquire()
 row = test_dict['test']
 row[idx] = idx
 test_dict['test'] = row
 lock.release()
# 4. 创建进程池进行测试
pool = multiprocessing.Pool(4)
for i in range(100):
 pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)

切忌不要进程里自己新建lock对象, 要使用统一的lock对象.

本篇文章到这里就已经全部结束了,更多其他精彩内容可以关注PHP中文网的python视频教程栏目!

文档

python中进程间数据通讯模块multiprocessing.Manager的介绍

python中进程间数据通讯模块multiprocessing.Manager的介绍:本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Mana
推荐度:
标签: 数据 通讯 模块
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top