【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P) - Go语言中文社区

【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)


本人的源码是基于go 1.9.7 版本的哦!

紧接着之前写的 【我的区块链之路】- golang源码分析之select的底层实现 和 【我的区块链之路】- golang源码分析之channel的底层实现 我们这一次需要对go的调度器做一番剖析。

go的调度器只要实现在 runtime 包中,路径为: ./src/runtime/proc.go 文件中。

我们都知道go的强大是因为可以起很多 goroutine 也即是我们所说的协程。那么协程和线程有什么联系呢?协程又是如何调度的呢?

在逼逼这些东西之前,我们先了解下,go语言其实是在操作系统提供的内核线程之上搭建了一个特有得 【两级线程】模型。下面再说两级线程模型前,有三个必知的核心元素。(G、M、P)

G:Goroutine的缩写,一个G代表了对一段需要被执行的Go语言代码的封装

M:Machine的缩写,一个M代表了一个内核线程

P:Processor的缩写,一个P代表了M所需的上下文环境

简单的来说,一个G的执行需要M和P的支持。一个M在与一个P关联之后形成了一个有效的G运行环境【内核线程 + 上下文环境】。每个P都会包含一个可运行的G的队列 (runq )。

好了下面我们来具体的看看 G、M、P

M (machine):

M是machine的头文字, 在当前版本的golang中等同于系统线程.
M可以运行两种代码:

  • go代码, 即goroutine, M运行go代码需要一个P
  • 原生代码, 例如阻塞的syscall, M运行原生代码不需要P

M会从运行队列中取出G, 然后运行G, 如果G运行完毕或者进入休眠状态, 则从运行队列中取出下一个G运行, 周而复始。
有时候G需要调用一些无法避免阻塞的原生代码, 这时M会释放持有的P并进入阻塞状态, 其他M会取得这个P并继续运行队列中的G.
go需要保证有足够的M可以运行G, 不让CPU闲着, 也需要保证M的数量不能过多通常创建一个M的原因是由于没有足够的M来关联P并运行其中可运行的G。而且运行时系统执行系统监控的时候,或者GC的时候也会创建M

M的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

// M 结构体
type m struct {
    /*
        1.  所有调用栈的Goroutine,这是一个比较特殊的Goroutine。
        2.  普通的Goroutine栈是在Heap分配的可增长的stack,而g0的stack是M对应的线程栈。
        3.  所有调度相关代码,会先切换到该Goroutine的栈再执行。
    */
	g0      *g     // goroutine with scheduling stack
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink

	// Fields not known to debuggers.
	procid        uint64       // for debuggers, but offset not hard-coded
	gsignal       *g           // signal-handling g
	goSigStack    gsignalStack // Go-allocated signal handling stack
	sigmask       sigset       // storage for saved signal mask
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	mstartfn      func()       // 

	curg          *g       //   M 正在运行的结构体G
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	id            int32
	mallocing     int32
	throwing      int32
	preemptoff    string // if != "", keep curg running on this m
	locks         int32
	softfloat     int32
	dying         int32
	profilehz     int32
	helpgc        int32
	spinning      bool // m is out of work and is actively looking for work
	blocked       bool // m is blocked on a note
	inwb          bool // m is executing a write barrier
	newSigstack   bool // minit on C thread called sigaltstack
	printlock     int8
	incgo         bool // m is executing a cgo call
	fastrand      uint32
	ncgocall      uint64      // number of cgo calls in total
	ncgo          int32       // number of cgo calls currently in progress
	cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
	park          note
	alllink       *m // on allm
	schedlink     muintptr
	mcache        *mcache
	lockedg       *g          // 表示与当前M锁定那个g
	createstack   [32]uintptr // stack that created this thread.
	freglo        [16]uint32  // d[i] lsb and f[i]
	freghi        [16]uint32  // d[i] msb and f[i+16]
	fflag         uint32      // floating point compare flags
	locked        uint32      // tracking for lockosthread
	nextwaitm     uintptr     // next m waiting for lock
	needextram    bool
	traceback     uint8
	waitunlockf   unsafe.Pointer // todo go func(*g, unsafe.pointer) bool
	waitlock      unsafe.Pointer
	waittraceev   byte
	waittraceskip int
	startingtrace bool
	syscalltick   uint32
	thread        uintptr // thread handle

	// these are here because they are too large to be on the stack
	// of low-level NOSPLIT functions.
	libcall   libcall
	libcallpc uintptr // for cpu profiler
	libcallsp uintptr
	libcallg  guintptr
	syscall   libcall // stores syscall parameters on windows

	mOS
}

M的字段众多,其中最重要的为下面四个:

g0: Go运行时系统在启动之初创建的,用于执行一些运行时任务。

mstartfn:表示M的起始函数。其实就是我们 go 语句携带的那个函数啦。

curg:存放当前正在运行的G的指针。

p:指向当前与M关联的那个P。

nextp:用于暂存于当前M有潜在关联的P。 (预联)当M重新启动时,即用预联的这个P做关联啦

spinning:表示当前M是否正在寻找G。在寻找过程中M处于自旋状态

lockedg:表示与当前M锁定的那个G。运行时系统会把 一个M 和一个G锁定,一旦锁定就只能双方相互作用,不接受第三者

M并没有像G和P一样的状态标记, 但可以认为一个M有以下的状态:

  • 自旋中(spinning): M正在从运行队列获取G, 这时候M会拥有一个P
  • 执行go代码中: M正在执行go代码, 这时候M会拥有一个P
  • 执行原生代码中: M正在执行原生代码或者阻塞的syscall, 这时M并不拥有P
  • 休眠中: M发现无待运行的G时会进入休眠, 并添加到空闲M链表中, 这时M并不拥有P

自旋中(spinning)这个状态非常重要, 是否需要唤醒或者创建新的M取决于当前自旋中的M的数量。

M在被创建之初会被加入到全局的M列表 【runtime.allm】 。接着,M的起始函数(mstartfn)和准备关联的P(p)都会被设置。最后,运行时系统会为M专门创建一个新的内核线程并与之关联。这时候这个新的M就为执行G做好了准备。其中起始函数(mstartfn)仅当运行时系统要用此M执行系统监控或者垃圾回收等任务的时候才会被设置。全局M列表的作用是运行时系统在需要的时候会通过它获取到所有的M的信息,同时防止M被gc

在新的M被创建后回西安做一番初始化工作。其中包括了对自身所持的栈空间以及信号做处理的初始化。在上述初始化完成后 mstartfn 函数就会被执行 (如果存在的话)。【注意】:如果mstartfn 代表的是系统监控任务的话,那么该M会一直在执行mstartfn 而不会有后续的流程。否则 mstartfn 执行完后,当前M将会与那个准备与之关联的P完成关联。至此,一个并发执行环境才真正完成。之后就是M开始寻找可运行的G并运行之。

运行时系统管辖的M会在GC任务执行的时候被停止,这时候系统会对M的属性做某些必要的重置并把M放置入调度器的空闲M列表。【很重要】因为在需要一个未被使用的M时,运行时系统会先去这个空闲列表获取M。(只有都没有的时候才会创建M)

M本身是无状态的。M是否有空闲仅以它是否存在于调度器的空闲M列表 runtime.sched.midle  中为依据 (空闲列表不是那个全局列表哦)。

单个Go程序所使用的M的最大数量是可以被设置的。在我们使用命令运行Go程序时候,有一个引导程序先会被启动的。在这个歌引导程序中会为Go程序的运行简历必要的环境。引导程序对M的数量进行初始化设置,默认是 最大值 1W 【即是说,一个Go程序最多可以使用1W个M,即:理想状态下,可以同时有1W个内核线程被同时运行】。使用 runtime/debug.SetMaxThreads() 函数设置。

P (process):

P是process的头文字, 代表M运行G所需要的资源
一些讲解协程的文章把P理解为cpu核心, 其实这是错误的.
虽然P的数量默认等于cpu核心数, 但可以通过环境变量GOMAXPROC修改, 在实际运行时P跟cpu核心并无任何关联。

P也可以理解为控制go代码的并行度的机制,
如果P的数量等于1, 代表当前最多只能有一个线程(M)执行go代码,
如果P的数量等于2, 代表当前最多只能有两个线程(M)执行go代码.
执行原生代码的线程数量不受P控制

因为同一时间只有一个线程(M)可以拥有PP中的数据都是锁自由(lock free)的, 读写这些数据的效率会非常的高

P是使G能够在M中运行的关键。Go运行时系统适当地让P与不同的M建立或者断开联系,以使得P中的那些可运行的G能够在需要的时候及时获得运行时机。

P的结构体定义:(在 ./src/runtime/runtime2.go 文件中)

type p struct {
	lock mutex

	id          int32
	status      uint32 // one of pidle/prunning/...
	link        puintptr
	schedtick   uint32     // incremented on every scheduler call
	syscalltick uint32     // incremented on every system call
	sysmontick  sysmontick // last tick observed by sysmon
	m           muintptr   // back-link to associated m (nil if idle)
	mcache      *mcache
	racectx     uintptr

	deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
	deferpoolbuf [5][32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	runnext guintptr

	// Available G's (status == Gdead)
	gfree    *g
	gfreecnt int32

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	tracebuf traceBufPtr

	// traceSweep indicates the sweep events should be traced.
	// This is used to defer the sweep start event until a span
	// has actually been swept.
	traceSweep bool
	// traceSwept and traceReclaimed track the number of bytes
	// swept and reclaimed by sweeping in the current sweep loop.
	traceSwept, traceReclaimed uintptr

	palloc persistentAlloc // per-P to avoid mutex

	// Per-P GC state
	gcAssistTime     int64 // Nanoseconds in assistAlloc
	gcBgMarkWorker   guintptr
	gcMarkWorkerMode gcMarkWorkerMode

	// gcw is this P's GC work buffer cache. The work buffer is
	// filled by write barriers, drained by mutator assists, and
	// disposed on certain GC state transitions.
	gcw gcWork

	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

	pad [sys.CacheLineSize]byte
}

通过runtime.GOMAXPROCS函数我们可以改变单个Go程序可以间拥有的P的最大数量。

P的最大数量相当于是对可以被并发执行的用户级的G的数量作出限制。

每一个P都必须关联一个M才能使其中的G得以运行

【注意】:运行时系统会将M与关联的P分离开来。但是如果该P的可运行队列中还有未运行的G,那么运行时系统就会找到一个空的M (在调度器的空闲队列中的M) 或者创建一个空的M,并与该P关联起来(为了运行G而做准备)。

runtime.GOMAXPROCS函数设置的只会影响P的数量,但是对M (内核线程)的数量不会影响,所以runtime.GOMAXPROCS 并不是控制线程数,只能说是影响上下文环境P的数目

在Go程序开始运行时,会先由引导程序对M做了数量上的限制,及对P做了限制,P的数量默认为1。所以我们无论在程序中使用go关键字启用多少goroutine,它们都会被塞到一个P的可运行G队列中

在确认P的最大数量后,运行时系统会根据这个数值初始化全局的P列表 【runtime.allp】,类似全局M列表,其中包含了所有 运行时系统创建的所有P。随后,运行时系统会把调度器的可运行G队列【runtime.sched.runq】中的所有G均匀的放入全局的P列表中的各个P的可执行G队列当中。到这里为止,运行时系统需要用到的所有P都准备就绪了。

类似M的空闲列表,调度器也存在一个P的空闲列表【runtime.sched.pidle】,当一个P不再与任何M关联的时候,运行时系统就会把该P放入这个列表中,而一个空闲的P关联了某个M之后会被从这个列表中取出【注意:就算一个P加入了空闲队列,但是它的可运行G队列不一定为空

和M不同P是有状态的:(五种)

Pidle:当前P未和任何M关联

Prunning:当前P正在和某个M关联

Psyscall:当前P中的被运行的那个G正在进行系统调用

Pgcstop:运行时系统正在进行gc。(运行时系统在gc时会试图把全局P列表中的P都处于此状态)

Pdead:当前P已经不再被使用。(在调用runtime.GOMAXPROCS减少P的数量时,多余的P就处于此状态)

P的初始状态就是为Pgcstop,处于这个状态很短暂,在初始化和填充P中的G队列之后,运行时系统会将其状态置为Pidle并放入调度器的空闲P列表 (runtime.sched.pidle)中。其中的P会由调度器根据实际情况进行取用。下图是P在各个状态建的流转情况:

从上图,我们可以看出,除了Pdead之外的其他状态的P都会在运行时系统欲进行GC是被指为Pgcstop在gc结束后状态不会回复到之前的状态的,而是都统一直接转到了Pidle 【这意味着,他们都需要被重新调度】。【注意】:除了Pgcstop 状态的P,其他状态的P都会在 调用runtime.GOMAXPROCS 函数去减少P数目时,被认为是多余的P而状态转为Pdead,这时候其带的可运行G的队列中的G都会被转移到 调度器的可运行G队列中,它的自由G队列 【gfree】也是一样被移到调度器的自由列表 【runtime.sched.gfree】中

【注意】:每个P中都有一个可运行G队列自由G队列。自由G队列包含了很多已经完成的G,随着被运行完成的G的积攒到一定程度后,运行时系统会把其中的部分G转移的调度器的自由G队列 【runtime.sched.gfree】中。

【注意】:当我们每次用 go关键字 启用一个G的时候,运行时系统都会先从P的自由G队列获取一个G来封装我们提供的函数 (go 关键字后面的函数) ,如果发现P中的自由G过少时,会从调度器的自由G队列中移一些G过来,只有连调度器的自由G列表都弹尽粮绝的时候,才会去创建新的G。

G (goroutine):

G是goroutine的头文字, goroutine可以解释为受管理的轻量线程, goroutine使用go关键词创建。

举例来说,  func main() { go other() },  这段代码创建了两个goroutine。
一个是main, 另一个是other, 【注意】:main本身也是一个goroutine

goroutine的新建, 休眠, 恢复, 停止都受到go运行时的管理。
goroutine执行异步操作时会进入休眠状态, 待操作完成后再恢复, 无需占用系统线程
goroutine新建或恢复时会添加到运行队列, 等待M取出并运行

G的结构体定义:(在 ./src/runtime/runtime2.go 文件中)


type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // offset known to runtime/cgo   描述了真实的栈内存,包括上下界
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic         *_panic // innermost panic - offset known to liblink
	_defer         *_defer // innermost defer
	m              *m      // current m; offset known to arm liblink   当前运行G的M
	sched          gobuf    //  goroutine切换时,用于保存g的上下文
	syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp       uintptr        // expected sp at top of stack, to check in traceback
	param          unsafe.Pointer // passed parameter on wakeup   用于传递参数,睡眠时其他goroutine可以设置param,唤醒时该goroutine可以获取
	atomicstatus   uint32
	stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid           int64    // goroutine的ID
	waitsince      int64  // approx time when the g become blocked   g被阻塞的大体时间
	waitreason     string // if status==Gwaiting
	schedlink      guintptr
	preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
	paniconfault   bool     // panic (instead of crash) on unexpected fault address
	preemptscan    bool     // preempted g does scan for gc
	gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit     bool     // must not split stack
	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        *m       // G被锁定只在这个m上运行
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr // pc of go statement that created this goroutine
	startpc        uintptr // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}


// 用于保存G切换时上下文的缓存结构体
type gobuf struct {
	// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
	//
	// ctxt is unusual with respect to GC: it may be a
	// heap-allocated funcval so write require a write barrier,
	// but gobuf needs to be cleared from assembly. We take
	// advantage of the fact that the only path that uses a
	// non-nil ctxt is morestack. As a result, gogo is the only
	// place where it may not already be nil, so gogo uses an
	// explicit write barrier. Everywhere else that resets the
	// gobuf asserts that ctxt is already nil.
	sp   uintptr     // 当前的栈指针
	pc   uintptr     // 计数器
	g    guintptr    // g自身
	ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
	ret  sys.Uintreg
	lr   uintptr
	bp   uintptr // for GOEXPERIMENT=framepointer
}

下面我们来讲讲G。Go语言的编译器会把我们编写的go语句编程一个运行时系统的函数调用,并把go语句中那个函数及其参数都作为参数传递给这个运行时系统函数中。

运行时系统在接到这样一个调用后,会先检查一下go函数及其参数的合法性,紧接着会试图从本地P的自由G队列中(或者调度器的自由G队列)中获取一个可用的自由G (P中有讲述了),如果没有则新创建一个G。类似M和P,G在运行时系统中也有全局的G列表【runtime.allg】,那些新建的G会先放到这个全局的G列表中,其列表的作用也是集中放置了当前运行时系统中给所有的G的指针。在用自由G封装go的函数时,运行时系统都会对这个G做一次初始化。

初始化:包含了被关联的go关键字后的函数及当前G的状态机G的ID等等。在G被初始化完成后就会被放置到当前本地的P的可运行队列中。只要时机成熟,调度器会立即尽心这个G的调度运行。

G的各种状态:

Gidle:G被创建但还未完全被初始化。

Grunnable:当前G为可运行的,正在等待被运行。

Grunning:当前G正在被运行。

Gsyscall:当前G正在被系统调用

Gwaiting:当前G正在因某个原因而等待

Gdead:当前G完成了运行

正在被初始化进行中的G是处于Grunnable状态的。一个G真正被使用是在状态为Grunnable之后。G的生命周期及状态变化如图:

图上有一步是事件到来,那么G在运行过程中,是否等待某个事件以及等待什么样的事件完全由起封装的go关键字后的函数决定。(如:等待chan中的值、涉及网络I/O、time.Timer、time.Sleep等等事件)

G退出系统调用,及其复杂:运行时系统先会尝试直接运行当前G,仅当无法被运行时才会转成Grunnable并放置入调度器的自由G列表中。

最后,已经是Gdead状态的G是可以被重新初始化并使用的。而对比进入Pdead状态的P等待的命运只有被销毁。处于Gdead的G会被放置到本地P或者调度器的自由G列表中。

至此,G、M、P的初步描述已经完毕,下面我们来看一看一些核心的队列:

G、M、P的容器
中文名 源码的名称 作用域 简要说明
全局M列表 runtime.allm 运行时系统 存放所有M
全局P列表 runtime.allp 运行时系统 存放所有P
全局G列表 runtime.allg 运行时系统 存放所有G
调度器中的空闲M列表 runtime.sched.midle 调度器 存放空闲M
调度器中的空闲P列表 runtime.sched.pidle 调度器 存放空闲P
调度器中的可运行G队列 runtime.sched.runq 调度器 存放可运行G
调度器中那个的自由G列表 runtime.sched.gfree 调度器 存放自由G
P的可运行G队列 runq 本地P 存放当前P中的可运行G
P中的自由G列表 gfree 本地P 存放当前P中的自由G

 三个全局的列表主要为了统计运行时系统的的所有G、M、P。我们主要关心剩下的这些容器,尤其是和G相关的四个

在运行时系统创建的G都会被保存在全局的G列表中,值得注意的是:从Gsyscall转出来的G都会被放置到调度器的可运行G队列中。而被运行时系统初始化的G会被放置到本地P的可运行列表中。从Gwaiting转出来的G,除了因网络I/O陷入等待的G之外,都会被放置到本地P的可运行G队列中。转成Gdead状态的G会先被放置到本地P的自由G列表 (上面的描述可以知道这一点)。调度器中的与G、M、P相关的列表其实只是起了一个暂存的作用。

一句话概括三者关系:

  • G需要绑定在M上才能运行;
  • M需要绑定P才能运行;

下面我们看一看三者及内核调度实体【KSE】的关系:

 

综上所述,一个G的执行需要M和P的支持。一个M在于一个P关联之后就形成一个有效的G运行环境 【内核线程 +  上下文环境】。每个P都含有一个 可运行G的队列【runq】。队列中的G会被一次传递给本地P关联的M并且获得运行时机

由上图可以看出 M 与 KSE 总是 一对一 的。一个M能且仅能代表一个内核线程

一个M的生命周期内,它会且仅会与一个KSE产生关联M与P以及P与G之间的关联是多变的总是会随着实际调度的过程而改变。其中, M 与 P 总是一对一,P 与 G 总是 一对多, 而 一个 G 最终由 一个 M 来负责运行

上述我们讲的运行时系统其实就是我们下面要说的调度器

我们再来回顾下G、M、P 中的主要成员:

G里面比较重要的成员:

  • stack: 当前g使用的栈空间, 有lo和hi两个成员
  • stackguard0: 检查栈空间是否足够的值, 低于这个值会扩张栈, 0是go代码使用的
  • stackguard1: 检查栈空间是否足够的值, 低于这个值会扩张栈, 1是原生代码使用的
  • m: 当前g对应的m
  • sched: g的调度数据, 当g中断时会保存当前的pc和rsp等值到这里, 恢复运行时会使用这里的值
  • atomicstatus: g的当前状态
  • schedlink: 下一个g, 当g在链表结构中会使用
  • preempt: g是否被抢占中
  • lockedm: g是否要求要回到这个M执行, 有的时候g中断了恢复会要求使用原来的M执行

M里面比较重要的成员:

  • g0: 用于调度的特殊g, 调度和执行系统调用时会切换到这个g
  • curg: 当前运行的g
  • p: 当前拥有的P
  • nextp: 唤醒M时, M会拥有这个P
  • park: M休眠时使用的信号量, 唤醒M时会通过它唤醒
  • schedlink: 下一个m, 当m在链表结构中会使用
  • mcache: 分配内存时使用的本地分配器, 和p.mcache一样(拥有P时会复制过来)
  • lockedg: lockedm的对应值

P里面比较重要的成员:

  • status: p的当前状态
  • link: 下一个p, 当p在链表结构中会使用
  • m: 拥有这个P的M
  • mcache: 分配内存时使用的本地分配器
  • runqhead: 本地运行队列的出队序号
  • runqtail: 本地运行队列的入队序号
  • runq: 本地运行队列的数组, 可以保存256个G
  • gfree: G的自由列表, 保存变为_Gdead后可以复用的G实例
  • gcBgMarkWorker: 后台GC的worker函数, 如果它存在M会优先执行它
  • gcw: GC的本地工作队列, 详细将在下一篇(GC篇)分析

调度器涉及到的结构体除了上面的G、M、P 之外,还有以下,比如全局的调度器:

type schedt struct {
	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
     // 下面两个变量需以原子访问访问。保持在 struct 顶部,确保其在 32 位系统上可以对齐
	goidgen  uint64
	lastpoll uint64

	lock mutex
    
    // 当修改 nmidle,nmidlelocked,nmsys,nmfreed 这些数值时
    // 需要记得调用 checkdead

	midle        muintptr // idle m's waiting for work   空闲的M 队列。
	nmidle       int32    // number of idle m's waiting for work  当前等待工作的空闲 m 计数
	nmidlelocked int32    // number of locked m's waiting for work  当前等待工作的被 lock 的 m 计数
	mcount       int32    // number of m's that have been created  已经创建的 m 数量
	maxmcount    int32    // maximum number of m's allowed (or die)   允许创建的最大的 m 数量

	ngsys uint32 // number of system goroutines; updated atomically  系统 goroutine 的数量, 原子操作

	pidle      puintptr // idle p's   空闲的 p 队列
	npidle     uint32
	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

	// Global runnable queue.
     // 全局的可运行 g 队列
	runqhead guintptr       // 队头地址
	runqtail guintptr       // 队尾地址 
	runqsize int32          // 队列宽度  

	// Global cache of dead G's.
    // dead G 的全局缓
	gflock       mutex
	gfreeStack   *g        // 栈中自由g ?
	gfreeNoStack *g        // 堆中自由g ?   
	ngfree       int32

	// Central cache of sudog structs.
    // sudog 结构的集中缓存
	sudoglock  mutex
	sudogcache *sudog

	// Central pool of available defer structs of different sizes.
    // 不同大小的可用的 defer struct 的集中缓存池
	deferlock mutex
	deferpool [5]*_defer

	gcwaiting  uint32 // gc is waiting to run  gc 等待运行状态。 作为gc任务被执行期间的辅助标记、停止计数和通知机制
	stopwait   int32
	stopnote   note
	sysmonwait uint32  // 作为 系统检测任务被执行期间的停止计数和通知机制
	sysmonnote note

	// safepointFn should be called on each P at the next GC
	// safepoint if p.runSafePointFn is set.
    // 应在下一个GC上的每个P上调用safepointFn
    // 如果设置了p.runSafePointFn,则为safepoint。
	safePointFn   func(*p)
	safePointWait int32
	safePointNote note

	profilehz int32 // cpu profiling rate   CPU分析率

	procresizetime int64 // nanotime() of last change to gomaxprocs   上次修改 gomaxprocs 的纳秒时间
	totaltime      int64 // ∫gomaxprocs dt up to procresizetime
}

全局调度器,全局只有一个schedt类型的实例

sudoG 结构体:

// sudog 代表在等待列表里的 g,比如向 channel 发送/接收内容时
// 之所以需要 sudog 是因为 g 和同步对象之间的关系是多对多的
// 一个 g 可能会在多个等待队列中,所以一个 g 可能被打包为多个 sudog
// 多个 g 也可以等待在同一个同步对象上
// 因此对于一个同步对象就会有很多 sudog 了
// sudog 是从一个特殊的池中进行分配的。用 acquireSudog 和 releaseSudog 来分配和释放 sudog

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g          *g
	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
	next       *sudog
	prev       *sudog
	elem       unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}

 

那么goroutine的入口是怎么样的呢?首先,我们从goroutine是如何被创建的说起,创建goroutine的函数为:newproc 函数 (在 ./src/runtime/proc.go 文件中),即:使用go命令创建goroutine时, go会把go命令编译为对runtime.newproc的调用

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.

// 根据 参数 fn 和 siz 创建一个 g
// 并把它放置入 自由g队列中等待唤醒
// 编译器翻译一个 go 表达式时会调用这个函数
// 无法拆分堆栈,因为它假设参数在 &fn 之后顺序可用; 如果发生堆栈拆分,则不会复制它们。

//    新建一个goroutine,
//    用fn + PtrSize 获取第一个参数的地址,也就是argp
//    用siz - 8 获取pc地址

//go:nosplit
func newproc(siz int32, fn *funcval) {
    // add 是一个指针运算,跳过函数指针
    // 把栈上的参数起始地址找到
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)

    // getcallerpc返回的是 调用函数之后的那条程序指令的地址,
    // 即callee函数返回时要执行的下一条指令的地址
	pc := getcallerpc(unsafe.Pointer(&siz))
    
    // 用g0的栈创建G对象
	systemstack(func() {
		newproc1(fn, (*uint8)(argp), siz, 0, pc)
	})
}


// 结构体 funcval
// funcval 是一个变长结构,第一个成员是函数指针
// 所以上面的 add 是跳过这个 fn
type funcval struct {
	fn uintptr
	// variable-size, fn-specific data here   这里的可变大小,特定于fn的数据
}

runtime.newproc函数中只做了三件事:

  • 计算额外参数的地址 argp
  • 获取调用端的地址(返回地址) pc
  • 使用systemstack调用 newproc1 函数

systemstack 会切换当前的 g g0, 并且使用g0的栈空间, 然后调用传入的函数, 再切换回原来的g和原来的栈空间
切换到g0后会假装返回地址是mstart, 这样traceback的时候可以在mstart停止。
这里传给systemstack的是一个闭包, 调用时会把闭包的地址放到寄存器rdx, 具体可以参考上面对闭包的分析。

下面我们在主要来看看  newproc1 函数做了什么:

// Create a new g running fn with narg bytes of arguments starting
// at argp and returning nret bytes of results.  callerpc is the
// address of the go statement that created this. The new g is put
// on the queue of g's waiting to run.

// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {

    // 先获取 当前 g,其实这里获取到的是 g0
	_g_ := getg()
    
    // 判断下 func 的实现是否为空
	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
    
    // 设置g对应的m的locks++, 禁止抢占
	_g_.m.locks++ // disable preemption because it can be holding p in a local var   禁用抢占,因为它可以在本地var中保存p
	siz := narg + nret
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	newg.gcscanvalid = false
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
	return newg
}

先大致看下newproc1 函数逻辑流程:

newproc1 --> newg
newg[gfget] --> nil{is nil?}
nil -->|yes|E[init stack]
nil -->|no|C[malg]
C --> D[set g status=> idle->dead]
D --> allgadd
E --> G[set g status=> dead-> runnable]
allgadd --> G
G --> runqput

runtime.newproc1的处理如下:

  • 调用getg获取当前的g, 会编译为读取FS寄存器(TLS), 这里会获取到g0
  • 设置g对应的m的locks++, 禁止抢占
  • 获取m拥有的p
  • 新建一个g
    • 首先调用gfget从p.gfree获取g, 如果之前有g被回收在这里就可以复用
    • 获取不到时调用malg分配一个g, 初始的栈空间大小是2K
    • 需要先设置g的状态为已中止(_Gdead), 这样gc不会去扫描这个g的未初始化的栈
  • 把参数复制到g的栈上
  • 把返回地址复制到g的栈上, 这里的返回地址是goexit, 表示调用完目标函数后会调用goexit
  • 设置g的调度数据(sched)
    • 设置sched.sp等于参数+返回地址后的rsp地址
    • 设置sched.pc等于目标函数的地址, 查看gostartcallfngostartcall
    • 设置sched.g等于g
  • 设置g的状态为待运行(_Grunnable)
  • 调用runqput把g放到运行队列
    • 首先随机把g放到p.runnext, 如果放到runnext则入队原来在runnext的g
    • 然后尝试把g放到P的"本地运行队列"
    • 如果本地运行队列满了则调用runqputslow把g放到"全局运行队列"
      • runqputslow会把本地运行队列中一半的g放到全局运行队列, 这样下次就可以继续用快速的本地运行队列了
  • 如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 并且主函数已执行则唤醒或新建一个M
    • 这一步非常重要, 用于保证当前有足够的M运行G, 具体请查看上面的"空闲M链表"
    • 唤醒或新建一个M会通过wakep函数
      • 首先交换nmspinning到1, 成功再继续, 多个线程同时执行wakep只有一个会继续
      • 调用startm函数
        • 调用pidleget从"空闲P链表"获取一个空闲的P
        • 调用mget从"空闲M链表"获取一个空闲的M
        • 如果没有空闲的M, 则调用newm新建一个M
          • newm会新建一个m的实例, m的实例包含一个g0, 然后调用newosproc动一个系统线程
          • newosproc会调用syscall clone创建一个新的线程
          • 线程创建后会设置TLS, 设置TLS中当前的g为g0, 然后执行mstart
        • 调用notewakeup(&mp.park)唤醒线程

创建goroutine的流程就这么多了, 接下来看看M是如何调度的.

 

 

(未完,疲惫中.............)

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_25870633/article/details/83445946
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2019-08-25 14:31:51
  • 阅读 ( 1702 )
  • 分类:架构

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢