Golang 中 Channel 对阻塞 goroutine 的唤醒顺序分析 - Go语言中文社区

Golang 中 Channel 对阻塞 goroutine 的唤醒顺序分析


一、前言

       我们知道,goroutine 是有大小的,当 发送满 /读取空 时,会阻塞对应的 发送/读取 goroutine 协程。

       那么,当 Channel 可用时,它是按照什么顺序唤醒等待的 goroutine 协程的呢?

       带着这个问题,我们深入 chan 的源码逻辑,去一探究竟。

     剧透结论:先阻塞的先被唤醒

二、chan 源码分析

       分析的 go 版本:1.11

       源码位置:runtime/chan.go

2.1  chan 结构

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

       成员意义如下:

  • qcount: 当前队列中的元素数量
  • dataqsiz: 队列可以容纳的元素数量, 如果为 0 表示这个 channel 无缓冲区
  • buf: 队列的缓冲区指针, 指向一个环形数组
  • elemsize: 元素的大小
  • closed: 是否已关闭
  • elemtype: 元素的类型, 判断是否调用写屏障时使用
  • sendx: 发送元素的序号
  • recvx: 接收元素的序号
  • recvq: 当前等待从 channel 接收数据的 G 的链表
  • sendq: 当前等待发送数据到 channel 的 G 的链表
  • lock: 操作 channel 时使用的线程锁

 2.2  c <- x 执行过程分析

        首先找到,该语句的执行入口:

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

        从该函数的注释,我们也可以知道,这就是向 channel 发送数据时真正执行的函数了。

        跟踪到实际调用的函数  chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "n")
	}

	if raceenabled {
		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

        chansend 的定义很长,我们重点关注对接收者的唤醒,即这句:

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

          可以看到,唤醒的接收者是从 recvq 队列里出队得到的,也就是说是先进先出,先阻塞先被唤醒的

          相应的 chanrecv 函数对于发送者 goroutine  的唤醒机制也是类似的,先阻塞先被唤醒

2.3  实验验证

package main

import (
	"fmt"
	"sync"
	"time"
	"runtime"
)

func main() {

	runtime.GOMAXPROCS(1)

	var ch = make(chan int)

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 1")
		value := <-ch
		fmt.Println("Goroutine 1 read: ", value)
	}(ch)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 2")
		value := <-ch
		fmt.Println("Goroutine 2 read: ", value)
	}(ch)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 3")
		value := <-ch
		fmt.Println("Goroutine 3 read: ", value)
	}(ch)

	time.Sleep(time.Second * 1)

	ch <- 3
	ch <- 6
	ch <- 9

	close(ch)
	wg.Wait()

}

        上述代码,创建了三个 goroutine 等待读取 ch,虽然谁先调用读取是不确定的,

        但是,谁先调用了读取并被阻塞,谁就能在通道数据可用时先被唤醒

        运行结果如下:

        

          可以看到,阻塞协程的编号依次是 1 2 3,而 1 2 3 读出的内容确实也是有序的:

          最先阻塞的协程最先读取输入内容,即 goroutine 1 读取到输入 3, 2 读取到 6, 3 读取到 9

          而我们向 channel 写入的顺序正是 3 6 9

          这侧面印证了每次唤醒都是有序的!

          注意,该实验中设置了 runtime.GOMAXPROCS(1),因为多 P 环境下,不利于分析唤醒顺序。

                     这是因为可能几乎同时会唤醒多个 goroutine,会发生读取争抢,顺序就不能保证了。

                     如果不使用 runtime.GOMAXPROCS(1) 的方式,那在发送间增加 sleep 也可以达到同样的验证目的

         验证代码的另一种写法(不使用 runtime.GOMAXPROCS(1) ):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {

	var ch = make(chan int)

	wg := sync.WaitGroup{}
	wg.Add(3)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 1")
		value := <-ch
		fmt.Println("Goroutine 1 read: ", value)
	}(ch)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 2")
		value := <-ch
		fmt.Println("Goroutine 2 read: ", value)
	}(ch)

	go func(ch <-chan int) {
		defer wg.Done()
		fmt.Println("Blocking 3")
		value := <-ch
		fmt.Println("Goroutine 3 read: ", value)
	}(ch)

	time.Sleep(time.Second * 1)

	ch <- 3
	time.Sleep(time.Second * 1)
	ch <- 6
	time.Sleep(time.Second * 1)
	ch <- 9

	close(ch)
	wg.Wait()

}

2.4  特别说明

       当 channel 关闭时,会唤醒所有等待的协程。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/shida_csdn/article/details/88344017
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-09-14 00:09:07
  • 阅读 ( 2524 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢