zoukankan      html  css  js  c++  java
  • java数据库连接池_方便自己学习

    1、利用的等待/通知实现超时取连接

    package cn.enjoyedu.ch1.pool;
    
    import java.sql.Connection;
    import java.util.LinkedList;
    
    /**
     *类说明:连接池的实现
     */
    public class DBPool {
    
        /*容器用于存放连接*/
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
    
        /*限制了连接池的大小*/
        public DBPool(int initialSize) {
            if (initialSize > 0) {
                for (int i = 0; i < initialSize; i++) {
                    //模拟拿连接
                    pool.addLast(SqlConnectImpl.fetchConnection());
                }
            }
        }
    
        /*释放连接,通知其他的等待连接的线程*/
        public void releaseConnection(Connection connection) {
            if (connection != null) {
                synchronized (pool){
                    pool.addLast(connection);
                    //通知其他等待连接的线程
                    pool.notifyAll();
                }
            }
        }
    
        /*获取*/
        // 在mills内无法获取到连接,将会返回null 1S
        public Connection fetchConnection(long mills) throws InterruptedException {
            synchronized (pool){
                //永不超时
                if(mills<=0){
                    while(pool.isEmpty()){
                        pool.wait();
                    }
                    return pool.removeFirst();
                }else{
                    /*超时时刻*/
                    long future = System.currentTimeMillis()+mills;
                    /*等待时长*/
                    long remaining = mills;
                    while(pool.isEmpty()&&remaining>0){
                        pool.wait(remaining);
                        /*唤醒一次,重新计算等待时长*/
                        remaining = future-System.currentTimeMillis();
                    }
                    Connection connection = null;
                    if(!pool.isEmpty()){
                        connection = pool.removeFirst();
                    }
                    return connection;
                }
            }
    
        }
    }
                

    测试

    package com.pool;
    
    import java.sql.Connection;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     *类说明:测试并发1000次从池拿连接
     */
    public class DBPoolTest {
        static DBPool pool  = new DBPool(10);
        // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
        static CountDownLatch end;
    
        public static void main(String[] args) throws Exception {
            // 线程数量
            int threadCount = 50;
            end = new CountDownLatch(threadCount);
            int count = 20;//每个线程的操作次数
            AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
            AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
            for (int i = 0; i < threadCount; i++) {
                Thread thread = new Thread(new Worker(count, got, notGot), 
                        "worker_"+i);
                thread.start();
            }
            end.await();// main线程在此处等待
            System.out.println("总共尝试了: " + (threadCount * count));
            System.out.println("拿到连接的次数:  " + got);
            System.out.println("没能连接的次数: " + notGot);
        }
    
        static class Worker implements Runnable {
            int           count;
            AtomicInteger got;
            AtomicInteger notGot;
    
            public Worker(int count, AtomicInteger got,
                                   AtomicInteger notGot) {
                this.count = count;
                this.got = got;
                this.notGot = notGot;
            }
    
            public void run() {
                while (count > 0) {
                    try {
                        // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                        // 分别统计连接获取的数量got和未获取到的数量notGot
                        Connection connection = pool.fetchConnection(1000);
                        if (connection != null) {
                            try {
                                connection.createStatement();
                                connection.commit();
                            } finally {
                                pool.releaseConnection(connection);
                                got.incrementAndGet();
                            }
                        } else {
                            notGot.incrementAndGet();
                            System.out.println(Thread.currentThread().getName()
                                    +"等待超时!");
                        }
                    } catch (Exception ex) {
                    } finally {
                        count--;
                    }
                }
                end.countDown();
            }
        }
    }

    结果

    2、利用信号量实现数据库连接池:

    注意可用连接useful无可用连接useless都要管理起来,因为你不能保证每个人都会先拿连接再释放连接,这是信号量的一个坑

    package com.semaphore;
    
    import java.sql.Connection;
    import java.util.LinkedList;
    import java.util.concurrent.Semaphore;
    
    /**
     *类说明:Semaphore实现数据库连接池
     */
    public class DBPoolSemaphore {
        
        private final static int POOL_SIZE = 10;
        //两个指示器,分别表示池子还有可用连接和已用连接
        private final Semaphore useful,useless;
        //存放数据库连接的容器
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
        //初始化池
        static {
            for (int i = 0; i < POOL_SIZE; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
        public DBPoolSemaphore() {
            this.useful = new Semaphore(10);
            this.useless = new Semaphore(0);
        }
        
        /*归还连接*/
        public void returnConnect(Connection connection) throws InterruptedException {
            if(connection!=null) {
                System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
                        +"可用连接数:"+useful.availablePermits());
                useless.acquire();
                synchronized (pool) {
                    pool.addLast(connection);
                }
                useful.release();
            }
        }
        
        /*从池子拿连接*/
        public Connection takeConnect() throws InterruptedException {
            useful.acquire();
            Connection connection;
            synchronized (pool) {
                connection = pool.removeFirst();
            }
            useless.release();
            return connection;
        }
        
    }

    测试

    package com.semaphore;
    
    import java.sql.Connection;
    import java.util.Random;
    
    /**
     *类说明:测试数据库连接池
     */
    public class AppTest {
    
        private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
        
        private static class BusiThread extends Thread{
            @Override
            public void run() {
                Random r = new Random();//让每个线程持有连接的时间不一样
                long start = System.currentTimeMillis();
                try {
                    Connection connect = dbPool.takeConnect();
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms.");
                    SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                    System.out.println("查询数据完成,归还连接!");
                    dbPool.returnConnect(connect);
                } catch (InterruptedException e) {
                }
            }
        }
        
        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                Thread thread = new BusiThread();
                thread.start();
            }
        }
        
    }

    结果

     Semaphore信号量的一个坑,只管理可用连接,当开发者不按先取连接再释放连接的规则,就达不到流控的效果,

    package com.semaphore;
    
    import java.sql.Connection;
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    /**
     *类说明:Semaphore实现数据库连接池
     */
    public class DBPoolNoUseless {
    
        private final static int POOL_SIZE = 10;
        private final Semaphore useful;
        //存放数据库连接的容器
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
        //初始化池
        static {
            for (int i = 0; i < POOL_SIZE; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
        public DBPoolNoUseless() {
            this.useful = new Semaphore(10);
        }
        
        /*归还连接*/
        public void returnConnect(Connection connection) throws InterruptedException {
            if(connection!=null) {
                System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
                        +"可用连接数:"+useful.availablePermits());
                synchronized (pool) {
                    pool.addLast(connection);
                }
                useful.release();
            }
        }
        
        /*从池子拿连接*/
        public Connection takeConnect() throws InterruptedException {
            useful.acquire();
            Connection connection;
            synchronized (pool) {
                connection = pool.removeFirst();
            }
            return connection;
        }
    
        private static DBPoolNoUseless dbPoolNoUseless = new DBPoolNoUseless();
    
        private static class BusiThread extends Thread{
            @Override
            public void run() {
                Random r = new Random();//让每个线程持有连接的时间不一样
                long start = System.currentTimeMillis();
                try {
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms.");
                    SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                    System.out.println("查询数据完成,归还连接!");
                    dbPoolNoUseless.returnConnect(new SqlConnectImpl());
                } catch (InterruptedException e) {
                }
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                Thread thread = new BusiThread();
                thread.start();
            }
        }
        
    }

     模拟拿连接的类:

    package cn.enjoyedu.ch1.pool;
    
    import cn.enjoyedu.tools.SleepTools;
    
    import java.sql.*;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.Executor;
    
    /**
     *类说明:
     */
    public class SqlConnectImpl implements Connection{
        
        /*拿一个数据库连接*/
        public static final Connection fetchConnection(){
            return new SqlConnectImpl();
        }
    
        //忽略其它    
    }
    作者:howtosay
             
    放牛娃的个人笔记整理,每天记录一点点,进步一点点
  • 相关阅读:
    2020/4/29 一场令人头疼的cf。。。
    2020/4/27 日常补坑-tarjan第一道awa
    2020/4/26 2-sat 学习笔记
    SHADEILS--Success History based Adaptive Differential Evolution with Iterative Local Search
    再探动态规划——lettcode689三个无重叠数组的最大和
    学算法——gradient descent
    学算法——particle swarm optimization
    读论文——A study on Artificial Potential Fields
    install libspatialindex on macOS
    Supervised Learning003
  • 原文地址:https://www.cnblogs.com/hongzm/p/11267313.html
Copyright © 2011-2022 走看看