Python之协程 - Go语言中文社区

Python之协程


协程的介绍

定义

单线程里面不断切换这个单线程中的微线程,即通过代码来实现让一个线程中的更小线程来回切换,相对于多线程多进程可以节省线程切换的时间。

优点
  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
缺点
  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

相关代码

初识协程

协程在Python中使用yield生成器实现,每次执行到yield位置代码停止,返回一个数据,随后在别的地方可以接手这个数据后,代码恢复继续执行

import time
def fun_1():
    while 1:
        print('FUN_1 函数开始执行')
        n = yield 'FUN_1 执行完毕,切换到FUN_2'
        print('n is -------', n)
        # 函数运行到yield会暂停函数执行,存储这个值。并且有next():调用这个值,与send():外部传入一个值
        if not n:
            return
        time.sleep(1)
        print('FUN_1 函数执行结束')
def fun_2(t):
    next(t)
    while 1:
        print('FUN_2 函数开始执行')
        time.sleep(1)
        ret = t.send('over')
        print(ret)
        print('FUN_2 函数执行结束')
    # t.close()
if __name__ == '__main__':
    n = fun_1()
    fun_2(n)
# 执行结果:
FUN_1 函数开始执行
FUN_2 函数开始执行
n is ------- over
FUN_1 函数执行结束
FUN_1 函数开始执行
FUN_1 执行完毕,切换到FUN_2
FUN_2 函数执行结束
FUN_2 函数开始执行
n is ------- over
FUN_1 函数执行结束
FUN_1 函数开始执行
FUN_1 执行完毕,切换到FUN_2
FUN_2 函数执行结束
...

可以看到,没有使用多线程处理,依然在两个函数中不断切换循环。

Python3.5之后的协程

在Python3中新增asyncio库,在 3.5+ 版本中, asyncio 有两样语法非常重要, async, await. 弄懂了它们是如何协同工作的, 我们就完全能发挥出这个库的功能了。

基本用法

我们要时刻记住,asyncio 不是多进程, 也不是多线程, 单单是一个线程, 但是是在 Python 的功能间切换着执行. 切换的点用 await 来标记, 使用async关键词将其变成协程方法, 比如 async def function():。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

重要概念
  • event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。
  • coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
  • task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
  • async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

异步与非异步方式的对比
非异步方式
import time
def job(t):
    print('Start job ', t)
    time.sleep(t)  # wait for "t" seconds
    print('Job ', t, ' takes ', t, ' s')
def main():
    [job(t) for t in range(1, 3)]
if __name__ == '__main__':
    t1 = time.time()
    main()
    print("NO async total time : ", time.time() - t1)

# 执行结果:
Start job  1
Job  1  takes  1  s
Start job  2
Job  2  takes  2  s
NO async total time :  3.008066177368164

结果:可以看出, 我们的 job 是按顺序执行的, 必须执行完 job 1 才能开始执行 job 2, 而且 job 1 需要1秒的执行时间, 而 job 2 需要2秒. 所以总时间是 3 秒多.

异步方式
import asyncio
import time
async def job(t):  # async 形式的功能
    print('Start job ', t)
    await asyncio.sleep(t)  # 等待t秒, 期间切换其他任务
    print('Job ', t, ' takes ', t, ' s')
async def main(loop):  # async 形式的功能
    tasks = [loop.create_task(job(t)) for t in range(1, 3)]  # 创建任务, 但是不执行
    await asyncio.wait(tasks)  # 执行并等待所有任务完成
if __name__ == '__main__':
    t1 = time.time()
    loop = asyncio.get_event_loop()  # 建立 loop
    loop.run_until_complete(main(loop))  # 执行 loop,并且等待所有任务结束
    loop.close()  # 关闭 loop
    print("Async total time : ", time.time() - t1)

# 执行结果
Start job  1
Start job  2
Job  1  takes  1  s
Job  2  takes  2  s
Async total time :  2.0045559406280518

结果:可以看出, 我们没有等待 job 1 的结束才开始 job 2, 而是 job 1 触发了 await 的时候就切换到了 job 2 了. 这时, job 1 和 job 2 同时在等待 await asyncio.sleep(t), 所以最终的程序完成时间, 取决于等待最长的 t, 也就是 2秒. 这和上面用普通形式的代码相比(3秒), 的确快了很多.


协程

最简单的协程

工作原理:协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。

import asyncio
import requests
async def scan(url):
    r = requests.get(url).status_code
    return r
task = asyncio.ensure_future(scan('http://www.langzi.fun'))
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(task.result())  # 200

以上代码的含义:把任务赋值给task,然后loop为申请调度(这么理解),然后执行。

绑定回调函数的协程

第一个函数执行后,执行的结果传递给第二个函数继续执行

import asyncio
import requests
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
def callback(task):
    print('Status:', task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

# 执行结果
Task: <Task pending coro=<request() running at 006.py:15> cb=[callback() at 006.py:21]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at 006.py:15> result=<Response [200]>>

解释说明:
在这里我们定义了一个 request() 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print() 语句。随后我们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,然后调用 print() 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。
那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback() 方法即可,我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了。

多任务协程(依次执行方式)

把所有的任务加载到一个列表中,然后依次执行
可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行

import asyncio
import requests
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
if __name__ == "__main__":
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    print('Tasks:', tasks)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print('Task Result:', task.result())

# 执行结果
Tasks: [<Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

解释:使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来

多任务协程实现
非异步执行方式
import asyncio
import requests
import time
async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'status:', response.status_code)
if __name__ == '__main__':
    start = time.time()
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time:', end - start)

# 执行结果
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Cost time: 2.0420401096343994
异步方式
import asyncio
import aiohttp
import time


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result


async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url)
    # print('Get response from', url, 'result:', result)


if __name__ == "__main__":
    start = time.time()
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time:', end - start)

# 执行结果
'''
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
009.py:20: RuntimeWarning: coroutine 'ClientSession.close' was never awaited
  session.close()
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x104cd7c18>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x105822ca8>, 1.189907949)]']
connector: <aiohttp.connector.TCPConnector object at 0x1053b1518>
Get response from http://www.langzi.fun
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1056a5fd0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x105822d08>, 1.410301241)]']
connector: <aiohttp.connector.TCPConnector object at 0x10581f0b8>
Get response from http://www.langzi.fun
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1056a5f98>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x105822e28>, 1.854444905)]']
connector: <aiohttp.connector.TCPConnector object at 0x1057f0c18>
Get response from http://www.langzi.fun
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1056a5c88>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x105822be8>, 2.065381857)]']
connector: <aiohttp.connector.TCPConnector object at 0x1056a5cc0>
Get response from http://www.langzi.fun
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x1056a5940>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x105822e88>, 2.52941069)]']
connector: <aiohttp.connector.TCPConnector object at 0x1057f07b8>
Get response from http://www.langzi.fun
Cost time: 2.5775160789489746
'''

结果:为什么使用aiohttp呢?在之前就说过requests这个库是堵塞的,并不支持异步,而aiohttp是支持异步的网络请求的库。

协程嵌套

使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。

import asyncio
async def myfun(i):
    print('start {}th'.format(i))
    await asyncio.sleep(1)
    print('finish {}th'.format(i))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    myfun_list = [asyncio.ensure_future(myfun(i)) for i in range(4)]
    loop.run_until_complete(asyncio.wait(myfun_list))

# 执行结果
start 0th
start 1th
start 2th
start 3th
finish 0th
finish 1th
finish 2th
finish 3th

这种用法和上面一种的不同在于后面调用的是asyncio.gather还是asyncio.wait,当前看成完全等价即可,所以平时使用用上面哪种都可以。
另外,二者其实是有细微差别的

  • gather更擅长于将函数聚合在一起
  • wait更擅长筛选运行状况
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print('Task ret: ', task.result())
if __name__ == '__main__':
    start = now()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('TIME: ', now() - start)

# 执行结果
Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 2s
Task ret:  Done after 4s
Task ret:  Done after 1s
TIME:  4.002034902572632

如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。

import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        print('Task ret: ', result)
if __name__ == '__main__':
    start = now()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('TIME: ', now() - start)

# 执行结果
Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.0041937828063965

不在main协程函数里处理结果,直接返回await的内容,那么最外层的run_until_complete将会返回main协程的结果。

import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)
if __name__ == '__main__':
    start = now()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    for result in results:
        print('Task ret: ', result)
    print('TIME: ', now() - start)

# 执行结果
Waiting:  1
Waiting:  2
Waiting:  2
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 2s
TIME:  2.0021467208862305

或者返回使用asyncio.wait方式挂起协程:

import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.wait(tasks)

if __name__ == '__main__':
    start = now()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    for result in results:
        print('Task ret: ', result)
    print('TIME: ', now() - start)

# 执行结果
Waiting:  1
Waiting:  2
Waiting:  2
Task ret:  {<Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 2s'>, <Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 2s'>, <Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 1s'>}
Task ret:  set()
TIME:  2.005409002304077

也可以使用asyncio的as_completed方法:

import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))
if __name__ == '__main__':
    start = now()
    loop = asyncio.get_event_loop()
    done = loop.run_until_complete(main())
    print('TIME: ', now() - start)

# 执行结果
Waiting:  1
Waiting:  2
Waiting:  2
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 2s
TIME:  2.0059380531311035

由此可见,协程的调用和组合十分灵活,尤其是对于结果的处理,如何返回,如何挂起,需要逐渐积累经验和前瞻的设计。

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/2df762b7d606
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:00:09
  • 阅读 ( 1224 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢