go 协程实现 - Go语言中文社区

go 协程实现


[TOC]

go 协程执行过程

1. 生成g,并放入队列

  • 用户调用go时,编译器会会调用runtime.newproc函数
// go 一个协程,就会调用此函数, 将其放到g的 queue中
// siz 为参数bytes
// fn 则包含了参数及函数相关信息
func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        newproc1(fn, argp, siz, gp, pc)
    })
}

  • 其中 systemstack 会在system stack 上执行 传入的函数
func systemstack(fn func())

如果是g0(per-OS-thread) 或者是signal handling (gsignal)调用systemstack,则会直接执行fn并返回
如果是一个普通的goroutine调用,则会先切换到p0 stack,调用fn,然后再切回

  • newproc1 实际创建一个新的g来执行具体函数
// 创建一个新的g
// argp 为参数起点地址
// narg为参数长度
// callerpc为创建这个go的pc
// callergp为创建这个go的g
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
  ...
  _p_ := _g_.m.p.ptr()
  // 从gfree list中获取一个空闲g,如果local gfree list没有,则从 global list中获取
  newg := gfget(_p_)
  // 如果没有空闲g则新分配一个
  if newg == nil {
   // 创建一个stack size最小的g
   newg = malg(_StackMin)
   // 设置g的状态为_Gdead(g退出或者新建时为此状态)
   // 并且这个函数会在 g->atomicstatus is in a Gscan status时循环等待,直到Gscan state is finished
   casgstatus(newg, _Gidle, _Gdead)
   // 添加到allgs列表中
   allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
   ....
    // 初始化newg相关变量
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg, false) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    // 修改newg为 _Grunnable状态
    casgstatus(newg, _Gdead, _Grunnable)
    ....
    // runqput(_p_ *p, gp *g, next bool)
    // 尝试将newg放到当前的local runnable queue
    // 如果next为true,则将g放到 _p_.runnext(下次调度)
    // 如果next为false,则放在队列尾部
    // 如果queue满的话,则放在全局的queue中
    runqput(_p_, newg, true)
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        // 尝试找M执行P的g队列,如果必要的话会创建M
        // 如果p==nil,尝试获取一个空闲的p,如果没有则什么也不做
        wakep()
    }
    releasem(_g_.m)
}
  • wakep 在g变为 runnable时调用,尝试添加一个P执行g
func wakep() {
    // be conservative about spinning threads
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

其中 startm如下

// 调度M执行p(创建一个M如果需要的话)
// 如果p为nil,尝试获取一个idle的p,否则啥也不干
func startm(_p_ *p, spinning bool) {
  if _p_ == nil {
   //尝试获取一个idle的p
    _p_ = pidleget()
    if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
  }
  // 尝试从midle list中获取一个m
  mp := mget()
  if mp == nil {
    .....
    // 创建一个新m,并且调用fn或者scheduler
    newm(fn, _p_)
    .....
  }
  ......
  // 标记m状态
  notewakeup(&mp.park)
}

2. 调度

go协程的调度由m来处理
M启动时会调用mstart函数, m0在初始化后调用, 其他的的m在线程启动后调用.

2.1 mstart函数的处理如下:

  • 调用getg获取当前的g, 这里会获取到g0
  • 如果g未分配栈则从当前的栈空间(系统栈空间)上分配, 也就是说g0会使用系统栈空间
  • 调用mstart1函数
    • 调用gosave函数保存当前的状态到g0的调度数据中, 以后每次调度都会从这个栈地址开始
    • 调用asminit函数, 不做任何事情
    • 调用minit函数, 设置当前线程可以接收的信号(signal)
    • 调用schedule函数

mstart 函数如下:

func mstart() {
        // 获取当前g
    _g_ := getg()
    //如果g未分配栈则从当前的栈空间(系统栈空间)上分配, 也就是说g0会使用系统栈空间
    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    ......
    // 调用mstart1
    mstart1()
    ......
}

mstart1 函数如下:

func mstart1() {
    _g_ := getg()
    ....
    //因为schedule会一直循环
    // 所以要提前在mcall的stack顶中保留当前的pc和sp (stack pointer (SP))
    //方便起它的调用可以reuse the current frame
    save(getcallerpc(), getcallersp())
    asminit()
    minit()

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    if _g_.m == &m0 {
        mstartm0()
    }

    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }

    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
        // 开始调度
    schedule()
}

2.2 调用schedule函数后就进入了调度

schedule函数获取g => [必要时休眠] => [唤醒后继续获取] => execute函数执行g => 执行后返回到goexit => 重新执行schedule函数

  • schedule
// 找到一个runnable goroutine并执行
func schedule() {
    _g_ := getg()
       .....
       if _g_.m.lockedg != 0 {
                // 停止当前m的执行,直到g重新runnable
        stoplockedm()
                // func execute(gp *g, inheritTime bool) 
                // 调度gp在当前的M执行
                // 如果inheritTime为true,则继承剩下的时间
                // 否则重新开启一个time
                // 用远不会返回
                // 允许Write barriers
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }
        .....
top:
    // 如果当前GC需要停止整个世界(STW), 则调用[stopm](https://github.com/golang/go/blob/go1.13.5/src/runtime/proc.go#L2103)休眠当前的M
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    // 如果M拥有的P中指定了需要在安全点运行的函数(P.runSafePointFn), 则运行它
    if _g_.m.p.ptr().runSafePointFn != 0 {
        runSafePointFn()
    }
    ...
    // 检测wakeP是否准备OK
    tryWakeP := false
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            //切换为 _Grunnable 状态
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
            tryWakeP = true
        }
    }
    ...
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
                // 每调度61次会检测运行下global runnable queue 
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
      
    if gp == nil {
        //从本地运行队列获取一个可以运行的g
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        // 如果本地没有可运行的g,则尝试从其他的P中获取(会直接取一半),或者从 global queue中获取,或者poll network,都没有一直尝试获取
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
   //如果当前m标记为自旋,则reset
    if _g_.m.spinning {
        //如果当前有空闲的P, 但是无自旋的M(nmspinning等于0), 则唤醒或新建一个M
        //这里离开自选状态是为了执行G, 所以会检查是否有空闲的P, 有则表示可以再开新的M执行G
        resetspinning()
    }
    ....
    //如果G要求回到指定的M(例如上面的runtime.main)
    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        // 调用[startlockedm](https://github.com/golang/go/blob/go1.13.5/src/runtime/proc.go#L2083)函数把G和P交给该M, 自己进入休眠
        startlockedm(gp)
        // 从休眠唤醒后跳到schedule的顶部重试
        goto top
    }
    // 执行G
    execute(gp, inheritTime)
}

2.3 调用execute函数 执行具体操作

execute 调度gp在当前的M上运行
如果inheritTime为true,则使用剩余时间,否则就开启一个新的时间片

func execute(gp *g, inheritTime bool) {
    _g_ := getg()
    // 修改g状态为_Grunning
    casgstatus(gp, _Grunnable, _Grunning)
    ....
    // 每次调度增加一
        if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }
    .....
    //这个函数会根据g.sched中保存的状态恢复各个寄存器的值并继续运行g
    gogo(&gp.sched)
}

目标函数执行完毕后会调用goexit函数, goexit函数会调用goexit1函数, goexit1函数会通过mcall
mcall这个函数就是用于实现"保存状态"的, 处理如下:

  • 设置g.sched.pc等于当前的返回地址
  • 设置g.sched.sp等于寄存器rsp的值
  • 设置g.sched.g等于当前的g
  • 设置g.sched.bp等于寄存器rbp的值
  • 切换TLS中当前的g等于m.g0
  • 设置寄存器rsp等于g0.sched.sp, 使用g0的栈空间
  • 设置第一个参数为原来的g
  • 设置rdx寄存器为指向函数地址的指针(上下文)
  • 调用指定的函数, 不会返回

goexit1函数会通过mcall调用goexit0函数, goexit0函数调用时已经回到了g0的栈空间, 处理如下:

  • 把G的状态由运行中(_Grunning)改为已中止(_Gdead)
  • 清空G的成员
  • 调用dropg函数解除M和G之间的关联
  • 调用gfput函数把G放到P的自由列表中, 下次创建G时可以复用
  • 调用schedule函数继续调度

G结束后回到schedule函数, 这样就结束了一个调度循环.
不仅只有G结束会重新开始调度, G被抢占或者等待资源也会重新进行调度

2.4 抢占

runtime.main会创建一个额外的M运行sysmon函数, 抢占就是在sysmon中实现的.
sysmon会进入一个无限循环, 第一轮回休眠20us, 之后每次休眠时间倍增, 最终每一轮都会休眠10ms.
sysmon中有netpool(获取fd事件), retake(抢占), forcegc(按时间强制执行gc), scavenge heap(释放自由列表中多余的项减少内存占用)等处理.

func sysmon() {
....
    for {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)

        ...
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        // 如果超过10ms没有poll则处理下
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            //获取等待poll后runnable的g 列表
            list := netpoll(false) // non-blocking - returns list of goroutines
            if !list.empty() {
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before injectglist.
                // Otherwise it can lead to the following situation:
                // injectglist grabs all P's but before it starts M's to run the P's,
                // another M returns from syscall, finishes running its G,
                // observes that there is no work to do and no other running M's
                // and reports deadlock.
                incidlelocked(-1)
                injectglist(&list)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        //[retake](https://github.com/golang/go/blob/master/src/runtime/proc.go#L4388)函数负责处理抢占
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
    // check if we need to force a GC
    // 检查是否需要强制GC
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            var list gList
            list.push(forcegc.g)
            injectglist(&list)
            unlock(&forcegc.lock)
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}
  • 其中抢占流程主要由retake完成
func retake(now int64) uint32 {
  lock(&allpLock)
  // 遍历所有p
  for i := 0; i < len(allp); i++ {
    _p_ := allp[i]
    ....
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            // 如果G运行太长时间,则Preempt
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {   // 如果超过了forcePreemptNS (10ms),则抢占
                // 抢占
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }
        if s == _Psyscall {
            ....
            // 如果P在系统调用中(_Psyscall), 且经过了一次sysmon循环(20us~10ms), 则抢占这个P
            // 调用[handoffp](https://github.com/golang/go/blob/master/src/runtime/proc.go#L1990)解除M和P之间的关联
            handoffp(_p_)
        }
  }
  unlock(&allpLock)
  return uint32(n))
}
  • 通过preemptone来具体实现抢占
// 让在p上运行的g停止
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // go中每次call都会检测 stack overflow,通过设置gp.stackguard0 = stackPreempt(stackPreempt超过任何的real sp),检测就会意识到这是被抢占了
    gp.stackguard0 = stackPreempt
    return true
}

参考

Golang源码探索(二) 协程的实现原理
https://github.com/golang/go/tree/go1.13.5

版权声明:本文来源简书,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://www.jianshu.com/p/75c0d50ae785
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-01-12 13:07:13
  • 阅读 ( 1129 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢