python 并发编程——Futures

本篇日志记录python3 标准库中并发编程模块 concurrent.futures 的使用

doc

0x0 concurrent.futures 模块

concurrent.futures 模块是python3.2 之后加入标准库的,它为异步执行任务提供了抽象接口,通过 ThreadPoolExecutor 和 ProcessPoolExecutor 分别提供基于线程和进程的异步执行,接口由 Executor 抽象类定义。

  1. Executor 抽象类
1
class concurrent.futures.Executor:

定义了以下方法:

  • submit(fn, *args, **kwargs) : 提交一个函数到 executor 中执行

  • map(func, *iterables, timeout=None, chunksize=1) :提交一个函数和一组输入,映射为多个任务worker到executor中执行, chuncksize 表示将输入分成数量为 chunksize 的分组,输入到一个任务,只对 ProcessPoolExecutor 有效;返回值是每个任务执行 func 的返回值列表

  • shutdown(wait=True) : 关闭executor,wait=True 表示等待所有任务完成再关闭

  1. TheadpoolExecutor 线程池
1
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
  • max_workers: 指定池中最大的线程数,默认为cpu核心数*5
  • thread_name_prefix:线程名字前缀,方便调试
  1. ProcessPoolExecutor 进程池
1
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
  • max_workers: 指定池中最大的进程程数,默认为cpu核心数
  1. Future 对象
    executor.submit 返回到是 future 对象,返回代表executor已经登记了这个异步任务,future对象包装了func,提供控制异步任务的接口

    class concurrent.futures.Future

    cancel(): 取消任务,返回值True表示成功取消

    cancelled():是否已经取消

    running():是否正在运行

    done(): 是否完成

    result(timeout=None): 等待异步任务的返回值, 如果取消了抛 CancelledError 异常

    exception(timeout=None): 如果发生了异常,取出异常

    add_done_callback(fn): 添加回调函数,异步任务完成则回调,回调会在添加回调的线程执行,如果抛出的异常是 Exception的子类,会被忽略

    以下几个用于单元测试
    set_running_or_notify_cancel()
    set_result(result)
    set_exception(exception)

  2. 模块方法,用于等待 future 结果

  • concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    与 asyncio 的 api ayncio.wait 是一样的功能;
    返回完成的和为完成的两组 Futures 结果, 用 future.result() 获取结果
    它会等待 futures 执行的结果,默认 return_when=ALL_COMPLETE 会等到所有futures都完成才返回, FIRST_COMPLETED 表示有一个完成就返回,FIRST_EXCEPTION表示有一个产生异常就返回

  • concurrent.futures.as_completed(fs, timeout=None)
    返回的是一个生成器对象,先完成的future的结果先返回

0x1 使用 ThreadPoolExecutor 进行编程

  1. 使用 with 语句创建一个 executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor

  2. 使用 executor.submit 提交任务
    对返回的多个 future 需要使用模块函数 concurrent.futures.as_completed 或 wait 进行处理;
    as_completed 处理的 futures 先完成先返回结果
    NOTE: wait 返回到是两个值, 分别是完成的 future 列表和pending的future列表

  3. 或使用 executor.map 创建任务多个任务
    map 直接返回结果列表,所有任务都完成,才返回

  4. 等待结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import time
import random
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']

def load_url(url, timeout=None):
print("load url ", url, threading.current_thread().getName())
time.sleep(random.random())
return "page for " + url

def submit_example():
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:

# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
time.sleep(10)
# 等待执行完成, 先完成先返回
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))

def map_example():
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 开始执行,全部执行完成才返回,按顺序返回
results = executor.map(load_url, URLS, chunksize=2) # chunksize is ignored by ThreadPoolExecutor.
print("result: ", list(results))

if __name__ == '__main__':
submit_example()
map_example()

0x2 使用 ProcessPoolExecutor 进行编程

api 与 ThreadPoolExecutor 是相同的, 上面例子直接替换 ThreadPoolExecutor 为 ProcessPoolExecutor 就变成了进程池。

0x3 multiprocessing.pool 模块

python 另一个模块 multiprocessing 也提供了进程池和线程池,分别是:
multiprocessing.Pool 进程池
multiprocessing.pool.ThreadPool 线程池

用法类似 executor 的 map 方法

1
2
p = Pool(5)   # 线程版 p = ThreadPool(5)
results = p.map(func, [1, 2, 3])

用法跟 executor 很接近, 但是 python 文档里一直没有关于 ThreadPool 的部分,所以 ThreadPool 是一个未完成的和充分测试的模块,谨慎使用。

例子源码: https://github.com/chenjiancan/asyncio_exam/combo