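A coroutine (sometimes called a micro-thread) lets a single thread switch back and forth between smaller units of execution, with the switching controlled explicitly by your own code. No OS-level thread or process switch is involved, so this saves the context-switching overhead of multithreading and multiprocessing.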
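In Python, coroutines were first built on generators (yield). Each time execution reaches a yield, the generator pauses and hands a value out. Code elsewhere can receive that value and later resume the generator, optionally sending a value back in, and execution continues from where it stopped.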
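import time

def fun_1():
    while True:
        print('FUN_1 starts running')
        n = yield 'FUN_1 finished, switching to FUN_2'
        print('n is -------', n)
        # At yield the generator pauses and hands out the yielded value.
        # next() resumes it with n = None; send(x) resumes it with n = x.
        if not n:
            return
        time.sleep(1)
        print('FUN_1 finished running')

def fun_2(t):
    next(t)  # prime the generator: run fun_1 up to its first yield
    while True:  # loops forever; stop it with Ctrl+C
        print('FUN_2 starts running')
        time.sleep(1)
        ret = t.send('over')  # resume fun_1 with n = 'over' and receive its next yielded value
        print(ret)
        print('FUN_2 finished running')
        # t.close()

if __name__ == '__main__':
    n = fun_1()
    fun_2(n)
# Output:
FUN_1 starts running
FUN_2 starts running
n is ------- over
FUN_1 finished running
FUN_1 starts running
FUN_1 finished, switching to FUN_2
FUN_2 finished running
FUN_2 starts running
n is ------- over
FUN_1 finished running
FUN_1 starts running
FUN_1 finished, switching to FUN_2
FUN_2 finished running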
...
As you can see, no multithreading is involved, yet execution keeps switching back and forth between the two functions.
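The same next()/send() handshake can be shown in a smaller, self-contained form. This is a sketch built around a hypothetical running_total() generator (not part of the original example): next() primes it, each send(x) resumes it with a value and returns the next yielded value, and a plain return ends it with StopIteration.

```python
def running_total():
    """Hypothetical generator-based coroutine: accumulates the numbers sent to it."""
    total = 0
    while True:
        value = yield total      # pause here and hand out the current total
        if value is None:        # next() resumes with None: treat that as "stop"
            return total         # ends the generator; total travels in StopIteration.value
        total += value

acc = running_total()
print(next(acc))        # prime: run up to the first yield -> 0
print(acc.send(5))      # resume with value = 5 -> 5
print(acc.send(10))     # resume with value = 10 -> 15
try:
    next(acc)           # resume with value = None -> the generator returns
except StopIteration as stop:
    print('final total:', stop.value)   # final total: 15
```

Priming with next() is required: calling send() with a non-None value on a generator that has not started yet raises TypeError.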
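Python 3.4 added the asyncio standard library, and Python 3.5 introduced the two pieces of syntax that matter most when using it: async and await. Once you understand how they work together, you can make full use of the library.
Always keep in mind that asyncio is neither multiprocessing nor multithreading: everything runs in a single thread, and execution switches between coroutines within it. The switch points are marked with await, and the async keyword turns a function into a coroutine function, e.g. async def function():. In short, async defines a coroutine, and await suspends the current coroutine until the awaited operation (typically slow I/O) completes, letting the event loop run other coroutines in the meantime.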
import time

def job(t):
    print('Start job ', t)
    time.sleep(t)  # wait for "t" seconds
    print('Job ', t, ' takes ', t, ' s')

def main():
    for t in range(1, 3):  # run job 1, then job 2
        job(t)

if __name__ == '__main__':
    t1 = time.time()
    main()
    print("NO async total time : ", time.time() - t1)
# Output:
Start job 1
Job 1 takes 1 s
Start job 2
Job 2 takes 2 s
NO async total time : 3.008066177368164
Result: the jobs run sequentially. job 2 cannot start until job 1 has finished; job 1 takes 1 second and job 2 takes 2 seconds, so the total is just over 3 seconds.
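import asyncio
import time

async def job(t):  # a coroutine function
    print('Start job ', t)
    await asyncio.sleep(t)  # wait t seconds; meanwhile the loop switches to other tasks
    print('Job ', t, ' takes ', t, ' s')

async def main(loop):  # a coroutine function
    tasks = [loop.create_task(job(t)) for t in range(1, 3)]  # schedule the tasks; they start once main() yields control
    await asyncio.wait(tasks)  # wait until all tasks are done

if __name__ == '__main__':
    t1 = time.time()
    # create the loop explicitly; calling asyncio.get_event_loop() with no current loop is deprecated in recent Python versions
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main(loop))  # run the loop until main() and all its tasks finish
    loop.close()  # close the loop
    print("Async total time : ", time.time() - t1)
# Output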
Start job 1
Start job 2
Job 1 takes 1 s
Job 2 takes 2 s
Async total time : 2.0045559406280518
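Result: job 2 did not wait for job 1 to finish. As soon as job 1 hit await, the loop switched to job 2, so both jobs were waiting in await asyncio.sleep(t) at the same time. The total run time is therefore determined by the longest wait, 2 seconds, which is noticeably faster than the 3 seconds of the plain version above.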
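On Python 3.7 and later, asyncio.run() creates the event loop, runs the coroutine and closes the loop for you. Below is a sketch of the same jobs written that way. Passing the delays in as a parameter is an addition for illustration, and gather() is used instead of wait() so that main() also returns the jobs' return values.

```python
import asyncio
import time

async def job(t):
    print('Start job', t)
    await asyncio.sleep(t)        # suspend this job; the loop runs the other jobs meanwhile
    print('Job', t, 'takes', t, 's')
    return t

async def main(delays):
    # create_task() schedules each job on the running loop right away
    tasks = [asyncio.create_task(job(t)) for t in delays]
    return await asyncio.gather(*tasks)   # wait for all jobs; results come back in argument order

if __name__ == '__main__':
    t1 = time.perf_counter()
    asyncio.run(main([1, 2]))    # creates the loop, runs main(), closes the loop
    print('Async total time:', time.perf_counter() - t1)
```

Calling asyncio.run(main([0.2, 0.2, 0.2])) should take roughly 0.2 seconds in total rather than 0.6, for the same reason as above.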
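How it works: a coroutine object cannot run by itself. When it is handed to the event loop, run_until_complete() wraps the coroutine in a Task object. Task is a subclass of Future: it keeps track of the coroutine's state and stores its result so that it can be retrieved later.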
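import asyncio
import requests

async def scan(url):
    # requests is blocking; it is used here only to show how a task is created and run
    r = requests.get(url).status_code
    return r

loop = asyncio.new_event_loop()
task = loop.create_task(scan('http://www.langzi.fun'))  # wrap the coroutine in a Task
loop.run_until_complete(task)
print(task.result())  # 200
loop.close()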
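What the code does: the coroutine is wrapped in a task and assigned to task, the event loop (think of it as the scheduler) runs that task to completion, and task.result() then returns the coroutine's return value.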
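To see the coroutine-to-task wrapping without any network access, here is a sketch with a hypothetical compute() coroutine in place of scan(). Creating the coroutine runs nothing, wrapping it produces a Task (a Future subclass), and the result is only available once the task is done.

```python
import asyncio

async def compute(x):
    """Hypothetical stand-in for scan(): sleeps briefly, then returns a value."""
    await asyncio.sleep(0.01)
    return x * 2

async def main():
    coro = compute(21)                       # a coroutine object: nothing has run yet
    task = asyncio.ensure_future(coro)       # wrapped in a Task and scheduled on the running loop
    print(isinstance(task, asyncio.Future))  # True: Task is a subclass of Future
    print(task.done())                       # False: the task has not had a chance to run yet
    result = await task                      # yield to the loop until the task finishes
    print(task.done(), result)               # True 42
    return result

asyncio.run(main())
```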
Binding a callback: when the first function finishes, its result is passed to a second function for further processing.
import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    response = requests.get(url)  # blocking call, used here only to demonstrate callbacks
    return response

def callback(task):
    print('Status:', task.result())

loop = asyncio.new_event_loop()
coroutine = request()
task = loop.create_task(coroutine)
task.add_done_callback(callback)  # call callback(task) once the task has finished
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
loop.close()
# Output
Task: <Task pending coro=<request() running at 006.py:15> cb=[callback() at 006.py:21]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at 006.py:15> result=<Response [200]>>
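Explanation:
Here we define a request() coroutine that requests Baidu and returns the Response object (printed as <Response [200]>); it contains no print() call. We then define a callback() function that takes one argument, a task object, and prints the task's result. We now have a coroutine object and a callback function, and we want callback() to run as soon as the coroutine finishes.
How are the two connected? Simply call add_done_callback(): we pass callback() to the task object that wraps the coroutine, so callback() is called once the task has finished. The task object itself is passed to callback() as its argument, and calling its result() method returns the coroutine's return value.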
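A network-free sketch of the same mechanism. The request() coroutine here is a hypothetical stand-in that returns a fake status code, and the label argument is an addition: add_done_callback() always calls the callback with the task as its only argument, so functools.partial is used to bind any extra arguments in advance.

```python
import asyncio
import functools

async def request():
    """Hypothetical stand-in for the HTTP request: returns a fake status code."""
    await asyncio.sleep(0.01)
    return 200

collected = []

def callback(label, task):
    # The loop calls this with the finished task; functools.partial supplied label
    print(label, 'Status:', task.result())
    collected.append((label, task.result()))

async def main():
    task = asyncio.create_task(request())
    task.add_done_callback(functools.partial(callback, 'baidu'))
    return await task

asyncio.run(main())   # prints: baidu Status: 200
```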
Running several tasks: put all the tasks in a list and run them together.
Define a list of tasks and pass it to asyncio's wait() method:
import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    response = requests.get(url)  # blocking call
    return response

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(request()) for _ in range(5)]
    print('Tasks:', tasks)
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print('Task Result:', task.result())
    loop.close()
# Output
Tasks: [<Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>, <Task pending coro=<request() running at 007.py:15>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
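Explanation: a list comprehension creates five tasks. The list is passed to asyncio.wait(), and the resulting awaitable is run on the event loop, which starts all five tasks. Finally, the result of each task is printed.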
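One detail worth knowing: since Python 3.11, asyncio.wait() only accepts tasks or futures, so coroutines must be wrapped first, as the list comprehension above does. A network-free sketch of the same pattern, using a hypothetical request(i) coroutine that sleeps instead of calling a server, shows that the five tasks overlap and that their results can be read back in creation order:

```python
import asyncio
import time

async def request(i):
    """Hypothetical non-blocking stand-in for an HTTP request."""
    await asyncio.sleep(0.2)
    return 'response {}'.format(i)

async def main():
    tasks = [asyncio.create_task(request(i)) for i in range(5)]  # wrap before wait()
    done, pending = await asyncio.wait(tasks)
    return [task.result() for task in tasks]   # iterate the list, not the unordered done set

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)
print('elapsed: {:.2f}s'.format(elapsed))
```

Because asyncio.sleep() yields to the loop, the five 0.2-second waits overlap and the whole run takes roughly 0.2 seconds.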
import asyncio
import requests
import time

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    response = requests.get(url)  # blocking: the event loop cannot switch tasks here
    print('Get response from', url, 'status:', response.status_code)

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(request()) for _ in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end = time.time()
    print('Cost time:', end - start)
# Output
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun status: 200
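Cost time: 2.0420401096343994
Result: the requests do not overlap. Each Waiting for line is immediately followed by its Get response line, because requests.get() is a blocking call: while it waits for the server it never hands control back to the event loop, so the five tasks effectively run one after another.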
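The blocking effect visible in the output above can be reproduced without a network. In this sketch time.sleep() stands in for requests.get() (an assumption for illustration). Called directly inside a coroutine, it blocks the loop, so five calls run back to back. asyncio.to_thread() (Python 3.9+) instead pushes each call into a worker thread so they overlap. This is an alternative to switching libraries, not what the original article uses.

```python
import asyncio
import time

def blocking_io(delay):
    time.sleep(delay)          # stand-in for a blocking call such as requests.get()
    return delay

async def call_directly(delay):
    return blocking_io(delay)  # runs on the event loop thread and blocks it

async def call_in_thread(delay):
    # asyncio.to_thread (Python 3.9+) runs the function in a worker thread
    return await asyncio.to_thread(blocking_io, delay)

async def timed(coro_fn, n, delay):
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn(delay) for _ in range(n)))
    return time.perf_counter() - start

if __name__ == '__main__':
    print('direct   : {:.2f}s'.format(asyncio.run(timed(call_directly, 5, 0.2))))
    print('to_thread: {:.2f}s'.format(asyncio.run(timed(call_in_thread, 5, 0.2))))
```

With these numbers the direct version needs about 1 second (5 x 0.2 s), while the threaded version finishes in roughly 0.2 seconds.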
import asyncio
import aiohttp
import time

async def get(url):
    # async with awaits the cleanup, so the response and the session are closed properly
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url)
    # print('Get response from', url, 'result:', result)

if __name__ == "__main__":
    start = time.time()
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(request()) for _ in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end = time.time()
    print('Cost time:', end - start)
# Output
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Waiting for http://www.langzi.fun
Get response from http://www.langzi.fun
Get response from http://www.langzi.fun
Get response from http://www.langzi.fun
Get response from http://www.langzi.fun
Get response from http://www.langzi.fun
Cost time: 2.5775160789489746
Note: this output comes from an earlier version of the code that called session.close() without await. That run also printed RuntimeWarning: coroutine 'ClientSession.close' was never awaited, followed by "Unclosed client session" and "Unclosed connector" warnings for each request. Closing the session with async with, as above, avoids them.
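Result: all five Waiting for lines are printed before the first response arrives, so this time the requests really are concurrent. Why aiohttp? requests is a blocking library and does not support asynchronous I/O, whereas aiohttp is an HTTP client library built for asyncio.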
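async def defines coroutines, which are suited to time-consuming I/O. You can wrap further I/O steps in coroutines of their own, which gives nested coroutines: one coroutine awaits another, and so on down the chain.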
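A minimal sketch of such nesting, with hypothetical fetch(), parse() and pipeline() coroutines (none of them from the original): parse() awaits fetch(), and pipeline() awaits gather(), which runs several parse() chains concurrently.

```python
import asyncio

async def fetch(x):
    """Hypothetical I/O step: pretend to download item x."""
    await asyncio.sleep(0.01)
    return x

async def parse(x):
    raw = await fetch(x)        # a coroutine awaiting another coroutine (nesting)
    return raw * 10

async def pipeline(items):
    # the outer coroutine awaits gather(), which runs the parse() chains concurrently
    return await asyncio.gather(*(parse(i) for i in items))

print(asyncio.run(pipeline([1, 2, 3])))   # [10, 20, 30]
```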
import asyncio

async def myfun(i):
    print('start {}th'.format(i))
    await asyncio.sleep(1)
    print('finish {}th'.format(i))

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    myfun_list = [loop.create_task(myfun(i)) for i in range(4)]
    loop.run_until_complete(asyncio.wait(myfun_list))
    loop.close()
# Output
start 0th
start 1th
start 2th
start 3th
finish 0th
finish 1th
finish 2th
finish 3th
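The example above waits on the tasks with asyncio.wait(); asyncio.gather(*myfun_list) would work just as well here. For now you can treat the two as equivalent and use either.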
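For comparison, here is a sketch of the gather() variant of the myfun example, written with asyncio.run() (Python 3.7+). gather() accepts coroutines directly, wraps each one in a Task, and returns their results as a list in argument order. Having myfun() return i is an addition, so that there is something to collect.

```python
import asyncio

async def myfun(i):
    print('start {}th'.format(i))
    await asyncio.sleep(1)
    print('finish {}th'.format(i))
    return i

async def main():
    # gather() wraps each coroutine in a Task and collects the return values
    return await asyncio.gather(*(myfun(i) for i in range(4)))

if __name__ == '__main__':
    print(asyncio.run(main()))   # [0, 1, 2, 3] after the start/finish lines
```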
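There are, however, subtle differences between them. asyncio.wait() returns two sets of tasks, (done, pending), and you read each result from its task: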
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    dones, pendings = await asyncio.wait(tasks)  # two sets: finished and unfinished tasks
    for task in dones:  # a set, so the iteration order is arbitrary
        print('Task ret: ', task.result())

if __name__ == '__main__':
    start = now()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()
    print('TIME: ', now() - start)
# Output
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
TIME: 4.002034902572632
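The (done, pending) pair is where wait() differs most from gather(). Combined with its return_when parameter, it lets you react before every task has finished. In this sketch the delays are shortened, and return_when=asyncio.FIRST_COMPLETED is an addition that the original does not use. The remaining tasks are cancelled once the first result is in.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    tasks = [asyncio.create_task(do_some_work(x)) for x in (0.1, 0.5, 1.0)]
    # Return as soon as any task finishes; the others are left in `pending`
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [task.result() for task in done]
    for task in pending:
        task.cancel()                                        # we no longer need them
    await asyncio.gather(*pending, return_exceptions=True)   # let the cancellations finish
    return first, len(pending)

print(asyncio.run(main()))   # (['Done after 0.1s'], 2)
```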
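If you await asyncio.gather() instead, the value of the await expression is the list of the coroutines' return values, in the same order as the arguments: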
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    results = await asyncio.gather(*tasks)  # return values, in the order of tasks
    for result in results:
        print('Task ret: ', result)

if __name__ == '__main__':
    start = now()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()
    print('TIME: ', now() - start)
# Output
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.0041937828063965
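gather() always returns results in argument order, regardless of which coroutine finishes first, and its return_exceptions flag controls what happens when one of them fails. Here is a sketch with a hypothetical work() coroutine that raises for x == 2:

```python
import asyncio

async def work(x):
    await asyncio.sleep(x / 10)
    if x == 2:
        raise ValueError('bad input: {}'.format(x))
    return x

async def main():
    # results come back in argument order, not in completion order;
    # return_exceptions=True puts the exception in the list instead of raising it
    return await asyncio.gather(work(3), work(1), work(2), return_exceptions=True)

results = asyncio.run(main())
print(results)   # [3, 1, ValueError('bad input: 2')]
```

Without return_exceptions=True, the ValueError would propagate out of the await instead.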
You can also skip processing the results inside main(): just return the awaited value, and the outermost run_until_complete() returns main()'s result:
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

if __name__ == '__main__':
    start = now()
    loop = asyncio.new_event_loop()
    results = loop.run_until_complete(main())  # main()'s return value: the gathered results
    loop.close()
    for result in results:
        print('Task ret: ', result)
    print('TIME: ', now() - start)
# Output
Waiting: 1
Waiting: 2
Waiting: 2
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 2s
TIME: 2.0021467208862305
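Or return the result of await asyncio.wait(tasks). Note that this is the (done, pending) tuple of task sets rather than the results themselves, so the loop below prints the set of finished tasks followed by an empty pending set: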
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.wait(tasks)

if __name__ == '__main__':
    start = now()
    loop = asyncio.new_event_loop()
    results = loop.run_until_complete(main())  # the (done, pending) tuple
    loop.close()
    for result in results:
        print('Task ret: ', result)
    print('TIME: ', now() - start)
# Output
Waiting: 1
Waiting: 2
Waiting: 2
Task ret: {<Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 2s'>, <Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 2s'>, <Task finished coro=<do_some_work() done, defined at 013.py:17> result='Done after 1s'>}
Task ret: set()
TIME: 2.005409002304077
You can also use asyncio.as_completed(), which lets you await the results one by one in the order the tasks finish:
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task  # the next result to finish
        print('Task ret: {}'.format(result))

if __name__ == '__main__':
    start = now()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close()
    print('TIME: ', now() - start)
# Output
Waiting: 1
Waiting: 2
Waiting: 2
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 2s
TIME: 2.0059380531311035
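In the run above the tasks happen to finish in creation order. To make the completion-order behaviour visible, this sketch starts the tasks longest-first (delays shortened and reordered for illustration); as_completed still hands back the shortest one first:

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    # started longest-first on purpose
    tasks = [asyncio.create_task(do_some_work(x)) for x in (0.3, 0.1, 0.2)]
    order = []
    for next_done in asyncio.as_completed(tasks):
        order.append(await next_done)    # each await gives the next result to finish
    return order

print(asyncio.run(main()))
# ['Done after 0.1s', 'Done after 0.2s', 'Done after 0.3s']
```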
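As these examples show, coroutines can be called and combined very flexibly, especially when it comes to handling results: how to return them and where to suspend. Getting this right takes experience and some forward-looking design.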