Python笔记_从迭代器、生成器到协程(二) - Go语言中文社区

Python笔记_从迭代器、生成器到协程(二)


之前的一篇一直写了python中迭代器和生成器的内容,这篇内容将介绍由生成器实现协程的方法。

日志中实例的代码都是在python3.5中运行的。

协程的内容受以下教程的启发较大:
http://dabeaz.com/coroutines/

1、什么是协程?

并发(不是并行)编程目前有四种方式,多进程,多线程,异步,和协程。协程和多线程的区别在于:协程的调度对于内核来说是不可见的,协程间是协同调度的。

在python中协程可以由生成器来实现的。可以用一个“生产者--消费者”的模型来解释。

假设有两个函数,一个负责生产,一个负责消费。生产完成一个内容后,传递给消费函数进行消费,消费完之后又进行请求一次生产。当然可以用多线程来实现,用生成器的来实现例子如下:

#生产者、消费者
def consumer():
    #消费结果
    res=""
    while True:
        #n:消费的数目
        n = yield res                 
        #如果没有传入消费商品,继续while循环
        if not n:
            continue    
        print ('[消费者]consuming %s...' % n)
        res ='ok'

def producer(c):
    #c是消费者的生成器对象
    c.__next__()
    n=0 
    while True:
        n=n+1
        print("[生产者]producing %s" % n)
        res=c.send(n)
        print("[生产者]consumer return is %s" % res)
        if n > 1000:
            break
    c.close()

if __name__ == '__main__':
    c=consumer()
    producer(c)
执行结果:
[生产者]consumer return is ok
[生产者]producing 997
[消费者]consuming 997...
[生产者]consumer return is ok
[生产者]producing 998
[消费者]consuming 998...
[生产者]consumer return is ok
[生产者]producing 999
[消费者]consuming 999...
[生产者]consumer return is ok
[生产者]producing 1000
[消费者]consuming 1000...
[生产者]consumer return is ok
[生产者]producing 1001
[消费者]consuming 1001...
[生产者]consumer return is ok

在上面的例子中,consumer是一个生成器,producer是一个函数,producer接收一个生成器对象。

如果把上面的逻辑按照多线程来写就比较复杂,我还是用python的多线程写了一个同样的逻辑,用Queue来做进程间通信:

import threading,queue
number=10000

class producer(threading.Thread):
    def __init__(self,q1,q2):
        super(producer,self).__init__()
        #消费者反馈
        self.q1=q1
        #产品输出
        self.q2=q2

    def run(self):
        n=1
        print("[生产者]producing %s" % n)
        self.q2.put(n)
        while True:
            if not self.q1.empty():
                res = self.q1.get(block=False)
                print("[生产者]consumer return is %s" % res)
                if n > number:
                    break
                print("[生产者]producing %s" % n)
                self.q2.put(n)
                n=n+1

class consumer(threading.Thread):
    def __init__(self,q1,q2):
        super(consumer,self).__init__()
        self.q1=q1
        self.q2=q2
    def run(self):
        while True:
            if not self.q2.empty():
                i=self.q2.get(block=False)
                print ('[消费者]consum ing %s...' % i)
                res ='ok'
                self.q1.put(res)
                if i>=number:
                    break

def test():
    q1=queue.Queue(maxsize=5)
    q2=queue.Queue(maxsize=5)
    p = producer(q1,q2)
    c = consumer(q1,q2)
    p.start()
    c.start()
    p.join()
    c.join()
test()

执行结果:
[生产者]producing 99997
[消费者]consuming 99997...
[生产者]consumer return is ok
[生产者]producing 99998
[消费者]consuming 99998...
[生产者]consumer return is ok
[生产者]producing 99999
[消费者]consuming 99999...
[生产者]consumer return is ok
[生产者]producing 100000
[消费者]consuming 100000...
[生产者]consumer return is ok

协程和多线程去执行这样一个逻辑速度有多大差异呢,我对比了一下生产/消费1000个东西的耗时对比:

  • 协程:0.086s
  • 多线程:21.79s

2、协程的好处都有啥?

引用一下廖雪峰的教程中的观点(http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000

  • 协程的执行有点像多线程,但协程的特点在于是一个线程执行。

  • 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

  • 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

3、用协程来打造流水线pipeline

用协程可以很方便的打造流水线式的程序,对生产者/消费者的一个扩展就是:流水线上的每一个环节,都有input和output,它既消费上一个流程的结果,也产生一个输出给下一个环节。非常类似于linux中的管道的功能。

3.1处理字符串的例字

假设有这样一个数据流水线:

  1. manager随机产生一些5~10位的长度的字符串,传递给第一个工人
  2. 第一个工人将字符串截断,只取前5个字符
  3. 第二个工人将这个字符串中的“数字”去掉,然后排序,输出一个新字符串
  4. 第三个工人将字符串中的字符都变为大写

也就是完成这样一个流水线84BzX5Fn-->84BzX-->BzX-->BZX,代码如下:

#流水线例子
from random import shuffle,randint
import re

def random_str():
    #随机生成5~10位字符串
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    chars_list=list(chars)
    shuffle(chars_list)
    res=''.join(chars_list[0:randint(5,10)])
    return res

def manager(target):
    #生成1万个随机字符串并传入work1
    n=0
    target.__next__()
    while (n<10000):
        n=n+1
        resource = random_str()
        print("Manager: %s" % resource)
        target.send(resource)

def work1(target):
    target.__next__()
    while True:
        input_str = yield
        if len(input_str)>5:
            #截断
            res=input_str[0:5]
        else:
            res=input_str
        #给下一个生成器传入值
        target.send(res)

def work2(target):
    target.__next__()
    while True:
        str_from_work1 = yield
        if str_from_work1:
            #去掉字符串中的数字
            res=re.sub(r'([d]+)','',str_from_work1)
            target.send(res)

def work3():
    #字母变为大写
    while True:
        str_from_work2 = yield
        if str_from_work2:
            res=str_from_work2.upper()
            print("output: %s" % res)

if __name__ == '__main__':
    manager(work1(work2(work3())))

#执行结果
Manager: 5wKvB
output: WKVB
Manager: uRDjl5
output: URDJL
Manager: 84BzX5Fn
output: BZX
Manager: FfhjoMueD
output: FFHJO
Manager: TDqe2axN
output: TDQE
Manager: cqEOj0I
output: CQEOJ
Manager: c2xQE
output: CXQE
Manager: 2qMupVNc
output: QMUP
Manager: MriKfphB
output: MRIKF
Manager: KSoc7EibJZ
output: KSOC
Manager: QDrou
output: QDROU

3.2带分支的流水线

在上面的例字里面,第2和第3个工人之间,增加一个新的环节:

  • 有若干工人,根据几个“客户需求”,给每个字符串的尾部添加一些tag

例字如下:
#流水线例子
from random import shuffle,randint
import re

def random_str():
    #随机生成5~10位字符串
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    chars_list=list(chars)
    shuffle(chars_list)
    res=''.join(chars_list[0:randint(5,10)])
    return res

def manager(target):
    #生成1万个随机字符串并传入work1
    n=0
    target.__next__()
    while (n<10000):
        n=n+1
        resource = random_str()
        print("Manager: %s" % resource)
        target.send(resource)

def work1(target):
    target.__next__()
    while True:
        input_str = yield
        if len(input_str)>5:
            #截断
            res=input_str[0:5]
        else:
            res=input_str
        #给下一个生成器传入值
    target.send(res)

def work2(target):
    target.__next__()
    while True:
        str_from_work1 = yield
        if str_from_work1:
            #去掉字符串中的数字
            res=re.sub(r'([d]+)','',str_from_work1)
            target.send(res)

def broadcast(targets):
    #广播
    for i in targets:
        i.__next__()
    while True:
        res=yield
        for i in targets:
            i.send(res)

def work_tail(tail,target):
    #把字符串后面添加一个tag
    target.__next__()
    while True:
        res=yield
        target.send(res+tail)

def work3():
    #字母变为大写
    while True:
        str_from_work2 = yield
        if str_from_work2:
            res=str_from_work2.upper()
            print("output: %s" % res)
if __name__ == '__main__':
    manager(work1(work2(
        broadcast([
            work_tail('_tag1',work3()),
            work_tail('_tag2',work3()),
            work_tail('_tag_new',work3()),
            work_tail('_tag_old',work3()),
        ])
    )))

执行结果:
Manager: BEv8akI5q
output: BEVA_TAG1
output: BEVA_TAG2
Manager: j6FfVUysB9
output: JFFV_TAG1
output: JFFV_TAG2
Manager: C27fB
output: CFB_TAG1
output: CFB_TAG2
Manager: 1V8NYRrB2
output: VNY_TAG1
output: VNY_TAG2
Manager: qtAOye1
output: QTAOY_TAG1
output: QTAOY_TAG2
版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/7eb5aded9eae
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:02:29
  • 阅读 ( 1442 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢