python 异步编程---curio - Go语言中文社区

python 异步编程---curio


为什么使用协程

C10K问题

在互联网开始的早期,使用互联网的人较少,一台服务器同时在线的连接也不是很多,所以最初的服务器设计的时候使用进程或者是线程的方式分配一个TCP连接,这个时候不存在C10K的难题。

当到了Web2.0的时代,互联网不再是单纯的浏览网页了,它开始需要进行交互,随着互联网的进一步发展,用户界面和界面交互都变得非常复杂起来,应用程序的逻辑也随之变的更加复杂,即时通信和在线的实时互动已经变的非常普遍了,假设每个用户都必须要与服务器保持一个或者多个TCP连接,而且每一个TCP连接需要占用一个进程(线程)的资源,这样的话,一个服务器的并发连接数是非常高的,一个普通的大一点网页服务的连接可能就过亿了。进程是操作系统最宝贵的资源,一台机器创建不了这么多进程,如果是C10k就要创建1万个进程,这个是操作系统无法承受的。就算是分布式系统,维持1亿用户在线也需要10万台服务器,成本是巨大的,只有FLAG、BAT这样的公司才有财力购买如此多的服务器。

怎么样解决C10K问题

既然有了C10K问题,程序员们就开始行动去解决它。为了解决这一问题,出现了「用同一进程/线程来同时处理若干连接」的思路,也就是I/O多路复用。于是FreeBSD推出了kqueue,Linux推出了epoll,Windows推出了IOCP。这些操作系统提供的功能就是为了解决C10K问题。因为Linux是互联网企业中使用率最高的操作系统,Epoll就成为C10K killer、高并发、高性能、异步非阻塞这些技术的代名词了。

epoll技术的编程模型就是异步非阻塞回调,也可以叫做Reactor,事件驱动,事件轮循(EventLoop)。Epoll就是为了解决C10K问题而生。使用Epoll技术,使得小公司也可以玩高并发。不需要购买很多服务器,有几台服务器就可以服务大量用户。Nginx,libevent,node.js这些就是Epoll时代的产物。

就这样C10K问题解决了,然后又来更高的问题,也就是C100K,C1M等。Epoll既然能解决C10K,解决什么C100K,C1M也是可以的。秘诀就是使用epoll模型,然后多买一些服务器就可以了。但是问题又来了

异步嵌套回调太TM难写了。尤其是Node.js层层回调,缩进了几十层,要把程序员逼疯了。于是一个新的技术被提出来了,那就是协程(coroutine)。这个技术本质上也是异步非阻塞技术,它是将事件回调进行了包装,让程序员看不到里面的事件循环。程序员就像写阻塞代码一样简单。比如调用 client->recv() 等待接收数据时,就像阻塞代码一样写。实际上是底层库在执行recv时悄悄保存了一个状态,比如代码行数,局部变量的值。然后就跳回到EventLoop中了。什么时候真的数据到来时,它再把刚才保存的代码行数,局部变量值取出来,又开始继续执行。

这个就像时间禁止的游戏一样,国王对巫师说“我必须马上得到宝物,不然就砍了你的脑袋”,巫师念了一句时间停止的咒语,直到过了1年后勇士们才把宝物送来。这时候巫师解开咒语,把宝物交给国王。这里国王就可以理解成协程,他根本没感觉到时间停止,在他停止到醒来期间发生了什么他不知道,也不关心。

这就是协程的本质。协程是异步非阻塞的另外一种展现形式。Golang,Erlang,Lua协程都是这个模型。

说的有点远了,关于协程和epoll模型,你可能需要到网上找一些更加详细的资料看看,现在开始我们今天的主题 – curio库使用指南

curio-一个用同步写法进行异步编程的库

如何把同步的代码改成异步的

首先看一个同步的例子:

def handle(id):

    subject = get_subject_from_db(id)

    buyinfo = get_buyinfo(id)

    change = process(subject, buyinfo)

    notify_change(change)

    flush_cache(id)
import curio

async def handle(id):

    async with TaskGroup() as g:

        subject = await g.spawn(get_subject_from_db, id)

        buyinfo = await g.spawn(get_buyinfo, id)

    change = await process(subjetc.result, buginfo)

    await change.join()

    await notifu_change(change.result)

    await flush_cache(id)

其实就是把函数包装成一个Task对象或者说future对象,使用spawn可以把函数包装为Task,然后等待函数完成后,从Task的result属性获取返回值。

下篇我们来聊一聊curio具体有哪些东西和怎么样去使用他们进行异步编程。

一些基本概念

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。

  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

定义一个协程

定义协程很简单,使用python3.5的关键字async,可以像定义普通的函数一样:

import curio

async def countdown(n):

    while n > 0:

        print('T-minus', n)

        await curio.sleep(1)

        n -= 1

if __name__ == '__main__':

    curio.run(countdown, 10)

使用async定义一个协程(coroutine),协程也是一种对象。协程不能直接运行,需要把协程加入到事件循环(loop),由后者在适当的时候调用协程。curio使用curio kernel来运行协程,run()方法可以开始kernel并且初始化Task。

创建一个task

协程对象不能直接运行,在运行kernel的时候,可以curio.spawn方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。

import curio

async def countdown(n):

    while n > 0:

        print('T-minus', n)

        await curio.sleep(1)

        n -= 1

async def kid():

    print('Building the Millenium Falcon in Minecraft')

    await curio.sleep(1000)

async def parent():

    kid_task = await curio.spawn(kid)

    await curio.sleep(5)

    print("Let's go")

    count_task = await curio.spawn(countdown, 10)

    await count_task.join()

    print("We're leaving!")

    await kid_task.join()

    print('Leaving')

if __name__ == '__main__':

    curio.run(parent)

在当前程序中,parent()使用curio.spawn()创建新的子任务,当sleep一段时间后,countdown开始运行,join()方法会等待这个Task运行结束,在首先等待countdown()完成后,然后程序等待kid()完成,在你运行这个程序时,可以得到下面的结果:

curio monitor

在上个程序中的kid()将会阻塞1000秒,而parent的join方法会等待kid()的完成后才会结束。你可以将代码改成下面的样子来开启monitor:


if __name__ == '__main__':

    curio.run(parent, with_monitor=True)

运行程序,当程序阻塞在kid()的时候,打开monitor工具:

curio > ps

Task State Cycles Timeout Task


1 FUTURE_WAIT 1 None Monitor.monitor_task

2 READ_WAIT 1 None Kernel._run_coro.<locals>._kernel_task

3 TASK_JOIN 3 None parent

4 TIME_SLEEP 1 None kid

curio >

还可以使用where查看追踪task:

curio > w 3

这样手动取消task会抛出TaskCancelled异常,表示程序没有正常运行。因此你需要结束task的时候需要在程序中手动取消:

当然,你在parent取消kid的时候,kid可以捕捉到这个消除请求并且清除它:

同步机制

curio模块包含多种同步机制,它提供和线程一样的同步机制(Event, Lock, Semaphore, and Condition)。看下面使用Event的例子:

start_evt = curio.Event()

async def kid():

print('Can I play?')

await start_evt.wait()

print('Building the Millenium Falcon in Minecraft')

async with curio.TaskGroup() as f:

    await f.spawn(friend, 'Max')

    await f.spawn(friend, 'Lillian')

    await f.spawn(friend, 'Thomas')

    try:

        await curio.sleep(1000)

    except curio.CancelledError:

        print('Fine. Saving my work.')

        raise

async def parent():

kid_task = await curio.spawn(kid)

await curio.sleep(5)

print('Yes, go play')

await start_evt.set()

await curio.sleep(5)

print("Let's go")

count_task = await curio.spawn(countdown, 10)

await count_task.join()

print("We're leaving!")

try:

    await curio.timeout_after(10, kid_task.join)

except curio.TaskTimeout:

    print('I warned you!')

    await kid_task.cancel()

print('Leaving!')

在程序运行kid()的时候,await start_evt.wait()会等待,直到await start_evt.set()运行。

转自:
https://heshangbuxitou.github.io/2017/11/04/curio%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97%EF%BC%88%E4%BA%8C%EF%BC%89/

参考:
聊聊C10K问题及解决方案

Curio官方文档

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/78fac0669583
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:10:04
  • 阅读 ( 2016 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢