本篇日志记录python3 标准库中并发编程模块 concurrent.futures 的使用
doc
0x0 concurrent.futures 模块
concurrent.futures 模块是python3.2 之后加入标准库的,它为异步执行任务提供了抽象接口,通过 ThreadPoolExecutor 和 ProcessPoolExecutor 分别提供基于线程和进程的异步执行,接口由 Executor 抽象类定义。
- Executor 抽象类
1 | class concurrent.futures.Executor: |
定义了以下方法:
submit(fn, *args, **kwargs) : 提交一个函数到 executor 中执行
map(func, *iterables, timeout=None, chunksize=1) :提交一个函数和一组输入,映射为多个任务worker到executor中执行, chuncksize 表示将输入分成数量为 chunksize 的分组,输入到一个任务,只对 ProcessPoolExecutor 有效;返回值是每个任务执行 func 的返回值列表
shutdown(wait=True) : 关闭executor,wait=True 表示等待所有任务完成再关闭
- TheadpoolExecutor 线程池
1 | class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') |
- max_workers: 指定池中最大的线程数,默认为cpu核心数*5
- thread_name_prefix:线程名字前缀,方便调试
- ProcessPoolExecutor 进程池
1 | class concurrent.futures.ProcessPoolExecutor(max_workers=None) |
- max_workers: 指定池中最大的进程程数,默认为cpu核心数
Future 对象
executor.submit 返回到是 future 对象,返回代表executor已经登记了这个异步任务,future对象包装了func,提供控制异步任务的接口class concurrent.futures.Future
cancel(): 取消任务,返回值True表示成功取消
cancelled():是否已经取消
running():是否正在运行
done(): 是否完成
result(timeout=None): 等待异步任务的返回值, 如果取消了抛 CancelledError 异常
exception(timeout=None): 如果发生了异常,取出异常
add_done_callback(fn): 添加回调函数,异步任务完成则回调,回调会在添加回调的线程执行,如果抛出的异常是 Exception的子类,会被忽略
以下几个用于单元测试
set_running_or_notify_cancel()
set_result(result)
set_exception(exception)模块方法,用于等待 future 结果
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
与 asyncio 的 api ayncio.wait 是一样的功能;
返回完成的和为完成的两组 Futures 结果, 用 future.result() 获取结果
它会等待 futures 执行的结果,默认 return_when=ALL_COMPLETE 会等到所有futures都完成才返回, FIRST_COMPLETED 表示有一个完成就返回,FIRST_EXCEPTION表示有一个产生异常就返回concurrent.futures.as_completed(fs, timeout=None)
返回的是一个生成器对象,先完成的future的结果先返回
0x1 使用 ThreadPoolExecutor 进行编程
使用 with 语句创建一个 executor
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor使用 executor.submit 提交任务
对返回的多个future
需要使用模块函数 concurrent.futures.as_completed 或 wait 进行处理;
as_completed 处理的 futures 先完成先返回结果
NOTE: wait 返回到是两个值, 分别是完成的 future 列表和pending的future列表或使用 executor.map 创建任务多个任务
map 直接返回结果
列表,所有任务都完成,才返回等待结果
1 | import time |
0x2 使用 ProcessPoolExecutor 进行编程
api 与 ThreadPoolExecutor 是相同的, 上面例子直接替换 ThreadPoolExecutor 为 ProcessPoolExecutor 就变成了进程池。
0x3 multiprocessing.pool 模块
python 另一个模块 multiprocessing 也提供了进程池和线程池,分别是:
multiprocessing.Pool 进程池
multiprocessing.pool.ThreadPool 线程池
用法类似 executor 的 map 方法
1 | p = Pool(5) # 线程版 p = ThreadPool(5) |
用法跟 executor 很接近, 但是 python 文档里一直没有关于 ThreadPool 的部分,所以 ThreadPool 是一个未完成的和充分测试的模块,谨慎使用。