go-redis连接池pool源码分析 - Go语言中文社区

go-redis连接池pool源码分析


首先看看Pooler接口申明有哪些方法

分成四大类:1、建立连接和关闭连接 2、池子里面取Conn的管理 3、监控统计 4、整个Pooler池子的关闭
 

type Pooler interface {
	NewConn() (*Conn, error)
	CloseConn(*Conn) error

	Get() (*Conn, error)
	Put(*Conn)
	Remove(*Conn)

	Len() int
	IdleLen() int
	Stats() *Stats

	Close() error
}

再来看看实现整个接口的具体结构类型

type ConnPool struct {
	opt *Options          //初始化的配置项

	dialErrorsNum uint32 // atomic 连接错误次数
 
	lastDialError   error  //连接错误的最后一次的错误类型
	lastDialErrorMu sync.RWMutex

	queue chan struct{}   //池子里面空闲的conn的同步channel

	connsMu sync.Mutex    
	conns   []*Conn       //活跃的active conns

	idleConnsMu sync.RWMutex
	idleConns   []*Conn   //空闲的idle conns

	stats Stats

	_closed uint32 // atomic  //池子是否关闭标签
}

看看pool初始化的过程

var _ Pooler = (*ConnPool)(nil) //接口检查

func NewConnPool(opt *Options) *ConnPool {
	p := &ConnPool{
		opt: opt,

		queue:     make(chan struct{}, opt.PoolSize), //同步用的
		conns:     make([]*Conn, 0, opt.PoolSize),
		idleConns: make([]*Conn, 0, opt.PoolSize),
	}

	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency) //定时任务,清理过期的conn
	}

	return p
}

//reaper字面意思为收割者,为清理的意思
func (p *ConnPool) reaper(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()

	for range ticker.C {
		if p.closed() {
			break
		}
        //定时清理无用的conns
		n, err := p.ReapStaleConns()
		if err != nil {
			internal.Logf("ReapStaleConns failed: %s", err)
			continue
		}
		atomic.AddUint32(&p.stats.StaleConns, uint32(n))
	}
}


func (p *ConnPool) ReapStaleConns() (int, error) {
	var n int
	for {
        //往channel里面写入一个,表示占用一个任务
		p.getTurn()

		p.idleConnsMu.Lock()
		cn := p.reapStaleConn()
		p.idleConnsMu.Unlock()

		if cn != nil {
			p.removeConn(cn)
		}
        
        //处理完了,释放占用的channel的位置
		p.freeTurn()

		if cn != nil {
			p.closeConn(cn)
			n++
		} else {
			break
		}
	}
	return n, nil
}


func (p *ConnPool) reapStaleConn() *Conn {
	if len(p.idleConns) == 0 {
		return nil
	}
    
    //取第一个空闲conn
	cn := p.idleConns[0]
    //判断是否超时没有人用
	if !cn.IsStale(p.opt.IdleTimeout) {
		return nil
	}
    
    //超时没有人用则从空闲列表里面移除 
	p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)

	return cn
}

//判断是否超时的处理
func (cn *Conn) IsStale(timeout time.Duration) bool {
	return timeout > 0 && time.Since(cn.UsedAt()) > timeout
}

//移除连接就是一个很简单的遍历
func (p *ConnPool) removeConn(cn *Conn) {
	p.connsMu.Lock()
	for i, c := range p.conns {
		if c == cn {
			p.conns = append(p.conns[:i], p.conns[i+1:]...)
			break
		}
	}
	p.connsMu.Unlock()
}

再看怎么创建一个新的连接

func (p *ConnPool) NewConn() (*Conn, error) {
	cn, err := p.newConn()
	if err != nil {
		return nil, err
	}

	p.connsMu.Lock()
    //创建好后加入conns中
	p.conns = append(p.conns, cn)
	p.connsMu.Unlock()
	return cn, nil
}

func (p *ConnPool) newConn() (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}
    //判断是否一直出错
	if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
		return nil, p.getLastDialError()
	}

	netConn, err := p.opt.Dialer()
	if err != nil {
		p.setLastDialError(err)
        //dialer出错到一定程度需要tryDial就是一个循环retry的dial
		if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
			go p.tryDial()
		}
		return nil, err
	}

	return NewConn(netConn), nil
}


func (p *ConnPool) tryDial() {
	for {
		if p.closed() {
			return
		}
        //就是不断的dailer直到成功
		conn, err := p.opt.Dialer()
		if err != nil {
			p.setLastDialError(err)
			time.Sleep(time.Second)
			continue
		}
        //dialer成功,则设置dialErrorsNum为0
		atomic.StoreUint32(&p.dialErrorsNum, 0)
		_ = conn.Close()
		return
	}
}

看看最重要的,怎么从pool里面Get出来一个conn

func (p *ConnPool) Get() (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}
    
    //等待有空闲,通过channel方式同步,如果池子里面满了,则会阻塞等待,但是不会一直阻塞,有一个超时机制 
	err := p.waitTurn()
	if err != nil {
		return nil, err
	}

	for {
		p.idleConnsMu.Lock()
		cn := p.popIdle()
		p.idleConnsMu.Unlock()

		if cn == nil {
			break
		}
        //取出空闲的,并判断conn是否过期
		if cn.IsStale(p.opt.IdleTimeout) {
			p.CloseConn(cn)
			continue
		}
     
		atomic.AddUint32(&p.stats.Hits, 1)
		return cn, nil
	}
    
    //没有取到conn,miss了
	atomic.AddUint32(&p.stats.Misses, 1)
    //创建新的连接
	newcn, err := p.NewConn()
	if err != nil {
		p.freeTurn()
		return nil, err
	}

	return newcn, nil
}

//p.queue是Get方法成功的时候就往channel里面写,Put方法成功就往channel里面读,释放出来位置
func (p *ConnPool) waitTurn() error {
	select {
	case p.queue <- struct{}{}:
		return nil
	default:
		timer := timers.Get().(*time.Timer)
		timer.Reset(p.opt.PoolTimeout)
     
		select {
		case p.queue <- struct{}{}:
			if !timer.Stop() {
                //临界点处理:防止此时正好有ticker写入了timer.C,但是select到了这一个case里面
				<-timer.C
			}
			timers.Put(timer)
			return nil
		case <-timer.C:
             //超时写入机制
			timers.Put(timer)
			atomic.AddUint32(&p.stats.Timeouts, 1)
			return ErrPoolTimeout
		}
	}
}

看懂了Get方法,对偶的Put方法就应该非常容易理解:

func (p *ConnPool) Put(cn *Conn) {
    //buffer的设置
	buf := cn.Rd.PeekBuffered()
	if buf != nil {
		internal.Logf("connection has unread data: %.100q", buf)
		p.Remove(cn)
		return
	}

	p.idleConnsMu.Lock()
    //放入空闲conns里面来,方便Get的时候取出来
	p.idleConns = append(p.idleConns, cn)
	p.idleConnsMu.Unlock()
    //Get成功时候是写入channel,这个时候自然是释放channel
	p.freeTurn()
}

 

转载于:https://my.oschina.net/yang1992/blog/1864245

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_33785972/article/details/92032671
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-02 19:10:13
  • 阅读 ( 1551 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢