异步 IO,多路复用学习+生成器/协程 - Go语言中文社区

异步 IO,多路复用学习+生成器/协程


异步 IO:遇到 IO 请求不等待,IO 请求完成后自动调用回调函数即可。
IO 多路复用:监听多个 socket 对象,当其有数据时,自动通知。有 select,poll 和 epoll 模型。

# socket 不阻塞时候该怎么写
import socket
sk = socket.socket()

sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()
conn_l = []
del_conn = []
while True:
    try:
        conn, addr = sk.accept()
        print('建立连接了')
        # msg = conn.recv(1024)   # 不阻塞 ,但是没有消息会报错
        # print(msg)
        conn_l.append(conn)
    except BlockingIOError as e:
        for con in conn_l: 
            try:
                msg = conn.recv(1024)
                if msg == b'':
                    del_conn.append(conn)
                print(msg)
                conn.send(b'byebye')
            except BlockingIOError as e:
                pass
        for con in del_conn:
            if con in conn_l:
                conn_l.remove(conn)
        del_conn.clear()

select 模型

# select
import socket
import select
socket = socket()
sk.bind(('127.0.0.1', 8000))
sk.setblocking(False)
sk.listen()

read_lst = []
while  True:
    r_lst, w_lst, x_lst = select.select(read_lst, [], [])
    for i in r_lst:
        if i is sk:
            conn, addr = i.accept()
            read_lst.append(conn)
        else:
            ret= i.recv(1024)
            if ret == b'':
                i.close()
                read_lst.remove(i)
                continue
            print(ret)
# rlist 表示有人给我发送数据
# wlist 表示我已经和别人建立连接
# 对象必须有 fileno 方法,只要对 socket 对象进行一次封装即可
import socket
import select


class HttpReqeust:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback= callback
    def fileno(slef):
        return self.socket.fileno()


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []    # 用于检测是否连接是否成功

    def add_reqeust(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80))
        except BlockingIOError as e:
            pass

        request = HttpReqeust(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlst, elist  = select.select(self,conn, self.connection, self.conn, 0.05)
            for w in wlist:
                tpl = "get/ http/1.0rnHost:%srnrn"%(w.host,)
                w.socket.send(bytes(tpl, encoding="utf-8"))
                self.connection.remove(w)
            for r in rlist:
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        recv_data += chunk
                    except Exception as e:
                        break
                # print(r.host, recv_data)
                r.callback(recv_data)
                r.socket.close()
                self.conn.remove(r)

            if len(self.conn) == 0:
                break

def f1(data):
    pass
def f2(data):
   pass

url_list = [
  {'host': '', 'callback': f1},
  {'host': '', 'callback':  f2},
]
req = AsyncRequest()
for itemin url_list:
    req.add_request(item['host'], item['callback'])

req.run()
 #  协成 + 异步 IO ----> 1 个线程发送 N 个 Http 请求
 #      - asyncio 不支持 http,只支持 socket 请求,封装字符串,只需需要封装 http 数据包
 #      - aiohttp 模块,封装了 http 数据包  asyncio + aiohttp
 #      - requests   asyncio + requests
 #      - gevent + requests --> grequests
 #      - Twisted   -> scrapy 基于 Twisted  defer 对象 getPage reactor
 #      - tornado 
 #      gevent - Twisted > Tornado > asynico

新篇章

基于 select( poll, epoll) + 回调 + 事件循环

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
#使用select完成http请求
urls = []
stop = False

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1rnHost:{}rnConnection:closernrn".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("rnrn")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))  # 阻塞不会消耗cpu
        except BlockingIOError as e:
            pass

        #注册
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop():
    #事件循环,不停的请求socket的状态并调用对应的回调函数
    #1. select本身是不支持register模式
    #2. socket状态变化以后的回调是由程序员完成的
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    #回调+事件循环+select(pollepoll)

if __name__ == "__main__":
    fetcher = Fetcher()
    import time
    start_time = time.time()
    for url in range(20):
        url = "...".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time()-start_time)

这种方式有三个缺点:
可读性差,共享状态管理困难,异常处理困难
所以后来 Python 出现了支持协程的生成器,进而出现 asyncio 这样一种异步解决方案。

生成器

启动一个生成器有两种方法,一种是调用 next,另一种方法是使用 .send(None)

def te():
    a = yield 'no sense'
    yield 1
    return 'Ok'
gen = te()
gen.send(None)    <===> next(gen)
print(next(gen))      ----> 抛出异常

输出

no sense
1
.....  ...  StopIteration: bobby

不过只要将最后一个 next(gen) 调用改为

try:
    print(next(gen))
except StopIteration as e:
    print(e.value)

这样就正常了

  • 生成器的 close() 方法
def te():
    yield 1
    yield 2
    yield 3
    return 4
gen = te()
gen.send(None)
gen.close()   
如果在生成器总不捕获异常那么下一行会抛出异常
如果生成器中处理了,如果之后还有 yield 语句,会在此处抛出一个 RuntimeError 异常,显示 忽略 GeneratorExit 异常,
如果之后没有 yield,直接return 了,那么下一行会抛异常
next(gen)        ----> 这一行会抛出异常 StopItertation

调用 close 的话,需要处理异常

try:
    yield 1
excepe GeneratorExit:
    raise StopIteration

GeneratorExit 是继承自 BaseException,它是更基础的类,与 Exception 不同。

  • 生成器的 throw() 方法
    可以向生成器扔一个异常,需要在里面捕获异常
  • yield from
    python 3.3 新加特性
# 模仿 itertools.chain 方法
def te(*args, **kwargs):
    for i in args:
        yield from i
        # for val in i:
            # yield val

l = [1,2,3]
d = {'a': 2, 'b':2}
for value in te(l, d, range(5,10)):
    print(value)
# 会依次输出各项
def gen():
    yield 1
    pass
# 委托生成器
def g1(gen):
    yield from gen
# 调用方
def main():
    g = g1()
    g.send(None)
版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/3845d1bcf378
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:08:36
  • 阅读 ( 1245 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

推荐文章

猜你喜欢