前言

当你显存足够,想用空间换时间;或者想单纯的加速模型。正好用的是python的for循环,而且每一次循环都是一次模型的推理过程,那么本 blog内容将非常适合你。

单卡并行

现在并行的方法有很多,本文没有对现有的方法进行归类,但是找到了适合自己用的,就分享在这里。比如:

1
2
3
4
5
6
7
8
9
10
11
from concurrent.futures.thread import ThreadPoolExecutor

def job(a, b):
return a+b

def parallel_run(input_dir, a, b, num_worker):
thread_pool = ThreadPoolExecutor(max_workers=num_worker)
print('Thread Pool is created!')
for i in range(len(input_dir)):
thread_pool.submit(job, a, b)
thread_pool.shutdown(wait=True)

再比如异步执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Pool

# job可以是你的推理函数
def job(a, b, c):
return a+b+c

num_worker = 16 # 线程池数量
input_dir = ''
result_list = [] # 存储多线程的返回值
a, b = 1, 2
pool = Pool(num_worker)
for i, indis in enumerate(input_dir):
result_list.append(pool.apply_async(func=job, args=(a, i, b)))
pool.close()
pool.join() # join的目的是把for循环的多线程都执行完,再执行后面的其他进程。

# 异步async的返回值需要通过get获取
res = []
for i in result_list:
res.append(i.get())
# res即函数的返回值

上面的方法二选一,笔者使用的是第二种方法

多卡并行

直接修改job函数即可

1
2
3
4
5
6
7
def job(a, i, c):
available_GPU = [0,1,2,3] # 指定你可以使用的GPU_id
GPU_ID = i % len(available_GPU) # 将for循环中的每个推理按照顺序放在不同的GPU上
torch.cuda.set_device(available_GPU[GPU_ID]) # 设置GPU
a = torch.tensor(a).cuda() # 将要处理的数据放在cuda上,原来是np格式,转成tensor可以直接用
c = torch.tensor(c).cuda() # 将要处理的数据放在cuda上,原来是np格式,转成tensor可以直接用
return infer(a, c) # infer为模型推理函数

踩的一些坑

1
2
3
4
1. # 记得不要把.get()接到异步函数后面,像这样会拖慢并行的速度,各位bloger可以自行尝试
result_list.append(pool.apply_async(func=job, args=(a, i, b)).get())
2. # 在异步调用class中的函数时,千万不要用装饰器,否则会提示
Can't pickle <class '__main__.Test'>: it's not the same object as __main__.Test

第一个坑可以用如下代码进行尝试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
from multiprocessing import Pool

def run(msg):
print('msg:%s' %msg)
# 程序随眠3秒,
time.sleep(3)
print('end')
return msg

if __name__ == "__main__":
print("开始执行主程序")
start_time=time.time()
# 使用进程池创建子进程
size=3
pool=Pool(size)
print("开始执行子进程")
results = []
# for i in range(size):
# a = run(i)
# results.append(a)
# for res in results:
# print(res)
for i in range(size):
results.append(pool.apply_async(run,(i,)))
pool.close()
pool.join()
for res in results:
print(res.get())
print(results)
print("主进程结束耗时%s"%(time.time()-start_time))

小结

本文仅简单记录笔者学到的方法和踩到的坑,欢迎各位指点!~