社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
之前的一篇一直写了python中迭代器和生成器的内容,这篇内容将介绍由生成器实现协程的方法。
日志中实例的代码都是在python3.5中运行的。
协程的内容受以下教程的启发较大:
http://dabeaz.com/coroutines/
并发(不是并行)编程目前有四种方式,多进程,多线程,异步,和协程。协程和多线程的区别在于:协程的调度对于内核来说是不可见的,协程间是协同调度的。
在python中协程可以由生成器来实现的。可以用一个“生产者--消费者”的模型来解释。
假设有两个函数,一个负责生产,一个负责消费。生产完成一个内容后,传递给消费函数进行消费,消费完之后又进行请求一次生产。当然可以用多线程来实现,用生成器的来实现例子如下:
#生产者、消费者
def consumer():
#消费结果
res=""
while True:
#n:消费的数目
n = yield res
#如果没有传入消费商品,继续while循环
if not n:
continue
print ('[消费者]consuming %s...' % n)
res ='ok'
def producer(c):
#c是消费者的生成器对象
c.__next__()
n=0
while True:
n=n+1
print("[生产者]producing %s" % n)
res=c.send(n)
print("[生产者]consumer return is %s" % res)
if n > 1000:
break
c.close()
if __name__ == '__main__':
c=consumer()
producer(c)
执行结果:
[生产者]consumer return is ok
[生产者]producing 997
[消费者]consuming 997...
[生产者]consumer return is ok
[生产者]producing 998
[消费者]consuming 998...
[生产者]consumer return is ok
[生产者]producing 999
[消费者]consuming 999...
[生产者]consumer return is ok
[生产者]producing 1000
[消费者]consuming 1000...
[生产者]consumer return is ok
[生产者]producing 1001
[消费者]consuming 1001...
[生产者]consumer return is ok
在上面的例子中,consumer是一个生成器,producer是一个函数,producer接收一个生成器对象。
如果把上面的逻辑按照多线程来写就比较复杂,我还是用python的多线程写了一个同样的逻辑,用Queue来做进程间通信:
import threading,queue
number=10000
class producer(threading.Thread):
def __init__(self,q1,q2):
super(producer,self).__init__()
#消费者反馈
self.q1=q1
#产品输出
self.q2=q2
def run(self):
n=1
print("[生产者]producing %s" % n)
self.q2.put(n)
while True:
if not self.q1.empty():
res = self.q1.get(block=False)
print("[生产者]consumer return is %s" % res)
if n > number:
break
print("[生产者]producing %s" % n)
self.q2.put(n)
n=n+1
class consumer(threading.Thread):
def __init__(self,q1,q2):
super(consumer,self).__init__()
self.q1=q1
self.q2=q2
def run(self):
while True:
if not self.q2.empty():
i=self.q2.get(block=False)
print ('[消费者]consum ing %s...' % i)
res ='ok'
self.q1.put(res)
if i>=number:
break
def test():
q1=queue.Queue(maxsize=5)
q2=queue.Queue(maxsize=5)
p = producer(q1,q2)
c = consumer(q1,q2)
p.start()
c.start()
p.join()
c.join()
test()
执行结果:
[生产者]producing 99997
[消费者]consuming 99997...
[生产者]consumer return is ok
[生产者]producing 99998
[消费者]consuming 99998...
[生产者]consumer return is ok
[生产者]producing 99999
[消费者]consuming 99999...
[生产者]consumer return is ok
[生产者]producing 100000
[消费者]consuming 100000...
[生产者]consumer return is ok
协程和多线程去执行这样一个逻辑速度有多大差异呢,我对比了一下生产/消费1000个东西的耗时对比:
引用一下廖雪峰的教程中的观点(http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090171191d05dae6e129940518d1d6cf6eeaaa969000)
协程的执行有点像多线程,但协程的特点在于是一个线程执行。
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
用协程可以很方便的打造流水线式的程序,对生产者/消费者的一个扩展就是:流水线上的每一个环节,都有input和output,它既消费上一个流程的结果,也产生一个输出给下一个环节。非常类似于linux中的管道的功能。
假设有这样一个数据流水线:
也就是完成这样一个流水线84BzX5Fn-->84BzX-->BzX-->BZX,代码如下:
#流水线例子
from random import shuffle,randint
import re
def random_str():
#随机生成5~10位字符串
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
chars_list=list(chars)
shuffle(chars_list)
res=''.join(chars_list[0:randint(5,10)])
return res
def manager(target):
#生成1万个随机字符串并传入work1
n=0
target.__next__()
while (n<10000):
n=n+1
resource = random_str()
print("Manager: %s" % resource)
target.send(resource)
def work1(target):
target.__next__()
while True:
input_str = yield
if len(input_str)>5:
#截断
res=input_str[0:5]
else:
res=input_str
#给下一个生成器传入值
target.send(res)
def work2(target):
target.__next__()
while True:
str_from_work1 = yield
if str_from_work1:
#去掉字符串中的数字
res=re.sub(r'([d]+)','',str_from_work1)
target.send(res)
def work3():
#字母变为大写
while True:
str_from_work2 = yield
if str_from_work2:
res=str_from_work2.upper()
print("output: %s" % res)
if __name__ == '__main__':
manager(work1(work2(work3())))
#执行结果
Manager: 5wKvB
output: WKVB
Manager: uRDjl5
output: URDJL
Manager: 84BzX5Fn
output: BZX
Manager: FfhjoMueD
output: FFHJO
Manager: TDqe2axN
output: TDQE
Manager: cqEOj0I
output: CQEOJ
Manager: c2xQE
output: CXQE
Manager: 2qMupVNc
output: QMUP
Manager: MriKfphB
output: MRIKF
Manager: KSoc7EibJZ
output: KSOC
Manager: QDrou
output: QDROU
在上面的例字里面,第2和第3个工人之间,增加一个新的环节:
例字如下:
#流水线例子
from random import shuffle,randint
import re
def random_str():
#随机生成5~10位字符串
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
chars_list=list(chars)
shuffle(chars_list)
res=''.join(chars_list[0:randint(5,10)])
return res
def manager(target):
#生成1万个随机字符串并传入work1
n=0
target.__next__()
while (n<10000):
n=n+1
resource = random_str()
print("Manager: %s" % resource)
target.send(resource)
def work1(target):
target.__next__()
while True:
input_str = yield
if len(input_str)>5:
#截断
res=input_str[0:5]
else:
res=input_str
#给下一个生成器传入值
target.send(res)
def work2(target):
target.__next__()
while True:
str_from_work1 = yield
if str_from_work1:
#去掉字符串中的数字
res=re.sub(r'([d]+)','',str_from_work1)
target.send(res)
def broadcast(targets):
#广播
for i in targets:
i.__next__()
while True:
res=yield
for i in targets:
i.send(res)
def work_tail(tail,target):
#把字符串后面添加一个tag
target.__next__()
while True:
res=yield
target.send(res+tail)
def work3():
#字母变为大写
while True:
str_from_work2 = yield
if str_from_work2:
res=str_from_work2.upper()
print("output: %s" % res)
if __name__ == '__main__':
manager(work1(work2(
broadcast([
work_tail('_tag1',work3()),
work_tail('_tag2',work3()),
work_tail('_tag_new',work3()),
work_tail('_tag_old',work3()),
])
)))
执行结果:
Manager: BEv8akI5q
output: BEVA_TAG1
output: BEVA_TAG2
Manager: j6FfVUysB9
output: JFFV_TAG1
output: JFFV_TAG2
Manager: C27fB
output: CFB_TAG1
output: CFB_TAG2
Manager: 1V8NYRrB2
output: VNY_TAG1
output: VNY_TAG2
Manager: qtAOye1
output: QTAOY_TAG1
output: QTAOY_TAG2
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!