连接池要求
- 资源限制,包括最小连接数,最大连接上限,以及等待队列的长度
- 动态伸缩连接池:连接不够用时申请新的连接,空闲连接过多时释放多余的连接
- 连接保活机制,需要周期性检测当前池中的连接是否正常,并对异常连接采取重连或剔除的动作
简单实现
实现
- 连接池最大容量限制,最小连接限制
- 连接池动态伸缩(可以继续优化)
问题
- 没有有效的手段检测连接状态,目前仅通过设置连接有效期、过期后重连的机制来规避(遗留问题:net.Conn 客户端怎么保持长连接)
- 伸缩机制不友好,仅在用户使用时触发伸缩动作。
package hbase
import (
"errors"
"sync"
"sync/atomic"
"time"
)
// BaseConnection is one entry of a ConnectionPool: it wraps a single HBase
// client together with the bookkeeping the pool uses to hand it out.
type BaseConnection struct {
Busy bool // true while the connection is handed out to a caller (set in TryGet, cleared in Put)
Valid bool // false once closeConnection has closed the underlying client
AccessAt time.Time // last access time; compared against ConnectionPool.ConnExpire
HbaseConn *HbaseConnection // the real HBase client
Pool *ConnectionPool // the pool this connection belongs to; NOTE(review): newConnection does not assign it — confirm where it is set
}
// ConnectionPool is a mutex-guarded pool of HBase connections to one address.
// NewConnectionPool pre-opens up to CoreConn connections, TryGet grows the
// pool up to MaxConn on demand, and Put evicts expired connections only while
// more than CoreConn remain. The embedded Mutex guards the counters and the
// Connections slice; WaitCount is accessed through sync/atomic.
type ConnectionPool struct {
sync.Mutex
Address string // address passed to hbaseconnect; presumably host:port — TODO confirm
MaxConn int // max pool size: upper bound on len(Connections)
CoreConn int // min pool size: Put never shrinks the pool to fewer connections than this
MaxWait int32 // max number of goroutines allowed to wait in Get; also the WaitChan buffer size
WaitTimeout time.Duration // max wait in Get before it returns (nil, error); NOTE(review): Get multiplies this by time.Second, so it is effectively a number of seconds — confirm with callers
ConnExpire time.Duration // idle age after which a connection is reconnected (TryGet) or closed (Put)
Usecount int // number of connections currently handed out
IdleCount int // number of idle connections available for reuse
WaitCount int32 // number of goroutines currently waiting in Get
WaitChan chan *BaseConnection // Put hands a released connection straight to a waiting Get through this channel
Connections []*BaseConnection // every connection owned by the pool
}
// PoolConfig carries the settings that NewConnectionPool copies into a new
// ConnectionPool.
type PoolConfig struct {
MaxPool int // max pool size (becomes ConnectionPool.MaxConn)
MinPool int // min pool size (becomes CoreConn); NewConnectionPool tries to open this many connections up front
MaxWait int32 // max number of goroutines that may wait for a connection; also sizes the wait channel
Timeout time.Duration // max wait time in Get (becomes WaitTimeout)
Expire time.Duration // max idle time of a connection (becomes ConnExpire)
Address string // server address the pool connects to (becomes ConnectionPool.Address)
}
// NewConnectionPool builds a pool from config and eagerly opens up to
// config.MinPool connections to config.Address.
//
// Connections that fail to open are skipped, so the returned pool may start
// with fewer than MinPool connections (possibly none); TryGet opens more on
// demand. The wait channel is buffered with config.MaxWait slots; a negative
// MaxWait is treated as zero slots instead of panicking in make.
func NewConnectionPool(config PoolConfig) *ConnectionPool {
	waitSlots := config.MaxWait
	if waitSlots < 0 {
		waitSlots = 0
	}

	pool := &ConnectionPool{
		Address:     config.Address,
		MaxConn:     config.MaxPool,
		CoreConn:    config.MinPool,
		MaxWait:     config.MaxWait,
		WaitTimeout: config.Timeout,
		ConnExpire:  config.Expire,
		WaitChan:    make(chan *BaseConnection, waitSlots),
	}

	for idx := 0; idx < pool.CoreConn; idx++ {
		conn := newConnection(pool.Address)
		if conn == nil {
			continue
		}
		// Record the owning pool so the back-reference is usable by callers.
		conn.Pool = pool
		pool.IdleCount++
		pool.Connections = append(pool.Connections, conn)
	}
	return pool
}
// newConnection opens a client to addr via hbaseconnect and wraps it in an
// idle, valid BaseConnection stamped with the current time.
//
// It returns nil when the connection attempt fails; callers treat nil as
// "could not open a connection". The Pool field is left unset.
func newConnection(addr string) *BaseConnection {
	conn, err := hbaseconnect(addr)
	if err != nil {
		// A failed connect yields no usable client, so report failure as nil
		// rather than wrapping it in a connection marked valid.
		return nil
	}
	return &BaseConnection{
		Busy:      false,
		Valid:     true,
		AccessAt:  time.Now(),
		HbaseConn: conn,
	}
}
// closeConnection shuts down the HBase client behind conn and flags the entry
// as no longer valid. A nil conn is ignored.
func closeConnection(conn *BaseConnection) {
	if conn != nil {
		hbaseclose(conn.HbaseConn)
		conn.Valid = false
	}
}
// TryGet returns a connection without waiting.
//
// It first looks for an idle, valid connection; one that has been idle longer
// than ConnExpire is reconnected before being handed out, and skipped if the
// reconnect fails. If no idle connection is usable and the pool holds fewer
// than MaxConn connections, a new one is opened and added to the pool.
// Otherwise it returns a nil connection and an error.
//
// The returned connection is marked Busy and must be given back with Put.
func (pool *ConnectionPool) TryGet() (conn *BaseConnection, err error) {
	pool.Lock()
	defer pool.Unlock()

	if pool.IdleCount > 0 {
		for _, c := range pool.Connections {
			if !c.Valid || c.Busy {
				continue
			}
			if time.Since(c.AccessAt) > pool.ConnExpire {
				if rerr := hbasereconnect(c.HbaseConn); rerr != nil {
					// Leave AccessAt untouched so the connection still counts
					// as expired and is re-checked on the next call instead
					// of being handed out unverified.
					continue
				}
				c.AccessAt = time.Now()
			}
			c.Busy = true
			pool.Usecount++
			pool.IdleCount--
			return c, nil
		}
	}

	if len(pool.Connections) < pool.MaxConn {
		if c := newConnection(pool.Address); c != nil {
			// Mark the fresh connection as in use; otherwise a later TryGet
			// could hand the same connection to a second caller.
			c.Busy = true
			c.Pool = pool
			pool.Usecount++
			pool.Connections = append(pool.Connections, c)
			return c, nil
		}
	}
	return nil, errors.New("no avaliable connections")
}
// Get returns a connection, waiting for one to be released if none is
// available right now.
//
// It tries TryGet first. On failure it registers as a waiter — unless MaxWait
// goroutines are already waiting, in which case it fails immediately — and
// blocks until Put hands over a connection through WaitChan or the wait
// timeout elapses.
//
// NOTE(review): the timeout is computed as time.Second * WaitTimeout, i.e.
// WaitTimeout is treated as a plain number of seconds even though its type is
// time.Duration. A value such as 10*time.Second overflows here. The behaviour
// is kept as-is because existing callers may pass a bare integer; confirm and
// fix together with the callers.
func (pool *ConnectionPool) Get() (*BaseConnection, error) {
	// The TryGet error only says "nothing available"; fall through to waiting.
	if conn, _ := pool.TryGet(); conn != nil {
		return conn, nil
	}

	// Reserve the waiter slot first and check afterwards, so concurrent
	// callers cannot all pass the limit check before any of them registers.
	if atomic.AddInt32(&pool.WaitCount, 1) > pool.MaxWait {
		atomic.AddInt32(&pool.WaitCount, -1)
		return nil, errors.New("error, too much wait routine")
	}
	defer atomic.AddInt32(&pool.WaitCount, -1)

	// Wait until a connection is handed over or the timeout fires. An explicit
	// timer is stopped on return instead of lingering until it expires.
	timer := time.NewTimer(time.Second * pool.WaitTimeout)
	defer timer.Stop()

	select {
	case conn := <-pool.WaitChan:
		return conn, nil
	case <-timer.C:
		return nil, errors.New("timeout")
	}
}
// Put gives a connection obtained from Get or TryGet back to the pool.
//
// If goroutines are waiting in Get, the connection is handed to one of them
// through WaitChan and stays marked as in use. Otherwise it becomes idle, and
// if the pool currently holds more than CoreConn connections, at most one
// idle connection older than ConnExpire is closed and removed. A nil conn is
// ignored.
//
// FIXME: the connection may be broken for some reason; broken connections
// should be detected and removed here.
// NOTE(review): a waiter can time out between the WaitCount check and the
// send, leaving the connection parked in WaitChan until the next waiter
// arrives.
func (pool *ConnectionPool) Put(conn *BaseConnection) {
	if conn == nil {
		return
	}

	pool.Lock()
	defer pool.Unlock()

	conn.AccessAt = time.Now()

	if atomic.LoadInt32(&pool.WaitCount) > 0 {
		// Non-blocking send: a blocking send while holding the pool mutex
		// would deadlock the whole pool if the channel buffer is full.
		select {
		case pool.WaitChan <- conn:
			return
		default:
		}
	}

	pool.Usecount--
	pool.IdleCount++
	conn.Busy = false

	// Shrink towards CoreConn by evicting at most one expired connection.
	if len(pool.Connections) <= pool.CoreConn {
		return
	}
	for idx, c := range pool.Connections {
		// Never evict a connection that is handed out: closing it would break
		// its current user and corrupt the idle counter.
		if c.Busy || time.Since(c.AccessAt) <= pool.ConnExpire {
			continue
		}
		closeConnection(c)
		pool.IdleCount--

		// Remove element idx and clear the vacated tail slot so the backing
		// array does not keep the closed connection reachable.
		last := len(pool.Connections) - 1
		copy(pool.Connections[idx:], pool.Connections[idx+1:])
		pool.Connections[last] = nil
		pool.Connections = pool.Connections[:last]
		break
	}
}