zoukankan      html  css  js  c++  java
  • Java自定义连接池

    基本原理

    连接池的作用

    • 复用连接,节省新建关闭连接的时间开销
    • 多连接,前后业务避免阻塞

    连接池的应用场景

    • TCP连接

    连接池核心元素

    • 初始化连接池大小,建议为CPU核数
    • 最大连接池大小,建议为CPU核数*2
    • 多线程避免同步锁的应用,锁的使用影响多线程的速度

    代码

    连接池

    package net.sf.hservice_qrcode.secretkey.pool;
    
    import java.util.Enumeration;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.Vector;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy;
    
    public class ConnPoolImpl implements ConnPool {
    	protected transient final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    	// --------------------- 连接池 -----------------
    	private int initialSize = 2; // 连接池的初始大小
    	private int maxSize = 10; // 连接池最大的大小
    	private int incrSize = 1; // 连接池自动增加的大小
    	private long maxIdleTime = 30; // 最大空闲时间,超时释放连接,单位:秒
    	private boolean autoExpanse = false; // 启动自动扩容
    	
    	// ------------------- 扩容策略 -----------------
    	private int perRoundSeconds = 30; // 每轮100毫秒
    	private int expansionTimeout = 100 * 4; // 扩容超时,单位: 毫秒
    	private int totalWaitTimeout = 1000 * 5; // 总台等待时间,单位:毫秒
    
    	private ConnParam connParam;
    
    	private boolean inited = false;
    
    	/**
    	 * 预准备连接
    	 */
    	private Vector<PooledConn> prepareConnections;
    
    	/**
    	 * 已登录连接
    	 */
    	private Vector<PooledConn> loginedConnections;
    
    	/**
    	 * 空闲连接队列
    	 */
    	private Queue<PooledConn> freeConnQueue = new LinkedList<PooledConn>();
    	private Lock freeConnQueueLock = new ReentrantLock();
    	/**
    	 * 已登录连接检查 定时器
    	 */
    	private Timer timerPool_LoginedConnectionCheck;
    	private long timerPool_LoginedConnectionCheck_period = 100; // 单位:毫秒
    	/**
    	 * 长时间空闲释放 定时器
    	 */
    	private Timer timerPool_LongTimeFreeConnectionCheck;
    	private long timerPool_LongTimeFreeConnectionCheck_period = 100; // 单位:毫秒
    
    	/**
    	 * 构造函数
    	 */
    	public ConnPoolImpl(ConnParam connectParam,ConnPoolParam connPoolParam) {
    		connParam = connectParam;
    		
    		this.initialSize = connPoolParam.getInitialSize();
    		this.maxSize = connPoolParam.getMaxSize();
    		this.incrSize = connPoolParam.getIncrSize();
    		this.maxIdleTime = connPoolParam.getMaxIdleTime();
    		this.autoExpanse = connPoolParam.isAutoExpanse();
    		
    		logger.info(String.format("当前处理器核数: %d" , Runtime.getRuntime().availableProcessors()));
    		logger.info(String.format("连接池参数: %s, 连接参数: %s", connPoolParam.toString(), connParam.toString()));
    		
    	}
    
    	/**
    	 * 创建连接池
    	 */
    	@Override
    	public void createPool() {
    		if (inited) {
    			logger.warn("连接池已经初始化,请勿多次初始化");
    			return;
    		}
    		this.prepareConnections = new Vector<PooledConn>();
    		this.loginedConnections = new Vector<PooledConn>();
    
    		// 创建初始连接
    		createPrepareConn(this.initialSize);
    
    		// -------------- 登录连接检测 ---------
    		this.timerPool_LoginedConnectionCheck = new Timer();
    
    		long firstTime = 1000 * 5; // 延时30秒启动
    
    		timerPool_LoginedConnectionCheck.schedule(new TimerTask() {
    			@Override
    			public void run() {
    				loginedConnectionsCheck_ThreadRun();
    			}
    		}, firstTime, timerPool_LoginedConnectionCheck_period);
    
    		// -------------- 闲置连接检测 ---------
    		timerPool_LongTimeFreeConnectionCheck = new Timer();
    		firstTime = 1000 * 5; // 延时30秒启动
    		timerPool_LongTimeFreeConnectionCheck.schedule(new TimerTask() {
    
    			@Override
    			public void run() {
    				// 释放空闲连接
    				longTimeFreeConnectionCheck_ThreadRun();
    			}
    		}, firstTime, this.timerPool_LongTimeFreeConnectionCheck_period);
    
    		inited = true;
    	}
    
    	/**
    	 * 创建连接
    	 * 
    	 * @param count
    	 */
    	private synchronized void createPrepareConn(int count) {
    		for (int x = 0; x < count; x++) {
    			if (this.prepareConnections.size() >= this.maxSize) {
    				logger.info("[pool]---> 连接池中连接数已到达极限[" + this.maxSize + "],不再扩容");
    				break;
    			}
    
    			TcpClientProxy conn = newConn();
    			PooledConn pooledConn = new PooledConn(conn);
    			pooledConn.setBusy(false);
    			pooledConn.setPooled(false);
    			pooledConn.setQueue(false);
    			pooledConn.setLogined(false);
    
    			this.prepareConnections.addElement(pooledConn);
    
    			PoolReturnValue poolReturnValue = getPoolListString(this.prepareConnections);
    
    			logger.info("[pool]---> 预分配连接池新增连接,当前连接池中连接个数=" + poolReturnValue.getIntVal() + ", 当前连接池["
    					+ poolReturnValue.getStrVal() + "]");
    
    		}
    	}
    
    	/**
    	 * 创建一个新的连接并返回
    	 * 
    	 * @return
    	 */
    	private synchronized TcpClientProxy newConn() {
    		TcpClientProxy conn = null;
    		String clientName = String.format("%s-%02d", connParam.getClientName(), this.prepareConnections.size());
    
    		conn = new TcpClientProxy();
    		conn.SetParam(clientName, connParam.getHost(), connParam.getPort(), connParam.getTermNo(),
    				connParam.getTermMasterKey(), connParam.getConnet_timeout(), connParam.getRecv_timeout());
    		conn.start();
    
    		return conn; // 返回创建的新的数据库连接
    	}
    
    	/**
    	 * 返回一个可用连接
    	 * 
    	 * @return
    	 */
    	@Override
    	public PooledConn getConnection() {
    		boolean bNeedNewConn = false;
    		boolean isExpansed = false;
    
    		// 确保连接池己被创建
    		if (!inited) {
    			logger.warn("连接池未初始化完成,返回可用连接null");
    			return null; // 连接池还没创建,则返回 null
    		}
    		PooledConn conn = getFreeConn(); // 获得一个可用的数据库连接如果目前没有可以使用的连接,即所有的连接都在使用中
    
    		int tryCount = 0;
    
    		while (conn == null) {
    			// ------------- 自动扩容 ---------------
    			if (this.autoExpanse && !isExpansed) {
    				// 等待时间大于扩容超时,进行扩容
    				if (perRoundSeconds * tryCount > expansionTimeout) {
    					bNeedNewConn = true;
    
    					if (bNeedNewConn) {
    						logger.info(String.format("[pool]---> 等待超过%dms, 触发连接池扩容策略,进行扩容", perRoundSeconds * tryCount));
    						expansePool();
    						isExpansed = true;
    					}
    				}
    			}
    
    //			// 等待时间大于最大等待时间,返回失败
    //			if (perRoundSeconds * tryCount > totalWaitTimeout) {
    //				logger.info(String.format("[pool]---> 等待%ds,长时间未分到可用的连接,连接分配失败返回连接null",
    //						perRoundSeconds * tryCount / 1000));
    //				break;
    //			}
    
    			tryCount++;
    			wait(perRoundSeconds);
    			conn = getFreeConn(); // 重新再试,直到获得可用的连接,如果则表明创建一批连接后也不可获得可用连接
    			if (conn != null) {
    				logger.info(String.format("[pool]---> 等待%dms, 连接分配成功", perRoundSeconds * tryCount));
    				break;
    			} else {
    				logger.debug(String.format("[pool]---> 等待%dms未分配到可用连接,继续等待...", perRoundSeconds * tryCount));
    			}
    		}
    
    		return conn;// 返回获得的可用的连接
    	}
    
    	private void expansePool() {
    		// 如果目前连接池中没有可用的连接 创建一些连接
    		createPrepareConn(this.incrSize);
    	}
    
    	/**
    	 * 返回可用的连接
    	 * 
    	 * @return
    	 */
    	private synchronized PooledConn getFreeConn() {
    		PooledConn pConn = null;
    		pConn = freeConnQueue.poll();
    		return pConn;// 返回找到到的可用连接
    	}
    
    	@Override
    	public synchronized void closeConnection(PooledConn pooledConn) {
    		freeConnQueue.offer(pooledConn);
    	}
    
    	/**
    	 * 关闭连接
    	 * 
    	 * @param conn
    	 */
    	private void closeConn(TcpClientProxy conn) {
    		conn.close();
    	}
    
    	/**
    	 * 等待 单位 毫秒
    	 * 
    	 * @param mSeconds
    	 */
    	private void wait(int mSeconds) {
    		try {
    			Thread.sleep(mSeconds);
    		} catch (InterruptedException e) {
    		}
    	}
    
    	/**
    	 * 释放空闲连接
    	 * 
    	 */
    	private void longTimeFreeConnectionCheck_ThreadRun() {
    		// 确保连接池己创新存在
    		PooledConn pConn = null;
    		boolean bFind = false;
    
    		Enumeration enumerate = this.loginedConnections.elements();
    		while (enumerate.hasMoreElements()) {
    			if (this.loginedConnections.size() <= this.initialSize) {
    				break;
    			}
    			pConn = (PooledConn) enumerate.nextElement();
    			long intIdleSeconds = pConn.getIdleSeconds();
    			// 大于最大空闲时间
    			if (intIdleSeconds > this.maxIdleTime * 1000) {
    				// 从连接池向量中删除它
    				this.loginedConnections.removeElement(pConn);
    				logger.info("[pool]---> 已登录连接池移除连接[" + pConn.getConn().getClientName() + "]");
    
    				this.prepareConnections.remove(pConn);
    				logger.info("[pool]---> 预分配连接池移除连接[" + pConn.getConn().getClientName() + "]");
    
    				closeConn(pConn.getConn());
    
    				bFind = true;
    				PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections);
    
    				logger.info("[pool]---> 超过最大空闲时间" + intIdleSeconds / 1000 + "s释放空闲连接[" + pConn.getConn().getClientName()
    						+ "],释放后连接个数=" + poolReturnValue.getIntVal() + ", 当前连接池[" + poolReturnValue.getStrVal() + "]");
    			}
    		}
    
    		// ------------ 存在释放闲置连接的,才需要重新整理队列 --------------
    		if (bFind) {
    			freeConnQueueLock.lock();
    			freeConnQueue.clear();
    			enumerate = this.loginedConnections.elements();
    			while (enumerate.hasMoreElements()) {
    				pConn = (PooledConn) enumerate.nextElement();
    				freeConnQueue.offer(pConn);
    			}
    
    			logger.info("[pool]--<queue>---> 空闲连接队列重新排列,队列中连接个数:" + freeConnQueue.size() + ", 当前空闲连接队列["
    					+ getFreeQueueListString(freeConnQueue) + "]");
    			freeConnQueueLock.unlock();
    		}
    	}
    
    	private void freeQueue_append(PooledConn pConn) {
    		this.freeConnQueue.offer(pConn);
    
    		logger.info("[pool]--<queue>---> 空闲连接队列,连接个数:" + freeConnQueue.size() + ", 队列["
    				+ getFreeQueueListString(this.freeConnQueue) + "]");
    	}
    
    	private void loginedConnectionsCheck_ThreadRun() {
    		// 签到完成的连接,加入到已登录连接池
    		Enumeration enumerate = this.prepareConnections.elements();
    		while (enumerate.hasMoreElements()) {
    			PooledConn pConn = (PooledConn) enumerate.nextElement();
    			if (pConn.isPooled() == false) {
    				while (pConn.getConn().getLoginFlag() != 2) {
    					wait(300);
    				}
    
    				this.loginedConnections.add(pConn);
    				pConn.setPooled(true);
    
    				PoolReturnValue poolReturnValue = getPoolListString(this.loginedConnections);
    				logger.info("[pool]---> 连接池中增加一个新的连接,当前连接数量=" + poolReturnValue.getIntVal() + ", 当前连接池["
    						+ poolReturnValue.getStrVal() + "]");
    
    				freeQueue_append(pConn); // 加入队列
    			}
    		}
    	}
    
    	private PoolReturnValue getPoolListString(Vector<PooledConn> connections) {
    		PoolReturnValue ret = new PoolReturnValue();
    		int count = 0;
    
    		String stroutString = "";
    		Enumeration enumerate = connections.elements();
    		while (enumerate.hasMoreElements()) {
    			PooledConn pConn = (PooledConn) enumerate.nextElement();
    			stroutString += "|" + pConn.getConn().getClientName();
    			count++;
    		}
    
    		ret.setStrVal(stroutString);
    		ret.setIntVal(count);
    		return ret;
    	}
    
    	private String getFreeQueueListString(Queue<PooledConn> freeConnQueueC) {
    		String stroutString = "";
    
    		LinkedList linkedList = (LinkedList) freeConnQueueC;
    		for (Iterator iterator = linkedList.iterator(); iterator.hasNext();) {
    			PooledConn pConn = (PooledConn) iterator.next();
    			stroutString += "|" + pConn.getConn().getClientName();
    		}
    
    		return stroutString;
    	}
    
    }
    

    连接

    package net.sf.hservice_qrcode.secretkey.pool;
    
    import java.util.Date;
    
    import net.sf.hservice_qrcode.secretkey.tcp.client.TcpClientProxy;
    
    /**
     * 
     * 内部使用的用于保存连接池中连接对象的类 此类中有两个成员,一个是数据库的连接,另一个是指示此连接是否 正在使用的标志。
     */
    public class PooledConn {
    
    	private TcpClientProxy conn = null;// 连接
    	private boolean busy = false; // 此连接是否正在使用的标志,默认没有正在使用
    
    	private Date lastActiveDate;
    	private boolean isQueue;
    	private boolean logined;
    	private boolean pooled;
    	
    	public boolean isPooled() {
    		return pooled;
    	}
    
    	public void setPooled(boolean pooled) {
    		this.pooled = pooled;
    	}
    
    	public boolean isQueue() {
    		return isQueue;
    	}
    
    	public void setQueue(boolean isQueue) {
    		this.isQueue = isQueue;
    	}
    
    	public Date getLastActiveDate() {
    		return lastActiveDate;
    	}
    
    	public void setLastActiveDate(Date lastActiveDate) {
    		this.lastActiveDate = lastActiveDate;
    	}
    
    	/**
    	 * 空闲时间,单位:秒
    	 */
    	public long getIdleSeconds() {
    		long idleSeconds = 0;
    		long curTime = new Date().getTime();
    		idleSeconds = curTime - this.lastActiveDate.getTime();
    
    		return idleSeconds;
    	}
    
    	// 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
    	public PooledConn(TcpClientProxy connection) {
    		this.conn = connection;
    		lastActiveDate = new Date();
    		this.isQueue = false;
    	}
    
    	public TcpClientProxy getConn() {
    		return conn;
    	}
    
    	public void setConn(TcpClientProxy conn) {
    		this.conn = conn;
    	}
    
    	// 获得对象连接是否忙
    	public boolean isBusy() {
    		return busy;
    	}
    
    	// 设置对象的连接正在忙
    	public void setBusy(boolean busy) {
    		this.busy = busy;
    		lastActiveDate = new Date();
    	}
    
    	public boolean isLogined() {
    		return logined;
    	}
    
    	public void setLogined(boolean logined) {
    		this.logined = logined;
    	}
    }
    
  • 相关阅读:
    【LCT维护基环内向树森林】BZOJ4764 弹飞大爷
    【LCT】BZOJ3091 城市旅行
    【LCT+主席树】BZOJ3514 Codechef MARCH14 GERALD07加强版
    【最大权闭合子图】bzoj4873 [Shoi2017]寿司餐厅
    【LCT】BZOJ2049 [SDOI2008]Cave 洞穴勘测
    【有上下界的网络流】ZOJ2341 Reactor Cooling(有上下界可行流)
    【费用流】BZOJ1061: [Noi2008]志愿者招募(这题超好)
    从输入url到页面加载的过程
    forEach和map的区别
    理解 JavaScript 对象原型、原型链如何工作、如何向 prototype 属性添加新的方法。
  • 原文地址:https://www.cnblogs.com/jiftle/p/14919715.html
Copyright © 2011-2022 走看看