zoukankan      html  css  js  c++  java
  • java并发实战:连接池实现

    池化技术简介

    在我们使用数据库的过程中,我们往往使用数据库连接池而不是直接使用数据库连接进行操作,这是因为每一个数据库连接的创建和销毁的代价是昂贵的,而池化技术则预先创建了资源,这些资源是可复用的,这样就保证了在多用户情况下只能使用指定数目的资源,避免了一个用户创建一个连接资源,造成程序运行开销过大。关于Java并发编程的总结和思考

    连接池实现原理

    这里只实现一个简易的连接池,更多复杂的需求可根据该连接池进行改进,该连接池主要参数如下:

    1. 一个繁忙队列busy
    2. 一个空闲队列idle
    3. 连接池最大活动连接数maxActive
    4. 连接池最大等待时间maxWait
    5. 连接池的活动连接数activeSize

    程序流程图如下:

    在这里插入图片描述

    代码实现

    泛型接口ConnectionPool.java

    
    public interface ConnectionPool<T> {
    
        /**
         * 初始化池资源
         * @param maxActive 池中最大活动连接数
         * @param maxWait 最大等待时间
         */
        void init(Integer maxActive, Long maxWait);
    
        /**
         * 从池中获取资源
         * @return 连接资源
         */
        T getResource() throws Exception;
    
        /**
         * 释放连接
         * @param connection 正在使用的连接
         */
        void release(T connection) throws Exception;
    
        /**
         * 释放连接池资源
         */
        void close();
    
    
    }
    

    以zookeeper为例,实现zookeeper连接池,ZookeeperConnectionPool.java

    
    public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
        //最大活动连接数
        private Integer maxActive; 
        //最大等待时间
        private Long maxWait; 
        //空闲队列
        private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
        //繁忙队列
        private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
        //连接池活动连接数
        private AtomicInteger activeSize = new AtomicInteger(0);
        //连接池关闭标记
        private AtomicBoolean isClosed = new AtomicBoolean(false);
        //总共获取的连接记数
        private AtomicInteger createCount = new AtomicInteger(0);
        //等待zookeeper客户端创建完成的计数器
        private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));
    
        public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
            this.init(maxActive, maxWait);
        }
    
        @Override
        public void init(Integer maxActive, Long maxWait) {
            this.maxActive = maxActive;
            this.maxWait = maxWait;
        }
    
        @Override
        public ZooKeeper getResource() throws Exception {
            ZooKeeper zooKeeper;
            Long nowTime = System.currentTimeMillis();
            final CountDownLatch countDownLatch = latchThreadLocal.get();
            
            //空闲队列idle是否有连接
            if ((zooKeeper = idle.poll()) == null) {
                //判断池中连接数是否小于maxActive
                if (activeSize.get() < maxActive) {
                    //先增加池中连接数后判断是否小于等于maxActive
                    if (activeSize.incrementAndGet() <= maxActive) {
                        //创建zookeeper连接
                        zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
                            if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
                                countDownLatch.countDown();
                            }
                        });
                        countDownLatch.await();
                        System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                        busy.offer(zooKeeper);
                        return zooKeeper;
                    } else {
                        //如增加后发现大于maxActive则减去增加的
                        activeSize.decrementAndGet();
                    }
                }
                //若活动线程已满则等待busy队列释放连接
                try {
                    System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源");
                    Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
                    zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    throw new Exception("等待异常");
                }
                //判断是否超时
                if (zooKeeper != null) {
                    System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");
                    busy.offer(zooKeeper);
                    return zooKeeper;
                } else {
                    System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
                    throw new Exception("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");
                }
            }
            //空闲队列有连接,直接返回
            busy.offer(zooKeeper);
            return zooKeeper;
        }
    
        @Override
        public void release(ZooKeeper connection) throws Exception {
            if (connection == null) {
                System.out.println("connection 为空");
                return;
            }
            if (busy.remove(connection)){
                idle.offer(connection);
            } else {
                activeSize.decrementAndGet();
                throw new Exception("释放失败");
            }
        }
    
        @Override
        public void close() {
            if (isClosed.compareAndSet(false, true)) {
                idle.forEach((zooKeeper) -> {
                    try {
                        zooKeeper.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                busy.forEach((zooKeeper) -> {
                    try {
                        zooKeeper.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    
    

    测试用例

    这里创建20个线程并发测试连接池,Test.java

    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            int threadCount = 20;
            Integer maxActive = 10;
            Long maxWait = 10000L;
            ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
            CountDownLatch countDownLatch = new CountDownLatch(20);
            for (int i = 0; i < threadCount; i++) {
                new Thread(() -> {
                    countDownLatch.countDown();
                    try {
                        countDownLatch.await();
                        ZooKeeper zooKeeper = pool.getResource();
                        Thread.sleep(2000);
                        pool.release(zooKeeper);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }).start();
            }
            while (true){
    
            }
        }
    }
    

    来源:https://segmentfault.com/a/1190000017926611

  • 相关阅读:
    WPF Margin和Padding
    WPF Tab切换顺序设置
    WPF DataGrid DataGridTemplateColumn
    WPF CheckBox IsHitTestVisible
    WPF Tag
    WPF RadioButton
    WPF 用户控件(UserControl)
    WPF ToolTip
    Style Lessons in Clarity and Grace (11th Edition)中文翻译
    AI for AI
  • 原文地址:https://www.cnblogs.com/lalalagq/p/10286937.html
Copyright © 2011-2022 走看看