Go并发模式之 防止goroutine泄漏 - Go语言中文社区

Go并发模式之 防止goroutine泄漏


goroutine 有以下几种方式被终止:
1。 当他完成了它的工作。
2。 因为不可恢复的错误, 它不能继续工作
3。 当他被告知 需要终止工作。

我们可以简单的使用前两种方法, 因为这两种方法隐含在你的算法中, 但"取消工作" 又是怎样工作的呢?

例如:这样情况: 子goroutine 是否该继续执行可能是以许多其他goroutine 状态的认知为基础的。

通常是 main goroutine 具有这种语境知识能够告诉其子goroutine 终止。

下面看一个 泄漏的例子/ (goroutine 没有进行死亡处理的例子)
func main() {
	doWork := func(strings <-chan string) <-chan interface {} {
		completed := make(chan interface{})
		go func() {
			defer fmt.Println("doWork exited.")
			defer close(completed)
			for s := range strings{
				//做一些有趣的事
				fmt.Println(s)
			}

		}()
		return completed
	}

	doWork(nil)
	//也许这里有其他操作需要执行
	fmt.Println("Done.")
}
*/
/**
分析: doWork 的goroutine 在整个生命周期中都保留在内存中;显然不合理,传入的是nil 应该是更早的结束这个goroutine 才对。
怎么解决呢,使用 done channel
分析: doWork 的goroutine 在整个生命周期中都保留在内存中;显然不合理,传入的是nil 应该是更早的结束这个goroutine 才对。
怎么解决呢,使用 done channel
func main(){
	doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} {
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("doWork exited.")
			defer close(terminated)

			for{
				select {
					case s := <-strings :
						//做一些有趣的事情
						fmt.Println(s)
					case <-done :
						return
				}
			}
		}()

		return terminated
	}

	done := make(chan interface{})
	terminated := doWork(done, nil)

	go func() {
		//一秒后取消 dowork 里边的 goroutine
		time.Sleep(1 * time.Second)
		fmt.Println("Cancelling doWork goroutine...")
		close(done)
	}()

	<-terminated
	fmt.Println("Done.")
}
//Cancelling doWork goroutine...
//doWork exited.
//Done.

*/
/**
分析: 尽管我们给我们的字符串 channel 中传递了nil, 我们的goroutine仍然成功退出。 通过 1s 后 关闭doWork 中 goroutine 成功消除了我们的
goroutine 泄漏
分析: 尽管我们给我们的字符串 channel 中传递了nil, 我们的goroutine仍然成功退出。 通过 1s 后 关闭doWork 中 goroutine 成功消除了我们的
goroutine 泄漏
这个例子 是: doWork 的 goroutine 从某个channel(作为参数传进来) 中 for 读取

下面这个例子: doWork  中的goroutine 向某个channel() 中写入(for方式)数据
func main(){
	newRandStream := func() <-chan int{
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited.")
			defer close(randStream)
			for{
				randStream <- rand.Int()
			}
		}()
		return randStream
	}

	randStream := newRandStream()
	fmt.Println("3 random ints:")
	for i:=1; i<=3; i++{
		fmt.Printf("%d: %dn", i, <-randStream)
	}

}
//
//3 random ints:
//1: 5577006791947779410
//2: 8674665223082153551
//3: 6129484611666145821
*/

/**
问题: 在三次迭代后,我们的goroutine试图将下一个随机数发送到不需要的被读取的 channel. 我们没有告诉生产者它可以停止了。

解决方案: 像接收案例一样,为生产者提供一个通知他 退出 的 done channel:
问题: 在三次迭代后,我们的goroutine试图将下一个随机数发送到不需要的被读取的 channel. 我们没有告诉生产者它可以停止了。

解决方案: 像接收案例一样,为生产者提供一个通知他 退出 的 done channel:
func main(){
	newRandStream := func(done <-chan interface{}) <-chan int {
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited. ")
			defer close(randStream)

			for{
				select {
					case randStream <-rand.Int() :
						case <-done :
							return
				}
			}
		}()

		return randStream
	}

	done := make(chan interface{})
	randStream := newRandStream(done)
	fmt.Println(" 3 random ints:")
	for i:=1; i<=3; i++ {
		fmt.Printf("%d: %dn", i, <-randStream)
	}
	close(done)

	//正在模拟进行的工作
	time.Sleep(1 * time.Second)
}
*/
/**
为了goroutine 不泄漏, 约定: 如果goroutine 负责创建 goroutine,他也负责 确保可以停止 goroutine
 */
为了goroutine 不泄漏, 约定: 如果goroutine 负责创建 goroutine,他也负责 确保可以停止 goroutine
or-channel   /  复合 done channel

希望将 多个 channel 合并为 一个 done channel ;该channel 在 任何一个组件channel 关闭时 ,都将关闭, 注意:组件中其他channel 可没有关闭!!!

这种模式通过递归 和 goroutine 创建一个复合 done chanenl .
func main(){
	var or func(channels ...<-chan interface{}) <-chan interface{}

	or = func(channels ...<-chan interface{}) <-chan interface{}{
		//设置终止标准
		switch len(channels){
		case 0 :  //防止 空 chnnel
			return nil
		case 1 :
			return channels[0]
		}

		orDone := make(chan interface{})

		//创建goroutine 以便可以不受阻塞地等待我们 channel上的消息
		go func(){
			defer close(orDone)
			switch len(channels){
			case 2 :
				select{
					case <-channels[0] :
					case <-channels[1]:
				}
			default :
				select{
					case <- channels[0] :
					case <- channels[1] :
					case <- channels[2]	:
					case <- or(append(channels[3:], orDone)...) :   //这样递归下去 形成了 树
					//为了使在建立这个树的goroutine 退出时,再树下的goroutine 也可以跟着退出,需要将这个orDone channel 也传递到了调用中。
					//画出这个树 ?实际上不像树,只有一个儿子,孙子,这样单传特点。
				}
			}

		}()

		return orDone
	}


	//下面是使用or channel一个例子
	sig := func(after time.Duration) <-chan interface{} {
		c := make(chan interface{})
		go func(){
			defer fmt.Printf("done ! after %v",after)
			defer close(c)
			time.Sleep(after)
		}()
		return c
	}
	start := time.Now()
	<-or(
		sig(2*time.Hour),
		sig(2*time.Minute),
		sig(1*time.Second),
		sig(5*time.Second),
		sig(10*time.Second),
		sig(1*time.Hour),
		sig(1*time.Minute),
		)
	fmt.Printf("done after %v", time.Since(start))
	time.Sleep(5*time.Minute);
}

or 函数调用图形分析:

分析: 在组成or channel 的 组件中有多个channel; 这些channel 需要不同时间才能关闭(1s 10s 1m 等等);但是现象是 1s后关闭的这个channel
(sig(1*time.Second)) 导致 or channel 关闭。

这种模式:在系统中的模块交汇处非常有用。

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/u013862108/article/details/88629715
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-06-13 17:37:29
  • 阅读 ( 926 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢