1、利用的等待/通知实现超时取连接
package cn.enjoyedu.ch1.pool; import java.sql.Connection; import java.util.LinkedList; /** *类说明:连接池的实现 */ public class DBPool { /*容器用于存放连接*/ private static LinkedList<Connection> pool = new LinkedList<Connection>(); /*限制了连接池的大小*/ public DBPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { //模拟拿连接 pool.addLast(SqlConnectImpl.fetchConnection()); } } } /*释放连接,通知其他的等待连接的线程*/ public void releaseConnection(Connection connection) { if (connection != null) { synchronized (pool){ pool.addLast(connection); //通知其他等待连接的线程 pool.notifyAll(); } } } /*获取*/ // 在mills内无法获取到连接,将会返回null 1S public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool){ //永不超时 if(mills<=0){ while(pool.isEmpty()){ pool.wait(); } return pool.removeFirst(); }else{ /*超时时刻*/ long future = System.currentTimeMillis()+mills; /*等待时长*/ long remaining = mills; while(pool.isEmpty()&&remaining>0){ pool.wait(remaining); /*唤醒一次,重新计算等待时长*/ remaining = future-System.currentTimeMillis(); } Connection connection = null; if(!pool.isEmpty()){ connection = pool.removeFirst(); } return connection; } } } }
测试
package com.pool; import java.sql.Connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** *类说明:测试并发1000次从池拿连接 */ public class DBPoolTest { static DBPool pool = new DBPool(10); // 控制器:控制main线程将会等待所有Woker结束后才能继续执行 static CountDownLatch end; public static void main(String[] args) throws Exception { // 线程数量 int threadCount = 50; end = new CountDownLatch(threadCount); int count = 20;//每个线程的操作次数 AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程 AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new Worker(count, got, notGot), "worker_"+i); thread.start(); } end.await();// main线程在此处等待 System.out.println("总共尝试了: " + (threadCount * count)); System.out.println("拿到连接的次数: " + got); System.out.println("没能连接的次数: " + notGot); } static class Worker implements Runnable { int count; AtomicInteger got; AtomicInteger notGot; public Worker(int count, AtomicInteger got, AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } public void run() { while (count > 0) { try { // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null // 分别统计连接获取的数量got和未获取到的数量notGot Connection connection = pool.fetchConnection(1000); if (connection != null) { try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } } else { notGot.incrementAndGet(); System.out.println(Thread.currentThread().getName() +"等待超时!"); } } catch (Exception ex) { } finally { count--; } } end.countDown(); } } }
结果
2、利用信号量实现数据库连接池:
注意可用连接useful和无可用连接useless都要管理起来,因为你不能保证每个人都会先拿连接再释放连接,这是信号量的一个坑
package com.semaphore; import java.sql.Connection; import java.util.LinkedList; import java.util.concurrent.Semaphore; /** *类说明:Semaphore实现数据库连接池 */ public class DBPoolSemaphore { private final static int POOL_SIZE = 10; //两个指示器,分别表示池子还有可用连接和已用连接 private final Semaphore useful,useless; //存放数据库连接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } public DBPoolSemaphore() { this.useful = new Semaphore(10); this.useless = new Semaphore(0); } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection connection; synchronized (pool) { connection = pool.removeFirst(); } useless.release(); return connection; } }
测试
package com.semaphore; import java.sql.Connection; import java.util.Random; /** *类说明:测试数据库连接池 */ public class AppTest { private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); private static class BusiThread extends Thread{ @Override public void run() { Random r = new Random();//让每个线程持有连接的时间不一样 long start = System.currentTimeMillis(); try { Connection connect = dbPool.takeConnect(); System.out.println("Thread_"+Thread.currentThread().getId() +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms."); SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据 System.out.println("查询数据完成,归还连接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) { } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }
结果
Semaphore信号量的一个坑,只管理可用连接,当开发者不按先取连接再释放连接的规则,就达不到流控的效果,
package com.semaphore; import java.sql.Connection; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.Semaphore; /** *类说明:Semaphore实现数据库连接池 */ public class DBPoolNoUseless { private final static int POOL_SIZE = 10; private final Semaphore useful; //存放数据库连接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } public DBPoolNoUseless() { this.useful = new Semaphore(10); } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection connection; synchronized (pool) { connection = pool.removeFirst(); } return connection; } private static DBPoolNoUseless dbPoolNoUseless = new DBPoolNoUseless(); private static class BusiThread extends Thread{ @Override public void run() { Random r = new Random();//让每个线程持有连接的时间不一样 long start = System.currentTimeMillis(); try { System.out.println("Thread_"+Thread.currentThread().getId() +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms."); SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据 System.out.println("查询数据完成,归还连接!"); dbPoolNoUseless.returnConnect(new SqlConnectImpl()); } catch (InterruptedException e) { } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }
模拟拿连接的类:
package cn.enjoyedu.ch1.pool; import cn.enjoyedu.tools.SleepTools; import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; /** *类说明: */ public class SqlConnectImpl implements Connection{ /*拿一个数据库连接*/ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } //忽略其它 }