Go并发模式之pipeline(管道) - Go语言中文社区

Go并发模式之pipeline(管道)


pipeline

pipeline 是你可以用来在系统中形成抽象的另一种工具。特别是当程序需要流式处理 或批处理数据时,它是一个非常强大的工具。
pipeline 只不过是一系列将数据输入, 执行操作并将结果数据传回的系统。 将这些操作称为 pipeline 的一个 stage.
通过使用pipeline, 可以分离每个stage的关注点, 这提供了很多好处。如可以可以相互独立地修改各个stage,
还可以混合搭配stage(而无需修改stage);还可以 将每个stage 同时处理到上游 或下游 stage;
还可以扇出(或限制)pipeline
func main() {

	//给 数组每个值 乘上 一个 常数 返回
	multiply := func(values []int, multiplier int) []int {
		multipliedValues := make([]int , len(values))
		for i, v := range values{
			multipliedValues[i] = v * multiplier
		}
		return multipliedValues
	}

	add := func(values []int , additive int) []int {
		addedValues := make([]int, len(values))
		for i, v := range values{
			addedValues[i] = v + additive
		}
		return addedValues
	}

	ints := []int{1,2,3,4}
	for _, v := range add(multiply(ints, 2), 1){
		fmt.Println(v)
	}
}
//3
//5
//7
//9
具备哪些 属性 是stage? pipeline stage 属性是什么?
1。 消耗 并 返回相同的 类型
2。 他能 很方便的用语言来表达,可以方便地 被传递(函数可以 很方便被传递); pipeline stage 事实上与函数式编程密切相关

pipeline stage 有趣特性之一 : 在不改变stage本身的情况下,将我们的stage 结合到更高层次 变得 非常容易。例子如下:


ints := []int{1, 2, 3, 4}
for _, v := range multiply(add(multiply(ints, 2), 1), 2)
{
   fmt.Println(v)
}

分析上面这些 例子: 这些操作 有 "批处理操作的特点" 可认为就是 批处理 操作; 还有另一种类型的pipeline stage 执行 流操作,特点是
stage 一次只接收和处理一个元素。
批处理和流处理 优点 和 缺点

批处理的特点/缺点: 任何时候(数组/片)的内存占用量 都是我们发送pipeline 开始处 片 的 大小的两倍。

将stage  转换为 流处理  代码如下:
func main(){
	multiply := func(value, multiplier int) int {
		return value * multiplier
	}

	add := func(value, additive int) int{
		return value + additive
	}

	ints :=[]int{1, 2, 3, 4}
	for _, v := range ints{
		fmt.Println(multiply(add(multiply(v, 2), 1), 2))
	}
}
优点是: 内存占用 只回落到只有 pipeline输入 的大小
以上写法的缺点:
让 range 语句为我们的pipeline 进行繁重的提升,这也限制了我们的扩展能力。
每次迭代 都三次函数调用,这也很不爽
构建pipeline的最佳模式/ 最佳实践
func main(){
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for _, i := range integers{
				select {
					case <-done :
						return
					case intStream <- i:
				}
			}
		}()
		return intStream
	}

	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <- chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream{
				select{
					case <-done :
						return
					case multipliedStream <- i*multiplier:
				}
			}
		}()
		return multipliedStream
	}

	add := func(done <-chan interface{}, intStream<- chan int, additive int)<-chan int{
		addedStream := make(chan int)
		go func(){
			defer close(addedStream)
			for i := range intStream {
				select {
					case <-done:
						return
					case addedStream <- i+ additive:
				}
			}
		}()
		return addedStream
	}

	done := make(chan interface{})
	defer close(done)

	intStream := generator(done , 1, 2, 3, 4)
	pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
	for v := range pipeline{
		fmt.Println(v)
	}

}
//6
//10
//14
//18
分析: generator 函数将一组离散值转换为一个channel 上的数据流。在使用流水线时, 会经常看到这个 "生成器" 的存在

一些 便利的 生成器:
一些 便利的 生成器:
 */

var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for{
			for _, v := range values{
				select {
					case <-done:
					return
					case valueStream <- v :
				}
			}
		}

	}()
	return valueStream
}

//从流中取出 1-- num  的值
var take = func(done <- chan interface{}, valueStream <-chan interface{}, num int,) <- chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i:=0; i< num; i++{
			select{
				case <- done:
				return
				case takeStream <- <-valueStream :
			}
		}
	}()
	return takeStream
}

//重复调用函数的生成器
var repeatFn = func(done<- chan interface{}, fn func() interface{}) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for{
			select {
				case <- done:
				return
				case valueStream<- fn():
			}

		}
	}()
	return valueStream
}

func main(){
	//repeat 移到外部
	//take 移到外部

	//repeatFn 重复调用函数的生成器


	done := make(chan interface{})
	defer close(done)

	for num := range take(done, repeat(done , 1,2, 3), 10){
		fmt.Printf("%v ", num)
	}

	rand := func() interface{} { return rand.Int()}
	for num := range take(done, repeatFn(done, rand),10){
		fmt.Println(num)
	}
}

/**
当需要处理特定的类型时, 你可以放置一个执行类型断言的 stage。 这个stage 的开销可以忽略不计。
当需要处理特定的类型时, 你可以放置一个执行类型断言的 stage。 这个stage 的开销可以忽略不计。
var toString = func(done <-chan interface{}, valueStream <- chan interface{})<-chan string{
	stringStream := make(chan string)
	go func(){
		defer close(stringStream)
		for v := range valueStream{
			select {
				case <- done :
					return
				case stringStream <- v.(string)	:
			}
		}
	}()
	return stringStream
}

非类型相关的测试:

pipeline_test.go

//非类型 相关的 测试
func BenchmarkGeneric(b *testing.B){
	done := make(chan interface{})
	defer close(done)

	b.ResetTimer()

	for range toString(done, take(done, repeat(done, "a"), b.N)){

	}

}

//BenchmarkGeneric-8   	 1000000	      2494 ns/op

//类型相关的测试
func BenchmarkTyped(b *testing.B){
	repeat := func(done <-chan interface{}, values ...string)<-chan string {
		valueStream := make(chan string)
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values{
					select{
						case <- done:
							return
						case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan string, num int) <-chan string {
		takeStream := make(chan string)
		go func() {
			defer close(takeStream)
			for i := num; i>0 || i == -1;{
				if i != -1{
					i--
				}
				select{
					case <-done :
						return
					case takeStream <- <-valueStream:
				}

			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)

	b.ResetTimer()
	for range take(done, repeat(done, "a"), b.N){
	}
}
//BenchmarkTyped-8   	 1000000	      1265 ns/op
分析: 类型相关的stage 运行速度 是 非类型相关的  两倍。
一般来说,pipeline 上限制因素将是你的生成器,或者是计算密集型的一个 stage
扇入, 扇出

有时候,pipeline 中的某些个 stage 可能计算上特别耗时, 这可能会拖慢 整个pipeline的速度, 如何解决这个问题?

可以重新调整 stage的顺序,还可以重复使用 某个 stage,  开启多个goroutine 来并行执行 某些个stage.

扇出: 启动多个goroutine 来处理 来自 pipeline 的输入。
扇入: 将多个结果组合到一个channel的过程。

什么情况下: 适合 扇出,扇入 这种模式?
1 他不依赖于之前 stage 计算的值
2 运行需要很长时间

举个例子: 计算素数比较耗时,不使用扇出扇入 代码如下:

var toInt = func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int{
	intStream := make(chan int)
	go func(){
		defer close(intStream)
		for v := range valueStream{
			select {
				case <-done :
					return
				case intStream <- v.(int) :
			}
		}
	}()

	return intStream
}

var primeFinder = func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		for integer := range intStream{
			integer -= 1
			prime := true
			for divisor := integer -1 ; divisor > 1 ; divisor--{
				if integer%divisor == 0{
					prime = false
					break
				}
			}
			if prime{
				select {
					case <-done :
						return
					case primeStream <- integer:
				}
			}

		}

	}()


	return primeStream
}


func main(){
	rand := func() interface{}{ return rand.Intn(50000000)}
	done := make(chan interface{})
	defer close(done)
	start := time.Now()

	randIntStream := toInt(done, repeatFn(done, rand))
	fmt.Println("Primes:")
	for prime := range take(done, primeFinder(done, randIntStream) , 10){
		fmt.Printf("t%dn", prime)
	}

	fmt.Printf("Search took:%v", time.Since(start))
}

//Primes:
//24941317
//36122539
//6410693
//10128161
//25511527
//2107939
//14004383
//7190363
//45931967
//2393161
//Search took:29.760974984s
下面是加入 扇出, 扇入 改造提升后的代码如下:
下面是加入 扇出, 扇入 改造提升后的代码如下:


 */

var fanIn = func(done<-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	multiplexedStream := make(chan interface{})

	multiplex := func(c<-chan interface{}) {
		defer wg.Done()
		for i := range c {
			select {
				case <- done :
					return
				case multiplexedStream <-i:
			}
		}
	}

	// select  from all the channels
	wg.Add(len(channels))
	for _, c := range channels{
		go multiplex(c)
	}

	//Wait for all reads to complete
	go func() {
		wg.Wait()
		close(multiplexedStream)
	}()

	return multiplexedStream
}

func main(){
	done := make(chan interface{})
	defer close(done)
	start := time.Now()

	rand := func() interface{} { return rand.Intn(50000000)}
	randIntStream := toInt(done, repeatFn(done, rand))

	numFinders := runtime.NumCPU()
	fmt.Printf("Spinning up %d prime finders.n", numFinders)
	finders := make([]<-chan interface{}, numFinders)   // chan 组成的数组
	fmt.Println("Primes:")
	for i:=0; i< numFinders; i++{
		finders[i] = primeFinder(done, randIntStream)
	}

	for prime := range take(done, fanIn(done, finders...), 10){
		fmt.Printf("t%dn", prime)
	}

	fmt.Printf("Search took:%v", time.Since(start))

}
//
//Spinning up 8 prime finders.
//Primes:
//6410693
//24941317
//10128161
//36122539
//25511527
//2107939
//14004383
//7190363
//2393161
//45931967
//Search took:9.450233287s
注意: 上面的例子(扇出 扇入) 不保证  读取-输出项目 的顺序!!!
分析 :
扇出 + 扇入 模式 如下图:

 

 

 

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u013862108/article/details/89060516
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-25 01:14:41
  • 阅读 ( 1017 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢