map并行
from multiprocessing import Pool
# Worker body: the function each pool process runs for one element.
def abc(i):
    """Process one element of the argument list and return the result.

    Args:
        i: One element taken from the list passed to ``pool.map``.

    Returns:
        ``i + 1``. This is a placeholder; put the real per-item work here.
        If the work writes a pandas object with ``to_csv``, it needs a
        DataFrame/Series, not an int.
    """
    # Plain ints have no ``to_csv()`` method, so the value is returned
    # instead. ``pool.map`` collects it into its result list.
    s = i + 1
    return s
# The __main__ guard is required: with the "spawn" start method (Windows,
# macOS) every worker re-imports this module. Without the guard, each child
# would try to create its own Pool and fail.
if __name__ == '__main__':
    # Argument list: each element is passed to one call of abc in a worker.
    flist = list(range(10000))
    # Pool with 10 worker processes.
    pool = Pool(10)
    try:
        # map hands every element of flist to abc in the workers. It blocks
        # until all calls finish and returns their results in input order.
        results = pool.map(abc, flist)
    finally:
        # close(): the pool accepts no further tasks; workers exit once the
        # queued tasks are done.
        pool.close()
        # join(): block the main process until every worker has exited.
        pool.join()
结合tqdm显示进度:
from multiprocessing import Pool
from tqdm import tqdm
def dist(flist):
    """Return the first element of *flist*.

    Placeholder worker for the tqdm example. Each call receives a single
    item of the iterable handed to ``imap``, and that item must support
    indexing with ``[0]``.
    """
    return flist[0]
if __name__ == '__main__':
    # NOTE(review): `flist` is not defined in this snippet. It must exist
    # before this point, and each of its elements is passed to dist.
    # maxtasksperchild=1000: a worker exits after finishing 1000 tasks and
    # the pool starts a fresh process in its place.
    with Pool(processes=7, maxtasksperchild=1000) as p:
        # imap yields results lazily, in input order. tqdm advances the
        # progress bar as each one arrives. total= is required because
        # imap's iterator has no len().
        # The worker is `dist`, the function defined above; `dist_calc` was
        # never defined and raised NameError.
        dist_list = list(tqdm(p.imap(dist, flist), total=len(flist)))
其中,maxtasksperchild 表示每个工作进程在退出前最多执行的任务数。例如 maxtasksperchild = 2 时,每个工作进程完成两个任务后就会退出,进程池随即启动(spawn)一个新的工作进程来替代它。
这样可以防止某个进程因内存泄漏不断膨胀而被 OOM 杀掉:旧进程退出时,它占用的内存会被操作系统完整回收。
imap 返回结果的顺序与输入顺序相同;imap_unordered 则按任务完成的先后顺序返回结果。
# imap returns a lazy iterator whose results come back in the same order as
# data; the work starts as soon as imap is called.
# NOTE(review): binding the name `iter` shadows the builtin iter(); a name
# such as `results` would avoid that.
iter = pool.imap(fn, data)
一旦调用 imap,任务就会提交给进程池;无论之后是否迭代 iter,多进程计算都会立即开始。
找到结果就中断
import functools
import multiprocessing

import cv2
import numpy as np

try:
    # scikit-image >= 0.16 provides SSIM here. skimage.measure.compare_ssim
    # was deprecated in 0.16 and removed in 0.18.
    from skimage.metrics import structural_similarity as compare_ssim
except ImportError:  # older scikit-image
    from skimage.measure import compare_ssim


@functools.lru_cache(maxsize=None)
def _load_gray(path):
    """Load the image at *path* and return it as a grayscale array.

    Cached, so each worker process decodes a given file only once instead
    of once per ``find_pos`` call.

    Raises:
        ValueError: if the file contents cannot be decoded as an image.
    """
    # np.fromfile + imdecode rather than cv2.imread; presumably this is so
    # that Windows paths with non-ASCII characters load correctly.
    img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), -1)
    if img is None:
        raise ValueError('cannot decode image: %r' % (path,))
    # -1 (IMREAD_UNCHANGED) leaves single-channel images 2-D; only colour
    # images need converting.
    if img.ndim == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return img


def find_pos(stride):
    """Score how well the icon matches the template at row offset *stride*.

    Compares the icon image with the strip of the template image that
    starts at row *stride* and is as tall as the icon, using SSIM.

    Args:
        stride: Row offset into the template image.

    Returns:
        ``(stride, score)``, where ``score`` is the SSIM value. The offset
        is returned too so that callers using ``imap_unordered`` can tell
        which result belongs to which offset.
    """
    f_temp = r'E:\work\code\mouse_keyboard\data\icon\temp_bianhao.png'
    f_icon = r'E:\work\code\mouse_keyboard\data\icon\bianhao.png'
    icon = _load_gray(f_icon)
    temp = _load_gray(f_temp)
    height_icon = icon.shape[0]
    # The strip keeps every column of the template. This assumes icon and
    # template have the same width (TODO confirm), since SSIM needs inputs
    # of identical shape.
    temp_this = temp[stride:stride + height_icon, :]
    score = compare_ssim(icon, temp_this)
    return stride, score
if __name__ == '__main__':
    # Presumably the pixel heights of the icon and template images that
    # find_pos loads; they must match for the strips to line up
    # (TODO confirm; they are not read from the files here).
    height_icon = 47
    height_temp = 186
    # Every row offset at which the icon still fits inside the template.
    flist = list(range(height_temp - height_icon + 1))
    cores = multiprocessing.cpu_count()
    # Leaving the with-block calls pool.terminate(). After a match is found
    # and the loop breaks, the pending offsets are abandoned and the worker
    # processes are stopped rather than left running.
    with multiprocessing.Pool(processes=cores) as pool:
        # imap_unordered yields (stride, score) tuples in completion order,
        # so a good match can be acted on before the other offsets finish.
        for y in pool.imap_unordered(find_pos, flist):
            print(y)
            if y[1] > 0.99:
                break
参考:
Python 多核并行计算
进程池Pool的imap方法解析