zoukankan      html  css  js  c++  java
  • wait/notify模拟连接池

    连接池中的连接可重复使用,减少每次新建和烧毁连接对资源的消耗,但连接池的容量大小也要设置合理,否则也会占用多余的资源。连接池的基本功能是获取连接和释放连接

    连接在java中也是一个类,连接对象是一个普通java对象,连接池也是如此,本例使用Connection代表连接类,ConnectionPool代表连接池,主要用到的技术为wait/notify机制以及CountDownLatch的使用

    第一版的代码

    日志使用的为log4j2,配置内容直接拷贝官网内容,将日志级别改为了info,并去掉了一些不必要的打印

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="WARN">
      <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
          <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %msg%n"/>
        </Console>
      </Appenders>
      <Loggers>
        <Root level="Info">
          <AppenderRef ref="Console"/>
        </Root>
      </Loggers>
    </Configuration>
    View Code

    连接池ConnectionPool

    package com.demo.Multithreading.ConnectionPool;
    
    import java.util.LinkedList;
    
    public class ConnectionPool {
        // 存放连接
        private LinkedList<Connection> pool = new LinkedList<Connection>();
        // 连接池的初始容量
        private int capacity;
    
        public ConnectionPool() {
            capacity = 10;
            init();
        }
    
        public ConnectionPool(int capacity) {
            this.capacity = capacity;
            init();
        }
    
        /**
         * 初始化连接池
         */
        public void init() {
            for (int i = 0; i < capacity; i++) {
                pool.add(Connection.createConnection(i));
            }
        }
    
        /**
         * 获取连接,指定的时间内未获取到,则返回null
         * 
         * @param timeout
         *            等待超时时间
         * @return
         * @throws InterruptedException
         */
        public Connection getConnection(long timeout) throws InterruptedException {
            synchronized (pool) {
                if (pool.size() == 0) {
                    // 连接池为空,在指定时间内等待以获取连接
                    pool.wait(timeout);
                    // 收到其他线程notify,准备获取连接
                    if (pool.size() > 0) {
                        // 连接池中存在连接可获取
                        Connection removeFirst = pool.removeFirst();
                        return removeFirst;
                    }
                    // 未获取到连接,返回null
                    return null;
                } else {
                    // 连接池中存在连接,直接获取
                    Connection removeFirst = pool.removeFirst();
                    return removeFirst;
                }
            }
        }
    
        /**
         * 释放连接
         * 
         * @param con
         */
        public void releaseConnection(Connection con) {
            synchronized (pool) {
                if (con != null) {
                    // 在末尾添加连接
                    pool.addLast(con);
                    // 通知其他线程获取连接
                    pool.notifyAll();
                }
            }
        }
    }
    View Code

    连接Connection

    package com.demo.Multithreading.ConnectionPool;
    
    public class Connection {
    
        private int num;
    
        private Connection(int num) {
            this.num = num;
        }
    
        public static Connection createConnection(int num) {
            return new Connection(num);
        }
    
        @Override
        public String toString() {
            return "Connection [num=" + num + "]";
        }
    
    }
    View Code

    测试时线程类MyThread

    package com.demo.Multithreading.ConnectionPool.test;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.Multithreading.ConnectionPool.Connection;
    import com.demo.Multithreading.ConnectionPool.ConnectionPool;
    
    public class MyThread extends Thread {
        private Logger logger = LoggerFactory.getLogger(MyThread.class);
        private ConnectionPool pool;
        private CountDownLatch latch;
    
        public MyThread(String name, CountDownLatch latch, ConnectionPool pool) {
            this.pool = pool;
            this.latch = latch;
            this.setName(name);
        }
    
        @Override
        public void run() {
            Connection connection = null;
            try {
                connection = pool.getConnection(5000);
                Thread.sleep(2000);
                logger.info(Thread.currentThread().getName() + ": " + connection);
            } catch (InterruptedException e) {
                logger.error("", e);
            }
    
            latch.countDown();
        }
    
    }
    View Code

    测试类App

    package com.demo.Multithreading;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.Multithreading.ConnectionPool.ConnectionPool;
    import com.demo.Multithreading.ConnectionPool.test.MyThread;
    
    public class App {
        private static Logger logger = LoggerFactory.getLogger(App.class);
    
        public static void main(String[] args) {
            ConnectionPool cp = new ConnectionPool(3);
            int threadCount = 5;
            CountDownLatch cd = new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                MyThread th = new MyThread("mythread" + i, cd, cp);
                th.start();
            }
    
            try {
                cd.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("execute end.");
        }
    }
    View Code

    执行测试类中main方法,输出结果如下

    11:23:45.307 [mythread0] mythread0: Connection [num=0]
    11:23:45.307 [mythread2] mythread2: Connection [num=2]
    11:23:45.307 [mythread1] mythread1: Connection [num=1]
    11:23:50.293 [mythread3] mythread3: null
    11:23:50.294 [mythread4] mythread4: null
    11:23:50.294 [main] execute end.

    连接池中初始化3个连接,5个线程同时获取连接,有两个连接未获取到连接池,在MyThread中获取连接的最大时间为5秒,线程休眠2秒,可认为线程执行时间为2秒,既然如此,最开始获取到连接的三个线程在2秒左右,不超过3秒的时间内一定能执行结束,另外两条线程也能获取到连接,但真实输出结果是,后面两个线程未获取到连接,究其原因,还是因为没有释放连接。对于可复用资源,比如连接池,线程池等,也包括java io流,在使用之后一定要记得关闭。

    修改测试线程MyThread中run方法,释放连接

    package com.demo.Multithreading.ConnectionPool.test;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.Multithreading.ConnectionPool.Connection;
    import com.demo.Multithreading.ConnectionPool.ConnectionPool;
    
    public class MyThread extends Thread {
        private Logger logger = LoggerFactory.getLogger(MyThread.class);
        private ConnectionPool pool;
        private CountDownLatch latch;
    
        public MyThread(String name, CountDownLatch latch, ConnectionPool pool) {
            this.pool = pool;
            this.latch = latch;
            this.setName(name);
        }
    
        @Override
        public void run() {
            Connection connection = null;
            try {
                connection = pool.getConnection(5000);
                Thread.sleep(2000);
                logger.info(Thread.currentThread().getName() + ": " + connection);
            } catch (InterruptedException e) {
                logger.error("", e);
            } finally {
                if (connection != null) {
                    pool.releaseConnection(connection);
                }
            }
    
            latch.countDown();
        }
    
    }
    View Code

    执行结果

    11:39:53.906 [mythread1] mythread1: Connection [num=1]
    11:39:53.906 [mythread0] mythread0: Connection [num=0]
    11:39:53.906 [mythread2] mythread2: Connection [num=2]
    11:39:55.914 [mythread3] mythread3: null
    11:39:55.914 [mythread4] mythread4: Connection [num=1]
    11:39:55.915 [main] execute end.

    还是有线程未获取到连接,

    分析:五条线程同时执行,前三条线程获取到锁,线程sleep 2s,另外两条线程未获取到锁,处于timed waiting,当前三条线程中有一条线程执行结束释放连接,同时notifyAll,则处于timed waiting状态的两条线程同时变为runnable状态,继续执行,此时只有一个连接,其中一个线程获取后,另外一个线程获取到的就为null,所以出现上述情况,但这种情况不是必现的,有时五条线程都可以获取到连接,该方式不稳定。

    改进:notifyAll是唤醒所有在当前锁上等待的线程,notify是唤醒任意一个在当前锁上等待的线程,那如果将notifyAll改为notify,每次释放连接时,只唤醒一条线程,这种思路理论上是可以的我觉得

    对App测试类进行改进,循环20次进行输出打印

    package com.demo.Multithreading;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.Multithreading.ConnectionPool.ConnectionPool;
    import com.demo.Multithreading.ConnectionPool.test.MyThread;
    
    public class App {
        private static Logger logger = LoggerFactory.getLogger(App.class);
    
        public static void main(String[] args) {
            
            for(int j=0;j<20;j++) {
                ConnectionPool cp = new ConnectionPool(3);
                int threadCount = 5;
                CountDownLatch cd = new CountDownLatch(threadCount);
                for (int i = 0; i < threadCount; i++) {
                    MyThread th = new MyThread("mythread" + i, cd, cp);
                    th.start();
                }
    
                try {
                    cd.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logger.info("execute end.");
            }
            
        }
    }
    View Code

    分别使用notifyAll和notify方法唤醒线程

    结果:

     notifyAll方法,有14次只有四条线程获取到连接,6次五条线程都获取到连接

    notify方法,20次五条线程均获取到连接

    当然,notify和notifyAll方法要看使用场景,在本例中使用notify方法比使用notifyAll方法合适

    如果使用notifyAll,还会有一个比较严重的问题,就是超时时间不准确,还是按照3个连接,5个线程来分析,前三个执行结束,后两个被notifyAll唤醒,当其中一个线程获取到连接后,假设另外一个线程未获取到连接,则此时该线程返回null是不准确的,因为此时很可能还没有到超时时间。

    第二版代码

    在第二版中主要对连接池获取连接的方法做了处理,使其无论是使用notify还是notifyAll都可以正常,并且支持无限等待

    package com.demo.Multithreading.ConnectionPool;
    
    import java.util.LinkedList;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ConnectionPool {
        private Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
        // 存放连接
        private LinkedList<Connection> pool = new LinkedList<Connection>();
        // 连接池的初始容量
        private int capacity;
    
        public ConnectionPool() {
            capacity = 10;
            init();
        }
    
        public ConnectionPool(int capacity) {
            this.capacity = capacity;
            init();
        }
    
        /**
         * 初始化连接池
         */
        public void init() {
            for (int i = 0; i < capacity; i++) {
                pool.add(Connection.createConnection(i));
            }
        }
    
        /**
         * 获取连接,指定的时间内未获取到,则返回null
         * 
         * @param timeout
         *            等待超时时间
         * @return
         * @throws InterruptedException
         */
        public Connection getConnection(long timeout) throws InterruptedException {
            synchronized (pool) {
                if (timeout <= 0) {
                    // 无限等待,直到被唤醒
                    while (pool.isEmpty()) {
                        pool.wait();
                    }
                    return pool.removeFirst();
                } else {
                    // 获取连接超时时的时间点
                    long futureTime = System.currentTimeMillis() + timeout;
                    // 获取连接池剩余等待时间
                    long remain = timeout;
                    // 连接池为空,并且还有剩余等待时间,则继续等待,并且更新剩余等待时间
                    while (pool.isEmpty() && remain > 0) {
                        pool.wait(remain);
                        remain = futureTime - System.currentTimeMillis();
                    }
    
                    if (pool.isEmpty()) {
                        logger.error("timeout");
                    } else {
                        return pool.removeFirst();
                    }
    
                }
            }
    
            return null;
        }
    
        /**
         * 释放连接
         * 
         * @param con
         */
        public void releaseConnection(Connection con) {
            synchronized (pool) {
                if (con != null) {
                    // 在末尾添加连接
                    pool.addLast(con);
                    // 通知其他线程获取连接
                    pool.notify();
                }
            }
        }
    }
    View Code

    无论使用notify还是notifyAll,在20次测试中,五条线程均可以正常获取到连接,且超时时间是准确的,不会因为被notifyAll唤醒,还未到超时间就返回null

     压力测试(本例中线程sleep 2s,可认连接从获取到释放大概为2s时间)

    package com.demo.Multithreading;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.Multithreading.ConnectionPool.ConnectionPool;
    import com.demo.Multithreading.ConnectionPool.test.MyThread;
    
    public class App {
        private static Logger logger = LoggerFactory.getLogger(App.class);
    
        public static void main(String[] args) {
            int poolSize = 100;
            int threadCount = 500;
            ConnectionPool cp = new ConnectionPool(poolSize);
            CountDownLatch cd = new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; i++) {
                MyThread th = new MyThread("mythread" + i, cd, cp);
                th.start();
            }
    
            try {
                cd.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("execute end.");
        }
    
    }

    连接池中100个连接,500条线程,每条线程sleep 2s,连接获取超时时间为5s,未获取到连接的线程数有200个

    连接池中100个连接,300条线程,每条线程sleep 2s,连接获取超时时间为5s,全部都获取到连接

    连接池中100个连接,500条线程,每条线程sleep 2s,连接获取超时时间为10s,全部都获取到连接

    连接池中200个连接,500条线程,每条线程sleep 2s,连接获取超时时间为5s,全部都获取到连接

    连接池中100个连接,500条线程,每条线程sleep 1s,连接获取超时时间为5s,全部都获取到连接

    通过以上简单测试可以得出,连接池大小,线程数(并发数),获取到连接到释放连接所需时间,连接超时时间都会影响到连接池的获取

    对于数据库,设置线程池大小为最大并发数大小不会出现获取不到连接,但会出现连接浪费,应该综合考虑设置参数,比如最大并发数为100,获取到连接到释放连接时间为1s,链接获取超时时间为10s,则此时连接池大小设置为10就差不多可以满足应用所需,但设置的时候可以稍微设置的大一些,比如15

  • 相关阅读:
    四个好看的CSS样式表格
    POJ 2255 Tree Recovery
    黑马程序猿_2014 7月 我使用多线程体验
    Dos命令将合并两个文本文件的内容
    栈和堆之间的差(他转身无数的文章)
    【Espruino】NO.12 加速度计演示
    MySQL进口.sql文件和常用命令
    typedef和define具体的具体差异
    muduo网络图书馆评测
    Web采矿技术
  • 原文地址:https://www.cnblogs.com/qq931399960/p/10868505.html
Copyright © 2011-2022 走看看