go 语言实践-goroutine+chan 并不是 CSP 的最佳方式 - Go语言中文社区

go 语言实践-goroutine+chan 并不是 CSP 的最佳方式


go语言的最大两个亮点,一个是协程,一个就是chan了。二者合体的典型应用CSP,基本就是大家认可的并行开发神器,确实简化了并行程序的开发难度,但个人并不是很推荐业务中直接面对这种硬编码。那么,本文的重点就是讨论硬编码面临的问题,以及相关的处理方案。
文中异步队列代码,具体参见:github 异步队列。不排除 github 代码库因为更新,而与文中描述不一致。

一.CSP是什么

CSP模型的全称为Communicating Sequential Processes,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。与函数式编程和actor模型类似,CSP模型也是正在复兴的古董。由于近来Go语言的兴起, CSP模型又流行起来。
紧密耦合的channel
由于go的协程,为我们隐藏了多线程面对多任务调度的复杂性,开发者可以非常方便的写出无锁的异步数据调度业务。比较简单的常用写法是:

package main

import (
    "fmt"
    "time"
)
//生产者
type Produce struct{
    myChannel chan interface{}
}
func (p *Produce)send(any interface{}){
    p.myChannel<-any
}
//消费者
type Consumer struct{
    myChannel chan interface{}
}
func(c *Consumer)receive(){
    go func(){
        for data:=range c.myChannel{
            fmt.Printf("receive: %v n",data)
        }
    }()
}

func main(){
    var mychan=make(chan interface{})
    var consumer=&Consumer{mychan}
    consumer.receive()
    var pro=&Produce{mychan}
    pro.send("Msg")

    time.Sleep(time.Second*10)
}

在实际业务中,我们会看到这个myChannel必须是作为代码中的硬编码存在。可能通过以下几个方式被调用:

1.对象属性初始化

比如上述代码,myChanel是消费者和生产者的共享变量,在其初始化的时候注入。

2.函数参数注入

package main

import (
    "fmt"
    "time"
)

//生产者
type Produce struct {
}

func (p *Produce) send(myChannel chan<- interface{}, any interface{}) {
    myChannel <- any
}

//消费者
type Consumer struct {
}

func (c *Consumer) receive(myChannel <-chan interface{}) {
    go func() {
        for data := range myChannel {
            fmt.Printf("receive: %v n", data)
        }
        fmt.Println("receive stop!")
    }()
}

func main() {
    var mychan = make(chan interface{})
    var consumer = &Consumer{}
    consumer.receive(mychan)
    var pro = &Produce{}
    pro.send(mychan, "Msg")

    close(mychan)
    time.Sleep(time.Second * 10)
}

二 业务场景需要考虑的问题

基于前一节两种基础代码,我们很容易实现【M生产者:1消费者】的数据传递。但实际业务远不是这种教科书形式这么简单。CSP在其语法上定义了:STOP、SKIP、INTERRUPT、CONDITION、TIMEOUT等常用规则。如果在业务中来考虑,就非常麻烦,也容易出错。如果我们封装了goroutinue+chan的异步队列来封装,用回调的方式来驱动消费者的函数句柄,好比使用消息中间件一样,就会简化很多。

1.对于STOP

相对于channel的关闭,go的chan是非常成熟的,可以在封装后,通过一个closeflag int32标签来规避重复关闭的问题。接受函数只需要正常的for range channel即可,没特别的技术含量。

2.对于interrupt

对于立即中断,go1.7之后提供了context包,可以利用其cancelfunccancelContext的特性,当调用cancelfunc后,cancelContext.Done()将会有一个消息发出,结合select就可以从消息接受中退出
以下代码片段展示了封装后,如何响应stop和interrupt:

func (queue *queueCSP) receive(channel chan interface{}, cancelCtx context.Context, invoker ReceiveFunc) {
    //...
    for !stoppedByChannel && !stoppedByContext {
        //Notice:由于 select 的语法特点,即使 cancelCtx.Done的消息已经发出,但只要channel有值,则很可能无法立即退出循环
        select {
        case msg := <-channel:
            if msg != nil {
                invoker(msg)
            } else {
                stoppedByChannel = true
            }
        case <-cancelCtx.Done():
            stoppedByContext = true
        }
    }
    //release resource
    //...
//...

3.TIMEOUT

参考上面的代码片段,如果加入time.Ticker,就可以轻易实现超时的控制,但发现超时后,将消息汇报、停止回调函数,则不在本文的范围。有兴趣的读者可以看看context的相关资料。

4.内存占用

通常情况下,一个channel+goroutinue要占用2k-8k的栈空间,100万个csp就是2-8G。很多业务中,这些都是闲置的。

三、channel带来的额外问题及替换方案

前一节,尽管我们利用QueueCSP封装goroutinue+chan以杜绝业务层重复关闭的隐患,但也存在一些不尽如人意之处:
1. 引入锁确保推送安全,导致推送速度较慢。
2. 引入了context包来处理即使退出信号,但channel存在未处理消息时,依然有很大几率继续执行几次业务操作,这是 select-case 的随机性造成的(还记得 select-case 生成随机数吗)。
3. goroutinue的内存开销大。

那么,go的goroutine+chan 这种模式之前,异步队列通信又是如何处理呢?比较常见的是通过bufferRing来提供一定缓冲数量的队列,而MPSC则提供无数量限制的方式。MPSC的全称是Multi-Produce Single-Consumer,是非常高效的无锁并发的多线程并发解决方案,很多时候用 MPSC 来做控制指令队列通道。go语言利用原子操作,实现非常简单:

type node struct {
    next *node
    val  interface{}
}

type Queue struct {
    head, tail *node
}

func New() *Queue {
    q := &Queue{}
    stub := &node{}
    q.head = stub
    q.tail = stub
    return q
}

// 添加一条新的消息到队列的末尾
func (q *Queue) Push(x interface{}) {
    n := &node{val: x}
    prev := (*node)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(n)))
    prev.next = n
}

// 从队列中提取一条消息交付给消费者
func (q *Queue) Pop() interface{} {
    tail := q.tail
    next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) // acquire
    if next != nil {
        q.tail = next
        v := next.val
        next.val = nil
        return v
    }
    return nil
}

// 清空队列
func (q *Queue) Empty() bool {
    tail := q.tail
    next := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
    return next == nil
}

如果我们的 Queue 封装的是mpsc,那么就是在 for 循环中先判断是否关闭,然后操作Pop,如果有数据,则调用接受者。这样,系统的退出响应就会非常简单:

func (queue *queueMPSC) run() {
    var msg interface{}

    defer func() {
        defer func() {
            if err := recover(); err != nil {
                log.Printf("[queue_mpsc] recovering reason is %+v. More detail:", err)
                log.Println(string(debug.Stack()))
            }
        }()
    }()
    for  {
       //是否立即关闭
        if atomic.LoadInt32(&queue.closed) == _CLOSED {
            return
        }
        //当完成消息处理再关闭时,系统发出poisonPill
        if msg = queue.userQueue.Pop(); msg != nil && msg != poisonPill {
            queue.invoker(msg)
        } else {
            return
        }
    }
}

尽管这段代码没有考虑处理大量业务时的cpu 调度(runtime.Gosched()),但基本上是业务可用的代码。我们可以看到:
1. 不需要select协程,整体上开销更小
2. 控制更加灵活,处理业务消息的同时,还能非常方便的响应控制消息(停止、暂停等)。

四、总结

客观的说,go语言的goroutine+chan来构建 CSP 可以非常直观的理解业务之间的数据传送关系。但并不是唯一解,尤其是在中型应用中,考虑利用 atomic+基础数据结构的工具库,更是成熟的工程化选择,其原因是:
- chan更适合单生产者+单/多消费者。一旦是多个生产者,就存在重复关闭的隐患。
- 通过函数回调,异步队列回调,是更安全的基础组件的编码方式。
- 采用 CSP实现的队列,在进行系统控制时,比如 stop,及时性不如 mpsc或传统的bufferRing/List等数据结构。
- 取消协程,减少额外的内存开销。
- 消费者如果带状态,且需要进行独立于业务消息的系统信息响应时,mpsc是更为普遍的控制指令的结构。
简而言之,goroutine+channel 作为一两个人的小规模、短期代码问题不大。但作为团队的长期业务进行持续开发,还是建议原始的基础库,尽管没有高大上的名称,无法技术 show,但更能解决问题!

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_26981997/article/details/78773487
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-13 19:55:17
  • 阅读 ( 2008 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢