Go专栏 (五) ——— 并发和并发模式 - Go语言中文社区

Go专栏 (五) ——— 并发和并发模式


Go-In-Action

@author 鲁伟林
记录《Go语言实战》中各章节学习过程,写下一些自己的思考和总结。希望维护管理此仓库,记录学习过程中部分心得,以与其他同行参考。

本博客中涉及的完整代码:
GitHub地址: https://github.com/thinkingfioa/go-learning
本人博客地址: https://blog.csdn.net/thinking_fioa
文中如若有任何错误,欢迎指出。个人邮箱: thinking_fioa@163.com

第6章 并发

Go语言里的并发指的是能让某个函数独立于其他函数运行的能力。当一个函数创建为goroutine时,Go会将其视为独立的工作单元。

Go语言的并发同步模型来自一个叫作通信顺序进程(CSP)。CSP是一种消息传递模型,在goroutine之间传递数据来传递消息,而不是对数据进行加锁来实现同步。

6.1 并发和并行

操作系统会在物理处理器上调度线程来执行,而Go语言的运行会在逻辑处理器上调度goroutine来运行。在1.5版本后,Go语言的运行默认会在每个可用的物理处理器分配一个逻辑处理器。

如果创建一个goroutine并准备运行,这个goroutine会被放到调度器的全局运行队列中。之后,调度器就将队列中的goroutine分配一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中,如下图6-2

 

  1. 如果正在运行的goroutine需要执行一个阻塞的系统调用,当前逻辑处理器绑定的线程和goroutine会从逻辑处理器分离,调度器会创建一个新的线程,并将其绑定到逻辑处理器上,如上图线程M3。一旦被阻塞的系统调用执行完并返回,对应的goroutine会放回本地运行队列中
  2. 如果一个goroutine需要做一个网络I/O调用,流程上有些不同。该goroutine会和逻辑处理器分离,并移到集成了网络轮询器的运行。

6.2 goroutine

下面是一段代码,有两个goroutine匿名函数,同时逻辑处理器的数量设置为2(runtime.GOMAXPROCS(2))

func main() {
    runtime.GOMAXPROCS(2)

    var wg sync.WaitGroup
    wg.Add(2)

    fmt.Println("start program")
    go func() {
        defer wg.Done()
        for count := 0; count < 3; count++ {
            for char := 'a'; char < 'a'+26; char++ {
                fmt.Printf("%c ", char)
            }
        }
    }()

    go func() {
        defer wg.Done()

        for count := 0; count < 3; count++ {
            for char := 'A'; char < 'A'+26; char++ {
                fmt.Printf("%c ", char)
            }
        }
    }()

    wg.Wait()
    fmt.Println("Finisg")
}
  1. runtime.GOMAXPROCS(2) ----- 设置逻辑处理器数目为2
  2. 大写字母和小写字母间隔输出,且每次运行结果都不相同

6.3 竞争状态

竞争状态:两个或多个gorotine在没有相互同步的情况下,访问某个共享的资源,并试图同时读和写这个资源。int类型是非线程安全,在多gorotine访问时需要保护操作。下面代码就存在竞争状态

var (
    count int
    wg    sync.WaitGroup
)

func main() {
    wg.Add(2)
    go addCount()
    go addCount()

    wg.Wait()
    fmt.Printf("count %d", count)

}

func addCount() {
    defer wg.Done()
    for i := 0; i < 10000; i++ {
        if i == 500 {
            // 当前的gorotine从线程退出,并放回本地运行队列
            runtime.Gosched()
        }
        count++
    }
}
  1. 代码输出与预期不符
  2. runtime.Gosched() ----- 将从当前线程退出,给其他gorotine运行机会
  3. count的类型是int,非线程安全
  4. go build -race ----- 检查代码中的竞争状态(上面例子未检查出来,我也很奇怪?)

6.4 锁住共享资源

Go语言提供了传统的同步gorotine的机制,就是对共享资源加锁,类似于其他语言。atomic和sync包里的函数提供了很好的解决方案

6.4.1 原子函数

使用 atomic包中的AddInt64函数

var (
    count int64
    wg    sync.WaitGroup
)

func main() {
    wg.Add(2)

    go addCount(10000)
    go addCount(10000)

    wg.Wait()

    fmt.Printf("count %d", count)
}

func addCount(loopNum int) {
    defer wg.Done()

    for i := 0; i < loopNum; i++ {
        //count++
        atomic.AddInt64(&count, 1)
    }
}

注: 使用原子方法:atomic.AddInt64(&count, 1),来保证线程安全

使用原子函数LoadInt64和StoreInt64

原子函数LoadInt64和StoreInt64提供了一种安全地读和写一个整型值的方式

var (
    shutdown int64
    wg       sync.WaitGroup
)

func main() {
    wg.Add(2)

    go doWork("A")
    go doWork("B")

    time.Sleep(10 * time.Second)
    atomic.StoreInt64(&shutdown, 1)
    wg.Wait()
    fmt.Println("Finish")
}

func doWork(name string) {
    defer wg.Done()

    for {
        fmt.Printf("output %sn", name)
        time.Sleep(250 * time.Millisecond)

        if atomic.LoadInt64(&shutdown) == 1 {
            fmt.Printf("%s shutdown.n", name)
            break
        }
    }
}
  1. 原子函数LoadInt64和StoreInt64保证了变量shutdown同步和可见性
  2. time.Sleep(1 * time.Second) ----- gorotine暂停1秒

6.4.2 互斥锁(mutex)

另一种同步访问共享资源的方式是使用互斥锁(mutex)。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行这个临界区代码。使用mutex.Lock()和mutex.Unlock()声明一段代码是临界区

var (
    count int64
    wg    sync.WaitGroup
    mutex sync.Mutex
)

func main() {

    wg.Add(2)

    go addCount(10000)
    go addCount(10000)

    wg.Wait()
    fmt.Printf("count %dn", count)
}

func addCount(loopNum int) {
    defer wg.Done()

    for i := 0; i < loopNum; i++ {
        mutex.Lock()
        count++
        mutex.Unlock()
    }
}

6.5 通道

Go语言不仅提供了原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源

Go语言使用内置函数make来创建一个通道。通道有两种类型,一个是无缓冲的通道,一个是有缓冲的通道

  1. unbuffered := make(chan int) ----- 无缓冲通道,第一个参数是关键字chan,之后跟着允许通道交换的数据类型
  2. buffered := make(chan string, 10) ----- 有缓冲通道,第一个参数是关键字chan,之后跟着允许通道交换的数据类型,第二参数是缓冲大小
  3. buffered <- "thinking" ----- 通过通道发送一个字符串
  4. value := <- buffered ----- 从通道接收一个字符串

6.5.1 无缓冲的通道

无缓冲的通道是指在接收前没有能力保存任何值的通道。通道要求发送gorotine和接收gorotine同时准备好,才能完成发送和接收操作,否则将一直阻塞等待。

无缓冲的通道操作是一个阻塞操作,数据没有被成功接收,发送gorotine将一直阻塞。

var wg sync.WaitGroup

func init() {
    rand.Seed(time.Now().UnixNano())
}

func main() {
    court := make(chan int)

    wg.Add(2)

    go player("thinking", court)
    go player("ppp", court)

    court <- 1

    wg.Wait()
}

func player(name string, court chan int) {

    defer wg.Done()
    for {

        ball, ok := <-court
        if !ok {
            fmt.Printf("Player %s Wonn", name)
            return
        }

        n := rand.Intn(100)
        if n%13 == 0 {
            fmt.Printf("Player %s Missedn", name)
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %dn", name, ball)
        ball++

        court <- ball
    }
}

6.5.2 有缓冲的通道

有缓冲的通道是一种在被接收前能存储一个或多个值的通道。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。有点类似于Java语言中的LinkedBlockingQueue

const (
    numberWorker = 4
    taskNum      = 10
)

var wg sync.WaitGroup

func init() {
    rand.Seed(time.Now().Unix())
}

func main() {
    wg.Add(numberWorker)

    tasks := make(chan string, taskNum)

    // 启动${numberWorker}个goroutine
    for gr := 1; gr <= numberWorker; gr++ {
        go worker(tasks, gr)
    }
    // 提交任务
    for post := 1; post <= taskNum; post++ {
        tasks <- fmt.Sprintf("Task : %d", post)
    }
    // 关闭通道
    close(tasks)
    wg.Wait()
}

func worker(tasks chan string, work int) {
    defer wg.Done()

    for {
        // 获取任务
        task, ok := <-tasks
        if !ok {
            fmt.Printf("Worker: %d : Shutting Downn", work)
            return
        }

        fmt.Printf("Worker: %d : Started %sn", work, task)

        sleep := rand.Int63n(100)
        time.Sleep(time.Duration(sleep) * time.Millisecond)

        fmt.Printf("Worker: %d : Completed %sn", work, task)
    }
}
  1. close(tasks) ----- 关闭通道。当通道关闭时,goroutine依旧可以从通道接收数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
  2. const (......) ----- 常量的定义

第7章 并发模式

主要介绍3个可以在实际工程中使用的包: runner、pool和work

7.1 runner

runner包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以使用runner包来终止程序

runner支持以下终止点:

  1. 程序可以在分配的时间内完成工作,正常终止
  2. 程序没有及时完成工作,终止程序执行
  3. 接收到操作系统发送的中断事件,程序立即清理状态并停止工作

run.go 代码如下

type Runner struct {
    // 接收操作系统发送的中断信号
    interrupt chan os.Signal

    complete chan error

    // 接收超时事件
    timeout <-chan time.Time

    // 执行的函数,必须是一个接收整数且什么都不返回的函数
    tasks []func(int)
}

var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")

func New(d time.Duration) *Runner {
    return &Runner{
        // interrupt通道的缓冲区容量初始化为1,确保语言运行是发送这个事件的时候不会阻塞
        interrupt: make(chan os.Signal, 1),
        // 当任务完成或退出后,返回一个error或者nil值,将等待main函数区接收这个值
        complete: make(chan error),
        // 在指定duration时间后,向通道发送一个time.Time的值
        timeout: time.After(d),
    }
}

func (r *Runner) add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}

func (r *Runner) Start() error {
    //
    signal.Notify(r.interrupt, os.Interrupt)

    go func() {
        r.complete <- r.run()
    }()

    select {
    case err := <-r.complete:
        return err
    case <-r.timeout:
        return ErrTimeout
    }
}

func (r *Runner) run() error {
    for id, task := range r.tasks {
        if r.gotInterrupt() {
            return ErrInterrupt
        }

        task(id)
    }
    return nil
}

func (r *Runner) gotInterrupt() bool {
    select {
    case <-r.interrupt:
        // 当发生中断事件信号时,停止接收后续信号
        signal.Stop(r.interrupt)
        return true
    default:
        return false
    }
}

main.go

const timeout = 3 * time.Second

func main() {
    log.Printf("Starting work.")

    r := runner.New(timeout)

    r.Add(createTask(), createTask(), createTask())

    if err := r.Start(); err != nil {
        switch err {
        case runner.ErrTimeout:
            log.Println("Terminating due to timeout.")
            os.Exit(1)
        case runner.ErrInterrupt:
            log.Println("Terminating due to interrupt.")
            os.Exit(2)
        }
    }
}

func createTask() func(int) {
    return func(id int) {
        log.Printf("Processor - Task #%d.", id)
        time.Sleep(time.Duration(id) * time.Second)
    }
}

7.1.1 代码解释

  1. New(...)函数 ----- Go语言中工厂函数通常称为New函数,函数接收一个time.Duration类型的值,并返回Runner类型的指针。
  2. compete被初始化为无缓冲的通道。设计成无缓冲的通道是因为向这个通道发送一个error类型的值或者nil值,之后就会等待main函数接收这个值。
  3. select语句的经典用法。goInterrupt(...)函数展示了select语句的经典用法。select语句在没有任何要接收的数据时会阻塞,但是如果有default分支就不会阻塞了。如:Start函数中的select语句就会阻塞,直到接收到值

7.2 pool

pool包展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。

当资源池资源不够时,创建新的资源分配,如下factory字段是一个函数类型,可以用该函数创建新的资源

pool.go代码如下:

type Pool struct {
    m sync.Mutex

    //有缓冲的通道资源池
    resources chan io.Closer
    factory   func() (io.Closer, error)
    closed    bool
}

var ErrPoolClosed = errors.New("Pool has been closed.")

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size value is wrong")
    }

    return &Pool{
        resources: make(chan io.Closer, size),
        factory:   fn,
    }, nil
}

func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources:
        log.Printf("Acquire:", "New Resource")
        if !ok {
            return nil, ErrPoolClosed
        }
        return r, nil
    default:
        return p.factory()
    }
}

func (p *Pool) Release(r io.Closer) {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        r.Close()
        return
    }

    select {
    case p.resources <- r:
        log.Printf("Release:", "In Queue")
    default:
        log.Println("Release:", "Closing")
        r.Close()
    }
}

func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }

    p.closed = true

    // 在清空通道里的资源之前,将通道关闭,如果不这样做,会发生死锁
    close(p.resources)

    for r := range p.resources {
        r.Close()
    }

}

main.go代码

const (
    maxGoroutines   = 25
    pooledResources = 2
)

type dbConnection struct {
    ID int32
}

func (dbConn *dbConnection) Close() error {
    log.Println("Close: Connection", dbConn.ID)
    return nil
}

var idCounter int32

func createConnection() (io.Closer, error) {
    id := atomic.AddInt32(&idCounter, 1)
    log.Println("Create: New Connection", id)

    return &dbConnection{id}, nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(maxGoroutines)

    p, err := pool.New(createConnection, pooledResources)
    if err != nil {
        log.Println(err)
    }

    for query := 0; query < maxGoroutines; query++ {
        go func(q int) {
            performQueries(q, p)
            wg.Done()
        }(query)
    }

    wg.Wait()

    log.Println("Shutdown Program.")
    p.Close()
}

func performQueries(query int, p *pool.Pool) {
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }

    defer p.Release(conn)

    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("Query: QID[%d] CID[%d]n", query, conn.(*dbConnection).ID)
}

7.2.1 代码解释

  1. Release(...)方法和Close(...)方法必须要代码同步。防止资源已经关闭,仍发送数据。
  2. Acquire(...)方法 ----- 还有可用资源时会从资源池里返回一个资源,否则调用factory字段的函数类型创建一个新的资源。

7.3 work

work包的目的是展示如何使用无缓冲的通道来创建一个goroutine池,多个goroutine执行并控制一组工作,让其并发执行。

使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组goroutine配合执行。

work.go代码如下:

type Worker interface {
    Task()
}

type Pool struct {
    work chan Worker
    wg   sync.WaitGroup
}

func New(maxGoroutine int) *Pool {
    p := Pool{
        work: make(chan Worker),
    }

    p.wg.Add(maxGoroutine)

    for i := 0; i < maxGoroutine; i++ {
        go func() {
            for w := range p.work {
                w.Task()
            }
            // goroutine结束
            p.wg.Done()
        }()
    }

    return &p
}

func (p *Pool) Run(w Worker) {
    p.work <- w
}

func (p *Pool) Shutdown() {
    close(p.work)
    // 等待所有的goroutine终止
    p.wg.Wait()
}

7.3.1 代码解释

  1. Work.go中New(...)函数 ----- 使用固定数量的goroutine来创建一个工作池
  2. wg sync.WaitGroup参数用来控制Shutdown()函数,保证所有的goroutine截止,才函数退出
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/thinking_fioa/article/details/89289980
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2021-05-17 22:28:44
  • 阅读 ( 1323 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢