Toccata in Nowhere.

Python 多进程 multiprocessing 并行

2020.07.09

multiprocessing 是 Python 并行计算的另一种实现方式(另一种是mpi4py),可以绕开GIL锁机制从而实现多进程并行。相比于mpi4py并行,实现更为简单,但功能也较为简易。

依赖

import multiprocessing
from multiprocessing import Process
from multiprocessing import Pool

获取核心数目

import multiprocessing as mp

num_cores = int(mp.cpu_count())
print(num_cores)

这里返回的是虚拟核的数目。

使用例

以下由两个测试函数分别测试多种不同的多进程调用与数据分配方式:

def delayFunction(x):
	print("start from " + str(x))
	m = np.random.random([2000,2000])
	np.linalg.svd(m)
	print("end from " + str(x))
	
def delayFunctionB(x, y):
	print("start from " + str(x) + ", " + str(y))
	m = np.random.random([x,y])
	np.linalg.svd(m)
	print("end from " + str(x) + ", " + str(y))

函数 delayFunction(x) 进行一个 $2000 \times 2000$ 大小矩阵的 SVD(奇异值分解),需要一定的计算时间;函数 delayFunctionB(x, y) 则进行 $x \cdot y$ 大小矩阵的 SVD 分解。

另外值得注意的是,因为numpy本身的C实现绕开了 GIL 锁实现了多核运算(参考),实际占用的CPU比例会高于进程数。

阻塞式

分配多进程 Process

  1. 声明 multiprocessing.Process 对象
  2. 使用 .start() 方法开始 Process 执行
  3. 使用 .join() 方法等待执行完毕。

例如:

p0 = Process(target=delayFunction, args=(1,))
p1 = Process(target=delayFunction, args=(2,))
p0.start()
p1.start()
p0.join()
p1.join()

进程池 Pool

可以通过构建进程池的方法避免 Process 在创建、销毁时的开销。

函数 map
with Pool(2) as pool:
	pool.map(func=delayFunction, iterable=[1,2,3,4])
	
with Pool(2) as pool:
	pool.starmap(func=delayFunctionB, iterable=[(1,2),(3,4)])

在以上例子中,pool.map函数对应单参数的函数,starmap 则对应多参数的函数。 值得注意的是 pool.map 对应的 iterable 要求list对象,pool.starmapiterable则要求元组内可迭代。 此外,可以使用 pool.imap(func=,iterable=) ,允许分配Iterable对象,具有更强的灵活性。

函数 apply
with Pool(5) as pool:
		for ii in range(5):
			pool.apply(delayFunction, [1])

以上例子实际是进行了串行的计算,因为每次apply后都会阻塞等待 Process 运行完毕后再继续进行之后的计算。所以实际上 apply 方法极少使用。

异步非阻塞

函数 map_async / starmap_async

以上两种方案在运行时都会等待所有Process的 map()/starmap()运行结束后才会继续,如要使用非阻塞(异步)方案。可使用map_async/ starmap_async

with Pool(5) as pool:
	result = pool.map_async(delayFunction, [1,2,3,4,5])
	result.get()
	
print("ending")

对于以上例子,则会等待result.get(),当Process运算结束,且代码运行到 result.get()时,才会继续运行,需要注意的是 result.get() 需要在 with Pool的作用范围内,如果想在 Pool 外使用,可以:

with Pool(5) as pool:
	result = pool.map_async(delayFunction, [1,2,3,4,5])
	result.wait()	
	
output = result.get()
print("ending")

函数 apply_async

result = []
with Pool(5) as pool:
	for ii in range(4):
		result.append(pool.apply_async(delayFunction, [ii]))
	
	for r in result:
		r.get()

print("ending")

需要注意的是,在for循环中如果使用result.wait() 则因为循环内阻塞,失去并行性。

超时参数 timeout

可以使用 result.get(timeout=2) 定义超时时间(单位:秒),同时可以使用try...except捕获异常:

with Pool(5) as pool:
	result = pool.map_async(delayFunction, [1,2,3,4,5])
	try:
		print(result.get(timeout=2))
	except multiprocessing.context.TimeoutError:
		print(TimeoutError.__name__)

需要注意的是,timeout要在 with Pool 作用域内使用,如果使用 result.wait(),则在作用域范围外的result.get(timeout=)设定会失效,依然会得到所有的(无论超时与否)的结果,同时所有的计算都会继续直到完成。