zoukankan      html  css  js  c++  java
  • 连接池的简单实现

    连接池要求

    1. 资源限制,包括最小连接数,最大连接上限,以及等待队列的长度
    2. 动态伸缩连接池:连接不够用时申请新的连接,连接空闲时释放多余的连接
    3. 连接保活机制,需要周期检测当前池中连接是否正常,对应采取重连|剔除的动作

    简单实现

    实现

    1. 连接池最大容量限制,最小连接限制
    2. 连接池动态伸缩(可以继续优化)

    问题

    1. 没有有效的手段检测连接状态,目前仅通过设置连接有效期、过期后重连的机制来规避(遗留问题:net.Conn 客户端如何保持长连接)
    2. 伸缩机制不友好,仅在用户使用时触发伸缩动作。
    package hbase
    
    import (
    	"errors"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    // BaseConnection is one entry of a ConnectionPool: an HBase client plus the
    // bookkeeping flags and timestamp the pool uses to decide whether the entry
    // can be handed out, must be reconnected, or should be closed.
    type BaseConnection struct {
    	Busy      bool             // whether this connection is currently in use
    	Valid     bool             // whether this connection can still be used; cleared by closeConnection
    	AccessAt  time.Time        // last access time; compared against the pool's ConnExpire
    	HbaseConn *HbaseConnection // the real HBase client
    	Pool      *ConnectionPool  // the pool this connection belongs to
    }
    
    // ConnectionPool is a bounded pool of HBase connections.
    //
    // The embedded mutex guards Usecount, IdleCount and Connections (TryGet and Put
    // take it). WaitCount is read and written with sync/atomic instead, and WaitChan
    // is the hand-off path from Put to goroutines blocked in Get.
    type ConnectionPool struct {
    	sync.Mutex
    	Address     string               // server address passed to newConnection (presumably host:port — TODO confirm)
    	MaxConn     int                  // max pool size
    	CoreConn    int                  // min pool size; Put only evicts expired connections above this size
    	MaxWait     int32                // max number of goroutines allowed to wait in Get
    	WaitTimeout time.Duration        // max wait timeout; Get returns (nil, error) when it elapses
    	ConnExpire  time.Duration        // age after which TryGet reconnects and Put may close a connection
    	Usecount    int                  // number of connections currently handed out
    	IdleCount   int                  // number of idle connections that can be handed out
    	WaitCount   int32                // number of goroutines waiting for a connection (accessed atomically)
    	WaitChan    chan *BaseConnection // Put sends a released connection here when some goroutine is waiting
    	Connections []*BaseConnection    // every connection owned by the pool, busy or idle
    }
    
    // PoolConfig carries the settings NewConnectionPool copies into a new
    // ConnectionPool.
    type PoolConfig struct {
    	MaxPool int           // max pool size (becomes ConnectionPool.MaxConn)
    	MinPool int           // min pool size (becomes ConnectionPool.CoreConn)
    	MaxWait int32         // max number of waiting goroutines; also the WaitChan buffer size
    	Timeout time.Duration // max wait time (becomes ConnectionPool.WaitTimeout)
    	Expire  time.Duration // max idle time of a connection (becomes ConnectionPool.ConnExpire)
    	Address string        // server address every connection of the pool is opened against
    }
    
    // NewConnectionPool builds a pool from config and eagerly tries to open
    // config.MinPool connections to config.Address.
    //
    // Attempts for which newConnection returns nil are skipped, so the returned
    // pool may hold fewer than MinPool connections. Every connection that is added
    // gets its Pool back-reference set to the new pool. The counters Usecount and
    // WaitCount start at their zero values.
    func NewConnectionPool(config PoolConfig) *ConnectionPool {
    	// make panics on a negative buffer size. A negative MaxWait already means
    	// "nobody may wait" in Get, so an unbuffered channel is sufficient.
    	waitSlots := int(config.MaxWait)
    	if waitSlots < 0 {
    		waitSlots = 0
    	}

    	pool := &ConnectionPool{
    		Address:     config.Address,
    		MaxConn:     config.MaxPool,
    		CoreConn:    config.MinPool,
    		MaxWait:     config.MaxWait,
    		WaitTimeout: config.Timeout,
    		ConnExpire:  config.Expire,
    		WaitChan:    make(chan *BaseConnection, waitSlots),
    	}

    	for idx := 0; idx < pool.CoreConn; idx++ {
    		conn := newConnection(pool.Address)
    		if conn == nil {
    			continue
    		}

    		// The back-reference was previously left nil for every connection.
    		conn.Pool = pool
    		pool.IdleCount++
    		pool.Connections = append(pool.Connections, conn)
    	}

    	return pool
    }
    
    // newConnection opens an HBase connection to addr and wraps it in an idle,
    // valid BaseConnection stamped with the current time.
    //
    // It returns nil when hbaseconnect reports an error (or yields no client), so
    // callers only need a nil check. The Pool field is left for the caller to set.
    func newConnection(addr string) *BaseConnection {
    	conn, err := hbaseconnect(addr)
    	// The check used to be inverted: a failed dial produced a wrapper around an
    	// unusable client, and a successful dial was discarded.
    	if err != nil || conn == nil {
    		return nil
    	}

    	return &BaseConnection{
    		Busy:      false,
    		Valid:     true,
    		AccessAt:  time.Now(),
    		HbaseConn: conn,
    	}
    }
    
    // closeConnection shuts down the HBase client held by conn and flags the
    // wrapper as no longer valid. A nil conn is ignored.
    func closeConnection(conn *BaseConnection) {
    	if conn != nil {
    		hbaseclose(conn.HbaseConn)
    		conn.Valid = false
    	}
    }
    
    // TryGet returns a connection without waiting.
    //
    // It first looks for an idle, valid connection. An idle connection older than
    // ConnExpire is reconnected before being handed out; if the reconnect fails the
    // connection is skipped and its AccessAt is left untouched, so it is still seen
    // as expired on the next attempt. If no idle connection qualifies and the pool
    // holds fewer than MaxConn connections, a new one is opened.
    //
    // Every connection returned is marked Busy and counted in Usecount. When
    // nothing can be handed out, TryGet returns a nil connection and an error.
    func (pool *ConnectionPool) TryGet() (conn *BaseConnection, err error) {
    	pool.Lock()
    	defer pool.Unlock()

    	if pool.IdleCount > 0 {
    		for _, idle := range pool.Connections {
    			if idle == nil || !idle.Valid || idle.Busy {
    				continue
    			}

    			if time.Since(idle.AccessAt) > pool.ConnExpire {
    				if rerr := hbasereconnect(idle.HbaseConn); rerr != nil {
    					continue
    				}
    				// Refresh the timestamp only after the reconnect succeeded;
    				// stamping it first made a broken connection look fresh.
    				idle.AccessAt = time.Now()
    			}

    			pool.Usecount++
    			pool.IdleCount--
    			idle.Busy = true

    			return idle, nil
    		}
    	}

    	if len(pool.Connections) < pool.MaxConn {
    		if fresh := newConnection(pool.Address); fresh != nil {
    			// A new connection goes straight to the caller, so it must be
    			// marked busy; otherwise a later scan could hand it out again.
    			fresh.Busy = true
    			fresh.Pool = pool
    			pool.Usecount++
    			pool.Connections = append(pool.Connections, fresh)
    			return fresh, nil
    		}
    	}

    	return nil, errors.New("no avaliable connections")
    }
    
    // Get returns a connection, waiting for one to be released if none is
    // available right away.
    //
    // It first calls TryGet. If that yields nothing and fewer than MaxWait
    // goroutines are already waiting, Get registers itself as a waiter and blocks
    // on WaitChan until Put hands over a connection or the timeout elapses.
    // It returns an error when the wait queue is full or the wait times out.
    func (pool *ConnectionPool) Get() (*BaseConnection, error) {
    	// TryGet's error is dropped on purpose: failure here only means "wait".
    	if conn, _ := pool.TryGet(); conn != nil {
    		return conn, nil
    	}

    	// Claim a wait slot with compare-and-swap. A separate load followed by an
    	// add let several goroutines pass the limit check at the same time.
    	for {
    		wc := atomic.LoadInt32(&pool.WaitCount)
    		if pool.MaxWait <= wc {
    			return nil, errors.New("error, too much wait routine")
    		}
    		if atomic.CompareAndSwapInt32(&pool.WaitCount, wc, wc+1) {
    			break
    		}
    	}
    	defer atomic.AddInt32(&pool.WaitCount, -1)

    	// NOTE(review): WaitTimeout is a time.Duration but is scaled by time.Second
    	// here, i.e. treated as a count of seconds. The scaling is kept unchanged for
    	// backward compatibility — confirm which unit callers actually pass.
    	timer := time.NewTimer(time.Second * time.Duration(pool.WaitTimeout))
    	defer timer.Stop() // release the timer as soon as a connection arrives

    	select {
    	case conn := <-pool.WaitChan:
    		return conn, nil
    	case <-timer.C:
    		// Put may have handed a connection over at the very moment the timer
    		// fired; take it rather than leaving it stranded in the channel.
    		select {
    		case conn := <-pool.WaitChan:
    			return conn, nil
    		default:
    		}
    		return nil, errors.New("timeout")
    	}
    }
    
    // Put gives conn back to the pool. A nil conn is ignored.
    //
    // If some goroutine is waiting in Get, the connection is handed to it through
    // WaitChan and stays marked busy. Otherwise it becomes idle, and — when the
    // pool holds more than CoreConn connections — at most one idle connection
    // older than ConnExpire is closed and removed.
    //
    // FIXME: the connection may be broken for some reason; it should then be
    // removed instead of being returned to the pool.
    func (pool *ConnectionPool) Put(conn *BaseConnection) {
    	if conn == nil {
    		return
    	}

    	pool.Lock()
    	defer pool.Unlock()

    	conn.AccessAt = time.Now()

    	if atomic.LoadInt32(&pool.WaitCount) > 0 {
    		// Non-blocking hand-off: a blocking send here would hold the pool lock
    		// for as long as nobody receives (e.g. the waiter has just timed out).
    		select {
    		case pool.WaitChan <- conn:
    			return
    		default:
    			// No room to hand over; fall through and park the connection as idle.
    		}
    	}

    	pool.Usecount--
    	pool.IdleCount++
    	conn.Busy = false

    	// Shrink by at most one expired connection per call.
    	if len(pool.Connections) > pool.CoreConn {
    		for idx, con := range pool.Connections {
    			// Only idle connections may be evicted: a busy connection keeps an
    			// old AccessAt while a caller is still using it.
    			if con == nil || con.Busy {
    				continue
    			}
    			if time.Since(con.AccessAt) > pool.ConnExpire {
    				closeConnection(con)
    				pool.IdleCount--

    				// Remove entry idx and clear the vacated tail slot so the closed
    				// connection is not kept alive by the backing array.
    				last := len(pool.Connections) - 1
    				copy(pool.Connections[idx:], pool.Connections[idx+1:])
    				pool.Connections[last] = nil
    				pool.Connections = pool.Connections[:last]
    				break
    			}
    		}
    	}
    }
    
  • 相关阅读:
    在未排序的数组中找到第 k 个最大的元素
    区域和检索
    控制台画图程序(可更换笔刷版本)
    循环中的scanf处理了换行符怎么破
    strlen获取字符数组为什么是255
    宽字符输出中文,Devc++解决方法
    区间取最小值最大值-位值和
    模拟鼠标键盘-封装函数
    scanf("%d",a[i]+j)为什么不加取地址符号
    scanf需要多输入一行是什么问题
  • 原文地址:https://www.cnblogs.com/sinpo828/p/14688300.html
Copyright © 2011-2022 走看看