社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
研究一下大神的goroutine pool的实现
https://www.cnblogs.com/williamjie/p/9267741.html
设计思路
启动服务之时先初始化一个 Goroutine Pool 池,这个Pool维护了一个类似栈的LIFO队列 ,里面存放负责处理任务的Worker,然后在client端提交task到Pool中之后,在Pool内部,接收task之后的核心操作是:
流程图如下
代码实现:
核心变量 pool默认大小 扫描清理goroutine时间
const (
// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DefaultAntsPoolSize = math.MaxInt32
// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = 5
)
核心方法
Submit 提交任务到pool
Running返回正在运行goroutine数量
Cap返回pool默认大小
Free返回可用的goroutine数量
Release 关闭pool
// Submit submits a task to pool.
func Submit(task f) error {
return defaultAntsPool.Submit(task)
}
// Running returns the number of the currently running goroutines.
func Running() int {
return defaultAntsPool.Running()
}
// Cap returns the capacity of this default pool.
func Cap() int {
return defaultAntsPool.Cap()
}
// Free returns the available goroutines to work.
func Free() int {
return defaultAntsPool.Free()
}
// Release Closes the default pool.
func Release() {
defaultAntsPool.Release()
}
单例模式的pool
// Init a instance pool when importing ants.
var defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
// NewPool generates an instance of ants pool.
func NewPool(size int) (*Pool, error) {
return NewTimingPool(size, DefaultCleanIntervalTime)
}
// NewTimingPool generates an instance of ants pool with a custom timed task.
func NewTimingPool(size, expiry int) (*Pool, error) {
if size <= 0 {
return nil, ErrInvalidPoolSize
}
if expiry <= 0 {
return nil, ErrInvalidPoolExpiry
}
p := &Pool{
capacity: int32(size),
release: make(chan sig, 1),
expiryDuration: time.Duration(expiry) * time.Second,
}
p.cond = sync.NewCond(&p.lock)
go p.periodicallyPurge()
return p, nil
}
返回Pool类
capacity pool size running正在运行的goroutine数量
expiryDuration 每个worker的空闲允许时间 workers Worker工作者slice
lock 同步锁 cond 等待空闲worker条件锁
// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
// capacity of the pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration
// workers is a slice that store the available workers.
workers []*Worker
// release is used to notice the pool to closed itself.
release chan sig
// lock for synchronous operation.
lock sync.Mutex
// cond for waiting to get a idle worker
cond *sync.Cond
once sync.Once
}
启动pool之后,另启动一个goroutine周期性不断监测空闲worker并且清理
go p.periodicallyPurge()
Worder工作者
// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
// pool who owns this worker.
pool *Pool
// task is a job should be done.
task chan f
// recycleTime will be update when putting a worker back into queue.
recycleTime time.Time
}
周期性检查空闲workers,检查每一个空闲worker的recycletime,如果大于空闲阈值时间则清理该worker
// clear expired workers periodically.
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
p.lock.Unlock()
return
}
n := -1
for i, w := range idleWorkers {
if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
break
}
n = i
w.task <- nil
idleWorkers[i] = nil
}
if n > -1 {
if n >= len(idleWorkers)-1 {
p.workers = idleWorkers[:0]
} else {
p.workers = idleWorkers[n+1:]
}
}
p.lock.Unlock()
}
}
核心方法的具体实现
submit
// Submit submits a task to this pool.
func (p *Pool) Submit(task f) error {
if len(p.release) > 0 {
return ErrPoolClosed
}
p.getWorker().task <- task
return nil
}
获取一个可用的Worker
// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
var w *Worker
waiting := false
p.lock.Lock()
defer p.lock.Unlock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n < 0 {
waiting = p.Running() >= p.Cap()
} else {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
}
if waiting {
for {
p.cond.Wait()
l := len(p.workers) - 1
if l < 0 {
continue
}
w = p.workers[l]
p.workers[l] = nil
p.workers = p.workers[:l]
break
}
} else if w == nil {
w = &Worker{
pool: p,
task: make(chan f, 1),
}
w.run()
p.incRunning()
}
return w
}
获取一个可用的Worker,判断空闲splice数量,如果有空闲splice,直接获取worker,如果没有空闲则阻塞等待直到有空闲worker
判断w是否为nil,如果为空,则新建一个Worker加入到pool中
// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {
go func() {
for f := range w.task {
if f == nil {
w.pool.decRunning()
return
}
f()
w.pool.putWorker(w)
}
}()
}
将Worker加入到pool中,发送signal信号,通知cond有可用Worker可用
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)
// Notify there is an available worker put back into queue.
p.cond.Signal()
p.lock.Unlock()
}
动态扩容
// ReSize changes the capacity of this pool.
func (p *Pool) ReSize(size int) {
if size == p.Cap() {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
diff := p.Running() - size
for i := 0; i < diff; i++ {
p.getWorker().task <- nil
}
}
running方法
// Running returns the number of the currently running goroutines.
func (p *Pool) Running() int {
return int(atomic.LoadInt32(&p.running))
}
free方法
// Free returns the available goroutines to work.
func (p *Pool) Free() int {
return int(atomic.LoadInt32(&p.capacity) - atomic.LoadInt32(&p.running))
}
cap方法
// Cap returns the capacity of this pool.
func (p *Pool) Cap() int {
return int(atomic.LoadInt32(&p.capacity))
}
release方法
// Release Closes this pool.
func (p *Pool) Release() error {
p.once.Do(func() {
p.release <- sig{}
p.lock.Lock()
idleWorkers := p.workers
for i, w := range idleWorkers {
w.task <- nil
idleWorkers[i] = nil
}
p.workers = nil
p.lock.Unlock()
})
return nil
}
测试代码
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool
var wg sync.WaitGroup
for i := 0; i < runTimes; i++ {
wg.Add(1)
ants.Submit(func() {
demoFunc()
wg.Done()
})
}
wg.Wait()
fmt.Printf("running goroutines: %dn", ants.Running())
fmt.Printf("finish all tasks.n")
// Use the pool with a function,
// set 10 to the size of goroutine pool and 1 second for expired duration
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks
for i := 0; i < runTimes; i++ {
wg.Add(1)
p.Serve(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %dn", p.Running())
fmt.Printf("finish all tasks, result is %dn", sum)
}
研究一下fasthttp中的workPool
https://github.com/valyala/fasthttp
workpool类
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
// Function for serving server connections.
// It must leave c unclosed.
WorkerFunc ServeHandler
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct{}
workerChanPool sync.Pool
connState func(net.Conn, ConnState)
}
启动方法start
func (wp *workerPool) Start() {
if wp.stopCh != nil {
panic("BUG: workerPool already started")
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
关闭方法stop 关闭所有的可用worker
func (wp *workerPool) Stop() {
if wp.stopCh == nil {
panic("BUG: workerPool wasn't started")
}
close(wp.stopCh)
wp.stopCh = nil
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i, ch := range ready {
ch.ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}
核心处理方法serve 获取可用的Ch来处理Conn
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
获取一个可用的worker
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
从ready数组尾部获取,如果没有则阻塞等待可用的worker。
参考链接:
https://www.cnblogs.com/williamjie/p/9267741.html
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!