python异步io

0x0 关联概念

  1. 异步vs同步 阻塞vs非阻塞 的概念
  • 同步和异步:描述的是通信的双方约定的返回结果的方式。

    • 同步: 请求方发出请求后,等待直到对方有结果,把结果取回来
    • 异步:请求方发出请求后,对方有结果了,对方把结果送过来

      比如我们需要在 7 点钟出门,同步的方式就是一直看时间,看到7点,知道时间到了;异步的方式是调个7点的闹钟,闹钟一响,知道时间到了。

  • 阻塞和非阻塞:描述的是程序调用(耗时的动作,如IO)发生后的调用方的状态

    • 阻塞: 调用后,调用方的线程就停下来,知道调用完成返回才继续执行
    • 非阻塞: 调用后,直接返回,调用方继续执行

      比如: 我们需要7点出门,查看时间是否到了7点,阻塞的方式,就是一直看着时钟,等到7点; 非阻塞的方式可以是,瞄一下时钟,如果没到7点,打一盘游戏,再瞄一下,没到继续玩,反复直到时间到了。

  • 我觉得要讨论一个过程是同步/异步,一个调用时阻塞/非阻塞,必须针对具体的对象来说。

我们应用可以用异步的方式封装 select、poll 这种同步结构,因为针对的对象不同,就称为不同的方式。 asyncio 成为异步io,但是底层可能用到是 select 实现的。

  1. 协程是什么 和线程、进程有什么区别
  • 进程是操作系统任务的单位: 每个进程有独立的内存空间,互不干扰,创建进程需要创建或者拷贝进程空间,占用很多资源,进程切换需要内核切换上下文
  • 线程是同一个进程内,共享进程空间的任务单位,线程每个线程有独立的控制块,是虚拟出来的并发执行的任务,但是共享内存,线程切换需要内核切换上下文
  • 协程是用户态实现的,协程切换任务由应用层面实现,开销很小。同个调度器下的协程应该是在同一个线程内
  1. gevent 是什么
    http://sdiehl.github.io/gevent-tutorial/
    https://segmentfault.com/a/1190000006945621

gevent 是一个 python 库,用于 “异步化” python 阻塞代码,可以实现用单线程并发执行多任务。
比如正常 socket 库是同步的,我们正常写的 socket.connect/read/write 都是阻塞,要并发,通常使用多线程,而gevent通过monkey patch的
方式,直接修改了socket标准库的运行时代码,使得代码运行起来是非阻塞的。

  1. gunicorn
    http://docs.gunicorn.org/en/stable/design.html

  2. Twisted

  3. asyncio 是什么
    python3.5+ 的标准库,用于编写异步应用 (Note)
    http://asyncio.readthedocs.io/en/latest/getting_started.html

异步IO是计算机操作系统对输入输出的一种处理方式:发起IO请求的线程不等IO操作完成,就继续执行随后的代码,IO结果用其他方式通知发起IO请求的程序。与异步IO相对的是更为常见的“同步(阻塞)IO”:发起IO请求的线程不从正在调用的IO操作函数返回(即被阻塞),直至IO操作完成。
– Wikipedia

0x1 Asyncio 编程

协程是基于生成器 yield 的特性实现的,不是 python3.5 之后才有的,但是 python3.5之后,带来了 async, await 关键字等更加自然的支持,我们可以比较容易的理解和使用协程,所以这里我都是以 python3.5+ 的特性学习。

但是协程不是生成器,协程和生成器混淆就很头大了。

tornado 的特点就是基于异步io,后面的版本也迁移到 asyncio 上。
https://github.com/tornadoweb/tornado/blob/master/docs/releases/v5.0.0.rst

  1. async, await 关键字
  • async def: 用于定义协程(coroutine function),与 @asyncio.coroutine 一样

    async def get_web_page(url):
        return page
    
    iscoroutinefunction(get_web_page)
    
  • await: 与 yield from 作用一样,等待一个协程返回

  1. coroutine vs future vs task
    A task is a subclass of Future.
    A task is a future that is wrapping a coroutine in particular.
    Task 是Futhure 的一个特例,它包装的是一个 coroutine,有一些负责处理 coroutine 的方法
    http://lucumr.pocoo.org/2016/10/30/i-dont-understand-asyncio/
    future 是一个最终会返回结果的对象, 太难解释了,还是等我真正理解了在说吧 😝

  2. eventloop
    是一个集中调度Task的处理器,

下面是一个简单的event loop 调度协程的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0) # e)
return x + y

async def print_sum(x, y):
result = await compute(x, y) #d)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop() # a)
task = loop.creat_task(print_sum(1, 2)) # b)
loop.run_until_complete(task) # c)
loop.close()

a) b) c) 使用 event loop 创建了一个 task,并且运行 task
d) task运行中, 执行 print_sum 遇到 await compute(x, y),需要等待协程 compute 执行,转而执行 compute
e) compute 协程运行遇到 await asyncio.sleep(1.0),转而执行协程 sleep,sleep协程中创建了新的future用来定时,并且等待future完成,而future里肯定封装了 event loop 的调度魔法,使得 task 暂停了,并且跑去执行用于定时的 task
z) 定时完成之后,event loop 重新执行 task,又恢复到了 compute 协程,继续运行,compute 返回,协程结束,又回到 print_sumx协程, print_sum 也返回协程结束,task 也结束

协程调度图示

  1. 常用 api
    rtfm: https://docs.python.org/3.6/library/asyncio-eventloop.html
    base_event.py 查看源码实现
    理解协程工作原理,一定要时刻记得协程是在一个线程上运行的,这样就会更容易理清关系。
  • task.add_done_callback:给任务添加一个callback,callback 方法的签名是当个参数,参数是 future,任务完成后,通过 call_soon 调度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    async def cor():
    print("cor start")
    await asyncio.sleep(1)

    print("cor wake")
    return True

    def callback(future):
    print("result:", future.result()) # result: True
    print("stoping loop")
    loop.stop()

    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(cor())
    task.add_done_callback(callback)

    loop.run_forever()
    print("leave forever")
    loop.close()
  • task.cancel:取消一个任务,会向任务对应的协程抛出一个 asyncio.CancelledError 异常,如果协程没有捕获,task就会结束;如果协程内拦截了该异常,则task不会被取消

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    async def my_coroutine():
    try:
    print("mytask start")
    await asyncio.sleep(2)
    except asyncio.CancelledError as e:
    print("exception: asyncio.CancelledError, "
    "task.cancel() 会向被协程抛 CancelledError "
    "如果捕获不想上抛,则阻止了本次cancel")
    raise # 取消
    print("mytask end")
    return True

    async def stop_coroutine(task):
    await asyncio.sleep(1)
    task.cancel() # 取消任务

    if __name__ == '__main__':
    with closing(asyncio.get_event_loop()) as loop:
    task = loop.create_task(my_coroutine())
    loop.run_until_complete(stop_coroutine(task))
    print('canceled: ', task.cancelled())
    loop.run_forever()
    loop.close()
  • ayncio.gather(futures): 创建一个 future (outer),把 一组futures作为它是子任务,等待所有子任务都完成了,返回子任务的 result 列表,result顺序和 futures 顺序一一对应。创建的future,别忘了丢到event loop 里

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    async def cor1():
    print("cor1")
    return "cor1 result"

    async def cor2():
    print("cor2")
    return "cor2 result"

    if __name__ == '__main__':
    with closing(asyncio.get_event_loop()) as loop:
    outer_future = asyncio.gather(cor1(), cor2()) # 汇集成一个future
    result = loop.run_until_complete(outer_future) # 返回两个协程的结果列表
    print("result: ", result)
  • ayncio.wait_for:

  • ayncio.wait: 是一个生成器方法, wait() 是一个协程
    coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
    done, pending = await asyncio.wait(fs)
    返回完成的和为完成的两组 Futures 结果, 用 future.result() 获取结果
    它会等待 futures 执行的结果,默认 return_when=ALL_COMPLETE 会等到所有futures都完成才返回, FIRST_COMPLETED 表示有一个完成就返回,FIRST_EXCEPTION表示有一个产生异常就返回

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    async def cor1():
    print("cor1")
    await asyncio.sleep(1)
    return "cor1 result"

    async def cor2():
    print("cor2")
    return "cor2 result"


    with closing(asyncio.get_event_loop()) as loop:
    outer_future = asyncio.wait(
    (cor1(), cor2()),
    return_when=asyncio.FIRST_COMPLETED # 有一个完成就返回
    )
    # 返回两组task, 完成的与未完成的
    completed, pending = loop.run_until_complete(outer_future)
    print("compled: ", completed)
    for c in completed:
    print("完成的result:", c.result())

    print("pending: ", pending) # 有一个任务没有完成,pending了

    outer_future2 = asyncio.wait_for(list(pending)[0], timeout=1) # 继续等待未完成的
    result2 = loop.run_until_complete(outer_future2)
    print("pending 的task 继续完成的result:", result2)
    print("the end ")
  • ensure_future:传入协程或者 future,如果是协程,则调用 creat_task 创建task,并返回;如果传入的是future,且已经是属于一个event loop的,则直接返回。作用是确保参数 coro_or_future 加入在event loop中调度

    NOTE: Task 是 Future 的子类

  • asyncio.shield: 使协程或future不可取消,它拦截掉
    下面代码和签名cancel例子唯一的不同就是

1
2
# task = loop.create_task(my_coroutine())  # 协程可被取消
task = loop.create_task(asyncio.shield(my_coroutine())) # 保护协程不被取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
async def another_coroutine():
print("hello")
return "hi"

async def my_coroutine():
try:
print("mytask start")
await asyncio.sleep(2)

except asyncio.CancelledError as e:
print("exception: asyncio.CancelledError, "
"task.cancel() 会向被协程抛 CancelledError "
"如果捕获不想上抛,则阻止了本次cancel")
raise

await asyncio.sleep(3)
print("mytask end")
return True

async def stop_coroutine(task):
await asyncio.sleep(1)
task.cancel() # 取消其他任务
result = await another_coroutine()
print(result)

if __name__ == '__main__':
with closing(asyncio.get_event_loop()) as loop:

# task = loop.create_task(my_coroutine()) # 协程可被取消
task = loop.create_task(asyncio.shield(my_coroutine())) # 保护协程不被取消

loop.run_until_complete(stop_coroutine(task))
print('canceled: ', task.cancelled())
loop.run_forever()
  • loop.create_task: 创建 Task 到 event loop 调度器中,run_forevent 才开始调度

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    async def say(what, when):
    await asyncio.sleep(when)
    print("say {0} on {1}".format(what, when))

    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # create tasks
    task1 = loop.create_task(say("hello", 2))
    task2 = loop.create_task(say("hi", 3))
    task3 = loop.create_task(say("hey", 1))
    # run task created in even loop
    loop.run_until_complete(asyncio.wait((task1, task2, task3)))
    print("all task done") # would not happen here
    loop.close()
  • loop.create_task vs asyncio.ensure_future
    参考: https://stackoverflow.com/questions/36342899/asyncio-ensure-future-vs-baseeventloop-create-task-vs-simple-coroutine
    create_task 把一个协程包装成task进行调度; ensure_future 如果传入的是一个协程,则调用 create_task 转换成task进行调度,如果是future,则什么都不做,语义是确保传入的对象能被调度。
    前者精确的调度协程,后者是协程或future, 如果明确的知道传入对象的类型是协程,那就用 create_task, 如果不确定,那就用 ensure_future —— 通常在编写内部api时会需要,参考asyncio.gather 源码。

当我们使用 create_task 意味着我们需要后台运行指定任务,被创建的任务和当前所在任务是并发运行的,如果不需要后台运行,则使用 await 串行运行

  • loop.run_until_complete: 传入 coro_or_future,作用是等待一个 future 完成,并返回result,如果传入的是 cor,会用 ensure_future 包装为任务(多次调用同一个cor会产生不同的任务,如果希望只有一个,可用 ensure_future 先创建future)。内部实现是利用add_done_callback设置回调,然后run_forever 知道完成或异常

  • loop.run_forever:持续运行事件循环,子任务只能通过 Future.add_done_callback() 来得到结果,或者调用 loop.stop() 停止run_forever

  • loop.call_later/call_soon/call_at: 再 loop 中(延迟)执行传入的函数,这里的函数不是协程,是普通函数, 返回handler, handler.cancel 可以取消
    callback 用于执行简单的任务,注意他是不同函数,不是协程,所以不能使用 await 来和协程协作,执行要么成功返回要么异常

  • 什么时候发生调度
    并不是 await 导致event loop 任务调度,而是 await 执行的协程内部调用 asyncio 的相关api导致的,直接在 await 所执行的协程内部用 time.sleep(10) 暴力阻塞,就会发现,其他协程都没运行机会

例子代码源码https://github.com/chenjiancan/asyncio_exam

0x2 Transports and protocols

https://docs.python.org/3.6/library/asyncio-protocol.html

Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.

Once the communication channel is established, a transport is always paired with a protocol instance. The protocol can then call the transport’s methods for various purposes.

When subclassing a protocol class, it is recommended you override certain methods. Those methods are callbacks: they will be called by the transport on certain events (for example when some data is received); you shouldn’t call them yourself, unless you are implementing a transport.

Transport 定义了数据传输通道,比如 BaseTransport,TCP, UDP, SSL,subprocess pipe 是 asyncio 已经实现的 transport,我们可以继承他们来试下自己的传输层
protocol 定义了数据传输的协议,asyncio.Protocol 定义了基本接口,通过继承它可以实现自己的协议,asyncio 实现了 Protocol(for tcp or ssl), BufferedProtocol,DatagramProtocol, SubprocessProtocol 等。

protocol 是和 transpport 配对使用的。

使用步骤是通过继承的方式,实现一个 Protocol 类作为 protocol factory,然后使用 asyncio 的API, eg: create_connection 创建对应的 协程或task

  1. tcp
    AbstractEventLoop.create_server: 创建 tcp server端
    AbstractEventLoop.create_connection: 创建tcp连接到 server
    代码: https://github.com/chenjiancan/asyncio_exam/transport_protocol/tcp

  2. udp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    loop.create_datagram_endpoint(self, protocol_factory,
    local_addr=None, remote_addr=None, *,
    family=0, proto=0, flags=0,
    reuse_address=None, reuse_port=None,
    allow_broadcast=None, sock=None)
    ```
    服务端绑定端口使用 local_addr
    客户端使用 remote_addr 指定服务端地址
    代码: https://github.com/chenjiancan/asyncio_exam/transport_protocol/udp


    ## 0x3 Streams
    1. stream tcp
    ```python
    coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
  • start_server: 启动tcp或unix server,并再 event loop 监听连接,参数
  • open_connection: 客户端发起连接

一个简单的tcp server & client 模型: https://github.com/chenjiancan/asyncio_exam/echo_tcp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 客户端连接进来的回调
async def on_client_connected(reader, writer):
print("connected")
loop = asyncio.get_event_loop()
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))

print("Send: %r" % message)
writer.write(data)

await writer.drain() # flush
print("Close the client socket")
writer.close()

async def main():
# create tcp server
print("start server!")
# start_server is a coroutine, await it
server = await asyncio.start_server(client_connected_cb=on_client_connected, host="127.0.0.1", port=8090,
loop=asyncio.get_event_loop())
print('Serving on {}'.format(server.sockets[0].getsockname()))

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

loop.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def main():
reader, writer = await asyncio.open_connection('127.0.0.1', 8090, loop=asyncio.get_event_loop())
message = "hi"
print('Send: %r' % message)
writer.write(message.encode())

data = await reader.read(100)
print('Received: %r' % data.decode())

print('Close the socket')
writer.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()