Python asyncio 异步编程(一) - Go语言中文社区

Python asyncio 异步编程(一)


一、简介

网络模型有很多种类,为了实现高并发也有很多方案,比如多进程、多线程、协程。多进程和多线程中 IO 的调度更多取决于系统,而协程中的调度由用户控制,用户可以在函数中 yield 一个状态。使用协程可以实现高效的并发任务。Python3.4 中引入了协程的概念,这是以生成器对象为基础的协程,Python3.5 则确定了协程的语法。实现协程的不仅仅是 asyncio,tornado 和 gevent 都实现了类似的功能。

一些概念:
  • event_loop 事件循环:程序开启一个无限的循环,并把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程

  • coroutine 协程:协程对象,使用 async 关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用

  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态

  • future :代表将来执行或没有执行的任务的结果。它和 task 没有本质的区别

  • async / await 关键字:Python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口

二、例子

2.1 基本协程
import time
import asyncio
import functools

# 版本一
def one():
    start = time.time()
    # async 关键字可定义一个协程函数
    # @asyncio.coroutine 这个装饰器也可以定义协程
    # 注意,函数的返回值才是协程,也叫事件
    # 协程不能单独运行,需要作为事件注入到事件循环 loop 里
    async def do_some_work(x):
        time.sleep(.01)
        print('[do_some_work]  这是个协程任务')
        print('[do_some_work]  Coroutine {}'.format(x))
    coroutine = do_some_work('one')     # 创建协程
    loop = asyncio.get_event_loop()     # 创建事件循环
    loop.run_until_complete(coroutine)  # 将协程注入事件循环生成任务对象并启动
    print('----- 运行耗时:{:.4f}s -----'.format(time.time()-start))

运行结果:

$ python3 d.py
[do_some_work]  这是个协程任务
[do_some_work]  Coroutine one
----- 运行耗时:0.0126s -----
2.2 task 任务

协程对象不能直接运行,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成为了一个任务(task)对象,保存了协程运行后的状态,用于未来获取协程的结果

# 版本二
def two():
    start = time.time()
    async def do_some_work(x):
        time.sleep(.01)
        print('[do_some_work]  这是个协程任务')
        print('[do_some_work]  Coroutine {}'.format(x))
    coroutine = do_some_work('two')         # 创建协程
    loop = asyncio.get_event_loop()         # 创建事件循环
    task = loop.create_task(coroutine)      # 将协程作为参数创建任务
    # task = asyncio.ensure_future(coroutine) 作用同上
    # task 是 asyncio.Task 类的实例,asyncio.Task 是 asyncio.Future 的子类
    # 为什么要用协程创建 task ?这个过程中 asyncio.Task 类中做了一些工作
    # 其中包括预激协程,以及协程运行过程中遇到某些异常时的处理
    print(isinstance(task, asyncio.Task))   
    print(isinstance(task, asyncio.Future))
    print('[task] ', task._state)           # 打印任务状态
    loop.run_until_complete(task)           # 将任务注入事件循环并启动
    print('[task] ', task._state)           # 打印任务状态
    print('----- 运行耗时:{:.4f}s -----'.format(time.time()-start))

运行结果:

$ python3 d.py
True
True
[task]  PENDING
[do_some_work]  这是个协程任务
[do_some_work]  Coroutine two
[task]  FINISHED
----- 运行耗时:0.0113s -----
2.3 绑定回调

假如协程是一个 IO 操作,等它处理完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 对象中添加回调来实现。回调函数的最后一个参数是 future 对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入

# 版本三
def three():
    start = time.time()
    async def do_some_work(x):
        time.sleep(.01)
        print('[do_some_work]  这是个协程任务')
        print('[do_some_work]  Coroutine {}'.format('three'))
    def callback(name, future):             # 回调函数,在任务中被使用
        print('[callback]  回调函数,干点儿别的')
        print('[callback]  {} 状态: {}'.format(name, future._state))
    coroutine = do_some_work(2)             # 创建协程
    loop = asyncio.get_event_loop()         # 创建事件循环
    task = loop.create_task(coroutine)      # 将协程作为参数创建任务
    print('[task] ', task._state)           # 打印任务状态
    # 给任务添加回调函数,任务完成后执行,注意回调函数的最后一个参数须为 future
    # functools.partial 创建偏函数作为 add_done_callback 方法的参数
    # 而 task 本身作为回调函数的最后一个参数
    task.add_done_callback(functools.partial(callback, 'coroutine'))
    loop.run_until_complete(task)           # 将任务注入事件循环并启动
    print('[task] ', task._state)           # 打印任务状态
    print('----- 运行耗时:{:.4f}s -----'.format(time.time()-start))

运行结果:

$ python3 d.py
[task]  PENDING
[do_some_work]  这是个协程任务
[do_some_work]  Coroutine three
[callback]  回调函数,干点儿别的
[callback]  coroutine 状态: FINISHED
[task]  FINISHED
----- 运行耗时:0.0112s -----

多数情况下无需调用 my_future.add_done_callback 方法,因为可以直接把回调函数操作放在协程中 yield from my_future 表达式的后面。这是协程的一大优势:协程是可以暂停和恢复的函数

多数情况下同样无需调用 my_future.result 方法,因为 yield from 从 future 中产出的值就是结果

2.4 多个协程任务

实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 asyncio.gather 方法。任务的 result 方法可以获得对应的协程函数的 return 值

# 版本四
def four():
    start = time.time()
    async def do_some_work(t):
        print('[do_some_work]  这是个协程任务 {}'.format(t))
        # await 等同于 yield from
        # 只有由 asyncio.coroutine 装饰的函数内才可使用 yield from
        # 只有由 async 关键字定义的函数才可使用 await
        await asyncio.sleep(t)              # 假装 IO 耗时 t 秒
        return '[do_some_work] Coroutine four done {}'.format(t)
    coroutine1 = do_some_work(5)                # 创建协程
    coroutine2 = do_some_work(3)                # 创建协程
    task1 = asyncio.ensure_future(coroutine1)   # 将协程作为参数创建任务
    task2 = asyncio.ensure_future(coroutine2)   # 将协程作为参数创建任务
    # 多任务异步,需要创建一个协程或任务收集器
    # 将其作为 asyncio.run_until_complete 的参数
    # 这样的结果就是多任务异步执行,直到全部完成后释放 CPU
    # 然后控制权交给 four 函数
    # 全部完成耗时为任务中耗时最多的那个
    gather = asyncio.gather(task1, task2)       # 协程或任务收集器
    loop = asyncio.get_event_loop()             # 创建事件循环
    loop.run_until_complete(gather)             # 将任务收集器注入事件循环并启动
    print('[task1] ', task1._state)             # 打印任务状态
    print('[task2] ', task2._state)             # 打印任务状态
    # 打印协程返回值,注意只有协程结束后才可获得返回值
    print(task1.result())
    print(task2.result())
    print('----- 运行耗时:{:.4f}s -----'.format(time.time()-start))

运行结果:

$ python3 d.py
[do_some_work]  这是个协程任务 5
[do_some_work]  这是个协程任务 3
[task1]  FINISHED
[task2]  FINISHED
[do_some_work] Coroutine four done 5
[do_some_work] Coroutine four done 3
----- 运行耗时:5.0047s -----

以上示例都没有调用 loop.close 方法,好像也没有什么问题。所以到底要不要调 loop.close 呢?简单来说,loop 只要不关闭,就还可以再运行 run_until_complete 方法,关闭后不可运行。建议调用 loop.close,彻底清理 loop 对象防止误用

2.5 取消任务

创建新文件 async_cancel.py 并写入以下代码:

# File Name: async_cancel.py 

import asyncio

async def work(id, t):
    print('Wroking...')
    await asyncio.sleep(t)
    print('Work {} done'.format(id))

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    # 程序运行过程中,快捷键 Ctrl + C 会触发 KeyboardInterrupt 异常
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        # 取消所有任务,停止事件循环
        loop.stop()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

终端执行文件,等待过程中手动 Ctrl + C 停止程序:

$ python3 async_cancel.py 
Wroking...
Wroking...
Wroking...
Work 1 done
^C%

修改 main 函数如下:

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    # 程序运行过程中,快捷键 Ctrl + C 会触发 KeyboardInterrupt 异常
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        print()
        # 每个线程里只能有一个事件循环
        # 此方法可以获得事件循环中的所有任务的集合
        # 任务的状态有 PENDING 和 FINISHED 两种
        tasks = asyncio.Task.all_tasks()
        for i in tasks:
            print('取消任务:{}'.format(i))
            # 任务的 cancel 方法可以取消未完成的任务
            # 取消成功返回 True ,已完成的任务取消失败返回 False
            print('取消状态:{}'.format(i.cancel()))
    finally:
        loop.close()

再次以同样的方式运行程序:

$ python3 async_cancel.py 
Wroking...
Wroking...
Wroking...
Work 1 done
^C
取消任务:<Task finished coro=<work() done, defined at b.py:3> result=None>
取消状态:False
取消任务:<Task pending coro=<work() running at b.py:5> wait_for=<Future ...
取消状态:True
取消任务:<Task pending coro=<work() running at b.py:5> wait_for=<Future ...
取消状态:True
2.6 loop.run_forever

loop.run_until_complete 方法运行事件循环,当其中的全部任务完成后,自动停止事件循环;loop.run_ferever 方法为无限运行事件循环,需要自定义 loop.stop 方法并执行之,举例说明:

import asyncio

async def work(loop, t):                    # 协程函数
    print('start')
    await asyncio.sleep(t)                  # 子协程,模拟 IO 操作
    print('after {}s stop'.format(t))
    loop.stop()     # 停止事件循环,stop 后仍可重新运行,close 后不可

loop = asyncio.get_event_loop()             # 创建事件循环
task = asyncio.ensure_future(work(loop, 1)) # 创建任务,该任务会自动加入事件循环
loop.run_forever()  # 无限运行事件循环,直至 loop.stop 停止
loop.close()        # 关闭事件循环,只有 loop 处于停止状态才会执行

以上是单任务的事件循环,将 loop.stop 方法写入协程函数中,捎带执行。下面是多任务事件循环,使用回调函数执行 loop.stop 停止事件循环:

import time
import asyncio
import functools

def loop_stop(loop, future):    # 函数的最后一个参数须为 future
    loop.stop()                 # 停止事件循环,stop 后仍可重新运行,close 后不可

async def work(t):              # 协程函数
    print('start')
    await asyncio.sleep(t)      # 子协程,模拟 IO 操作
    print('after {}s stop'.format(t))

def main():
    loop = asyncio.get_event_loop()
    # 创建任务收集器,参数为任意数量的协程,任务收集器本身也是 task 也是 future
    tasks = asyncio.gather(work(1), work(2))
    # 任务收集器的 add_done_callback 方法添加回调函数
    # 当所有任务完成后,自动运行此回调函数
    # 注意 add_done_callback 方法的参数是回调函数
    # 这里使用 functools.partial 方法创建偏函数以便将 loop 作为参数加入
    tasks.add_done_callback(functools.partial(loop_stop, loop))
    loop.run_forever()  # 无限运行事件循环,直至 loop.stop 停止
    loop.close()        # 关闭事件循环,只有 loop 处于停止状态才会执行

if __name__ == '__main__':
    t = time.time()
    main()
    print('耗时:{:.4f}s'.format(time.time() - t))

loop.run_until_complete 方法本身也是通过调用 loop.run_forever 方法,然后通过回调函数调用 loop.stop 方法实现的

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/7fd361cde22c
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:09:48
  • 阅读 ( 981 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢