

本节讲学习Python的多进程。
多进程 Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的GIL.
使用 multiprocessing 也非常简单, 如果对 threading 有一定了解的朋友, 你们的享受时间就到了. 因为 python 把 multiprocessing 和 threading 的使用方法做的几乎差不多. 这样我们就更容易上手. 也更容易发挥你电脑多核系统的威力了!
import multiprocessing as mp
import threading as td
def job(a,d):
print('aaaaa')
t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()从上面的使用对比代码可以看出,线程和进程的使用方法相似。
在运用时需要添加上一个定义main函数的语句
if __name__=='__main__':
完整的应用代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_test.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(a, d): print a, d if __name__ == '__main__': p1 = mp.Process(target=job, args=(1, 2)) p1.start() p1.join()
运行环境要在terminal环境下,可能其他的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的结果为:
? baseLearn python ./process/process_test.py 1 2 ? baseLearn
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
process_queue.py
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp # 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果 def job(q): res = 0 for i in range(1000): res += i + i**2 + i**3 q.put(res) #queue if __name__ == '__main__': q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) # 分别启动、连接两个线程 p1.start() p2.start() p1.join() p2.join() # 上面是分两批处理的,所以这里分两批
打印输出结果:
? python ./process/process_queue.py 249833583000 249833583000
进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题。
首先import multiprocessing 和定义job()
import multiprocessing as mp def job(x): return x*x
然后我们定义一个Pool
pool = mp.Pool()
有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果
res = pool.map(job, range(10))
让我们来运行一下
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
完成代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(x): return x*x # 注意这里的函数有return返回值 def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
执行结果:
? baseLearn python ./process/process_pool.py [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量
def multicore(): pool = mp.Pool(processes=3) # 定义CPU核数量为3 res = pool.map(job, range(10)) print(res)
Pool除了map()外,还有可以返回结果的方式,那就是apply_async().
apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get())
运行结果;
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map() 4 # apply_async()
Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
map() 放入迭代参数,返回多个结果
apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
这节我们学习如何定义共享内存。只有用共享内存才能让CPU之间有交流。
我们可以通过使用Value数据存储在一个共享的内存表中。
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)其中d和i参数用来设置数据类型的,d表示一个双精浮点类型 double,i表示一个带符号的整型。
| Type code | C Type | Python Type | Minimum size in bytes |
|---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'u' | Py_UNICODE | Unicode character | 2 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 |
'I' | unsigned int | int | 2 |
'l' | signed long | int | 4 |
'L' | unsigned long | int | 4 |
'q' | signed long long | int | 8 |
'Q' | unsigned long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
在Python的 mutiprocessing 中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.
错误形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list
"""
TypeError: an integer is required
"""让我们看看没有加进程锁时会产生什么样的结果。
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_no_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让
在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。
结果打印:
? baseLearn python ./process/process_no_lock.py 1 5 9 9 13 13 17 17 18 18 ? baseLearn
我们可以看到,进程1和进程2在相互抢着使用共享内存v。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
首先需要定义一个进程锁
l = mp.Lock() # 定义一个进程锁
然后将进程锁的信息传入各个进程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v,3,l))
在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放
全部代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让
运行一下,让我们看看是否还会出现抢占资源的情况:
结果打印:
? baseLearn python ./process/process_lock.py 1 2 3 4 5 9 13 17 21 25
显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行
