今天小编给大家分享一下Go调度器学习之goroutine调度怎么创建的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
在以下情形中,goroutine
可能会发生调度:
情形 | 说明 |
---|---|
go func(){} | 使用go关键字创建一个新的goroutine,调度器会考虑调度 |
GC | 由于GC也需要在系统线程M上执行,且其中需要所有的goroutine都停止运行,所以也会发生调度 |
系统调用 | 发生系统的调用时,会阻塞M,所以它会被调度走,同时新的goroutine也会被调度上来 |
同步内存访问 | mutex、channel等操作会使得goroutine阻塞,因此会被调度走,等条件满足后,还会被调度上来继续运行 |
其中,使用go
关键字创建协程时的调度分析,上篇博客做了初步的分析,特别是有关调度循环的分析,但是我们没有具体分析,当创建协程时,系统是怎么发生调度的。
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
}
我们还记得,go
关键字在创建协程时,Go
的编译器会将其转换为runtime.newproc
函数,上篇我们详细分析了main goroutine
的创建过程,在runtime.main
函数中,全局变量mainStarted
会被置为true
,之后普通协程的创建,则会调用runtime.wakep
函数尝试唤醒空闲的P。
func wakep() {
if atomic.Load(&sched.npidle) == 0 {
return
}
// be conservative about spinning threads
if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
wakep
函数首先确认是否有其他线程正在处于spinning
状态,即M是否在找工作,如果没有的话,则调用startm
函数创建一个新的、或者唤醒一个处于睡眠状态的工作线程出来工作。
func startm(_p_ *p, spinning bool) {
// Disable preemption.
//
// Every owned P must have an owner that will eventually stop it in the
// event of a GC stop request. startm takes transient ownership of a P
// (either from argument or pidleget below) and transfers ownership to
// a started M, which will be responsible for performing the stop.
//
// Preemption must be disabled during this transient ownership,
// otherwise the P this is running on may enter GC stop while still
// holding the transient P, leaving that P in limbo and deadlocking the
// STW.
//
// Callers passing a non-nil P must already be in non-preemptible
// context, otherwise such preemption could occur on function entry to
// startm. Callers passing a nil P may be preemptible, so we must
// disable preemption before acquiring a P from pidleget below.
mp := acquirem() // 保证在此期间不会发生栈扩展
lock(&sched.lock)
if _p_ == nil { // 没有指定p,那么需要从空闲队列中取一个p
_p_ = pidleget()
if _p_ == nil {// 如果没有空闲的p,直接返回
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
releasem(mp)
return
}
}
nmp := mget() // 如果有空闲的p,那么取出一个空闲的m
if nmp == nil {// 如果没有空闲的m,那么调用newm创建一个,然后返回
// No M is available, we must drop sched.lock and call newm.
// However, we already own a P to assign to the M.
//
// Once sched.lock is released, another G (e.g., in a syscall),
// could find no idle P while checkdead finds a runnable G but
// no running M's because this new M hasn't started yet, thus
// throwing in an apparent deadlock.
//
// Avoid this situation by pre-allocating the ID for the new M,
// thus marking it as 'running' before we drop sched.lock. This
// new M will eventually run the scheduler to execute any
// queued G's.
id := mReserveID()
unlock(&sched.lock)
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_, id)
// Ownership transfer of _p_ committed by start in newm.
// Preemption is now safe.
releasem(mp)
return
}
unlock(&sched.lock)
if nmp.spinning {
throw("startm: m is spinning")
}
if nmp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
nmp.spinning = spinning
nmp.nextp.set(_p_)
notewakeup(&nmp.park) // 如果有空闲的m,则唤醒这个m
// Ownership transfer of _p_ committed by wakeup. Preemption is now
// safe.
releasem(mp)
}
startm
函数首先判断是否有空闲的P,如果没有则直接返回;如果有,则判断是否有空闲的M,如果没有,则新建一个;如果有空闲的M,则唤醒这个M。说白了,wakep
函数就是为了更大程度的利用P,利用CPU资源。
说到这里,我们就需要重温一下上篇博客讲到的,调度中获取goroutine
的规则是:
每调度61次就需要从全局队列中获取goroutine
;
其次优先从本P所在队列中获取goroutine
;
如果还没有获取到,则从其他P的运行队列中窃取goroutine
;
其中,从其他P队列中窃取goroutine
,调用的是findrunnable
函数,这个函数很长,为了简化说明,我们删除一些不是很重要的代码:
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
...
// local runq
// 再从本地队列找找
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 再看看全局队列
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
...
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
gp, inheritTime, tnow, w, newWork := stealWork(now) // 调用stealWork盗取goroutine
now = tnow
if gp != nil {
// Successfully stole.
return gp, inheritTime
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
...
// return P and block
// 上面的窃取没有成功,那么解除m和p的绑定,摒弃娥江p放到空闲队列,然后去休眠
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
...
_g_.m.spinning = false // m即将睡眠,状态不再是spinning
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
...
stopm() // 休眠
goto top
}
从上面的代码可以看出,工作线程会反复尝试寻找运行的goroutine
,实在找不到的情况下才会进入到睡眠。需要注意的是,工作线程M从其他P的本地队列中盗取goroutine时的状态称之为自旋(spinning)状态,而前面讲到wakep
调用startm
函数,也是优先从自旋状态的M中选取,实在没有才去唤醒休眠的M,再没有就创建新的M。
窃取算法stealWork
我们就不分析了,有兴趣的同学可以看看。下面具体分析下stopm
是怎么实现线程睡眠的。
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m) // 把m放到sched.midle空闲队列
unlock(&sched.lock)
mPark()
acquirep(_g_.m.nextp.ptr()) // 绑定这个m和其下一个p,这里没有看懂为啥这么操作
_g_.m.nextp = 0
}
func mPark() {
gp := getg()
notesleep(&gp.m.park) // 进入睡眠状态
noteclear(&gp.m.park)
}
可以看出,stopm
主要是将m对象放到调度器的空闲线程队列,然后通过notesleep
进入睡眠状态。note
是go runtime
实现的一次性睡眠和唤醒机制,通过notesleep
进入睡眠状态,然后另一个线程可以通过notewakeup
唤醒这个线程。
小结
上面巴拉巴拉讲了那么多,看的人有点头晕,我们接下来讲一个很小的例子梳理一下以上的逻辑(主线程的创建和执行在上一篇博客中详细叙述过,这里不再赘述),主线程创建了一个goroutine
,这时候会触发wakep
,接下来可能会唤醒空闲的工作线程(如果是第一个非main goroutine
,就没有空闲的工作线程),或者创建一个新的工作线程,或者什么都不做。
如果是创建一个新的工作线程,那么其开启执行的点也是mstart
函数(注意区分mstart
和startm
),然后在schedule
函数中会尝试去获取goroutine
,如果全局和本地的goroutine
队列都没有,则会去其他的P上窃取goroutine
,如果窃取不成功,则会休眠。
如果是去唤醒工作协程,唤醒后会在休眠的地方开始,重新进行窃取。
窃取到工作协程后,就会去执行,然后就会因为各种原因重新开始调度循环。
在Go
中,有很多种情形会导致goroutine
阻塞,即其主动挂起,然后被调度走,等满足其运行条件时,还会被调度上来继续运行。比如channel
的读写,我们以通道的阻塞读为例,来介绍goroutine
的主动挂起的调度方式。
和前面介绍的Map一样,channel
的读也有以下两种读取方式:
v := <- ch v, ok := <- ch
分别对应以下chanrecv1
和chanrecv2
函数:
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
无论是哪个函数,最终调用的都是chanrecv
函数:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
c.recvq.enqueue(mysg) // 将这个goroutine放到channel的recv的queue中
atomic.Store8(&gp.parkingOnChan, 1)
// 挂起这个goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
chanrecv
会先判断channel是否有数据可读,如果有则直接读取并返回,如果没有则将这个goroutine
放到channel
的recv
的queue
中,然后调用gopark
函数将当前goroutine
挂起并阻塞。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
gopark
函数则使用mcall
函数(前面分析过,主要作用是保存当前goroutine
现场,然后切换到g0
栈去调用作为参数传入的函数)取执行park_m
函数:
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
park_m
首先把当前goroutine
的状态设置为_Gwaiting
(因为它正在等待其它goroutine
往channel
里面写数据),然后调用dropg
函数解除g
和m
之间的关系,最后通过调用schedule
函数进入调度循环。
至此,一个goroutine
就被主动挂起了。
我们继续以上例子,当另一个goroutine
对这个channel
发送数据的时候
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
goready(gp, skip+1)
}
channel
的发送流程和读取类似,当检查到接收队列中有等待着时,会调用send
函数然后调用goready
唤醒协程:
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
wakep()
releasem(mp)
}
这里发现,ready
函数和创建协程时一样,会触发wakep
来检查是否需要唤醒空闲P来执行。而在此之前,这个被唤醒的goroutine
会放到P的本地队列的下一个执行goroutine
,以提升时效性。
到这里,一个被挂起的协程也就被唤醒了。
以上就是“Go调度器学习之goroutine调度怎么创建”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://juejin.cn/post/7215967453929799740