from concurrent.futures.thread import ThreadPoolExecutor
defjob(a, b): return a+b
defparallel_run(input_dir, a, b, num_worker): thread_pool = ThreadPoolExecutor(max_workers=num_worker) print('Thread Pool is created!') for i inrange(len(input_dir)): thread_pool.submit(job, a, b) thread_pool.shutdown(wait=True)
num_worker = 16# 线程池数量 input_dir = '' result_list = [] # 存储多线程的返回值 a, b = 1, 2 pool = Pool(num_worker) for i, indis inenumerate(input_dir): result_list.append(pool.apply_async(func=job, args=(a, i, b))) pool.close() pool.join() # join的目的是把for循环的多线程都执行完,再执行后面的其他进程。
# 异步async的返回值需要通过get获取 res = [] for i in result_list: res.append(i.get()) # res即函数的返回值
上面的方法二选一,笔者使用的是第二种方法
多卡并行
直接修改job函数即可
1 2 3 4 5 6 7
defjob(a, i, c): available_GPU = [0,1,2,3] # 指定你可以使用的GPU_id GPU_ID = i % len(available_GPU) # 将for循环中的每个推理按照顺序放在不同的GPU上 torch.cuda.set_device(available_GPU[GPU_ID]) # 设置GPU a = torch.tensor(a).cuda() # 将要处理的数据放在cuda上,原来是np格式,转成tensor可以直接用 c = torch.tensor(c).cuda() # 将要处理的数据放在cuda上,原来是np格式,转成tensor可以直接用 return infer(a, c) # infer为模型推理函数
踩的一些坑
1 2 3 4
1.# 记得不要把.get()接到异步函数后面,像这样会拖慢并行的速度,各位bloger可以自行尝试 result_list.append(pool.apply_async(func=job, args=(a, i, b)).get()) 2.# 在异步调用class中的函数时,千万不要用装饰器,否则会提示 Can't pickle <class '__main__.Test'>: it's not the same objectas __main__.Test
defrun(msg): print('msg:%s' %msg) # 程序随眠3秒, time.sleep(3) print('end') return msg if __name__ == "__main__": print("开始执行主程序") start_time=time.time() # 使用进程池创建子进程 size=3 pool=Pool(size) print("开始执行子进程") results = [] # for i in range(size): # a = run(i) # results.append(a) # for res in results: # print(res) for i inrange(size): results.append(pool.apply_async(run,(i,))) pool.close() pool.join() for res in results: print(res.get()) print(results) print("主进程结束耗时%s"%(time.time()-start_time))