zoukankan      html  css  js  c++  java
  • Java 之 JDBC连接池(源码版)

    一、目录

    二、代码

    PoolConstant

    package cn.kgc.kb08.jdbc.dao3.impl;
    
    /**
     * Names of the configuration items read for the connection pool and for
     * the MySQL data source.
     *
     * <p>Each constant is the item name only; the caller combines it with a
     * data-source prefix when looking it up. The spelling of
     * {@code POOL_MAX_IDELE} / {@code "maxIdel"} is kept as-is because both the
     * constant name and the key text are referenced elsewhere.
     */
    public interface PoolConstant {

        // ---- connection-pool items ----

        /** Number of core connections. */
        String POOL_CORE_COUNT = "coreCount";
        /** Upper bound on the number of pooled connections. */
        String POOL_MAX_COUNT = "maxCount";
        /** Maximum idle time of a connection. */
        String POOL_MAX_IDELE = "maxIdel";
        /** Maximum time a caller waits for a connection. */
        String POOL_MAX_WAIT = "maxWait";
        /** Pause between two retries. */
        String POOL_RETRY_INTERVAL = "retryInterval";
        /** Maximum number of retries. */
        String POOL_MAX_RETRY_COUNT = "maxRetryCount";
        /** Flag controlling whether the program exits on an initialisation error. */
        String POOL_EXIT_ON_ERR = "exitOnErr";

        /** Every pool item, in declaration order. */
        String[] POOL = new String[] {
            POOL_CORE_COUNT, POOL_MAX_COUNT, POOL_MAX_IDELE, POOL_MAX_WAIT,
            POOL_RETRY_INTERVAL, POOL_MAX_RETRY_COUNT, POOL_EXIT_ON_ERR
        };

        // ---- data-source items ----

        /** JDBC driver class name. */
        String MYSQL_DRI = "driver";
        /** JDBC connection URL. */
        String MYSQL_URI = "url";
        /** Database user name. */
        String MYSQL_USER = "username";
        /** Database password. */
        String MYSQL_PASS = "password";

        /** Every data-source item, in declaration order. */
        String[] MYSQL = new String[] {MYSQL_DRI, MYSQL_URI, MYSQL_USER, MYSQL_PASS};
    }

    PoolUtil

    package cn.kgc.kb08.jdbc.dao3.impl;
    
    import cn.kgc.kb08.jdbc.dao2.Dao;
    import cn.kgc.kb08.jdbc.dao2.impl.BaseDao;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * Static helpers shared by the pool classes: configuration parsing and
     * quiet resource closing.
     */
    public class PoolUtil {

        /**
         * Reads {@code config/sys.properties} (relative to the working
         * directory) and returns the values of the requested items of one
         * data source.
         *
         * <p>Each item is looked up under the key {@code dataSource + "." + item}
         * and converted by calling the {@code (String)} constructor of {@code c}.
         * If the file cannot be read, an item is missing, or a value cannot be
         * converted, a message is printed to {@code System.err} and the JVM is
         * terminated with status {@code -1}.
         *
         * @param c          target value type; must have a public constructor
         *                   taking a single {@code String}
         * @param dataSource key prefix identifying the data source
         * @param items      item names to read
         * @param <T>        value type
         * @return map from item name to converted value
         */
        protected static <T> Map<String, T> parse(Class<T> c, String dataSource, List<String> items) {
            File config = new File("config/sys.properties");
            Properties pro = new Properties();
            // try-with-resources closes the stream; it used to be left open.
            try (FileInputStream in = new FileInputStream(config)) {
                pro.load(in);
                Map<String, T> map = new HashMap<>(items.size());
                for (String item : items) {
                    String key = dataSource + "." + item;
                    if (!pro.containsKey(key)) {
                        throw new IOException("缺少配置项:" + item);
                    }
                    map.put(item, c.getConstructor(String.class).newInstance(pro.getProperty(key)));
                }
                return map;
            } catch (Exception e) {
                System.err.println(dataSource + "数据源配置信息异常,系统强制退出:" + describe(e));
                System.exit(-1);
            }
            // Only reached if System.exit is prevented from terminating the JVM.
            return null;
        }

        /**
         * Returns the message of {@code e}, falling back to its cause when the
         * exception itself has none (reflective wrappers carry a null message).
         */
        private static String describe(Exception e) {
            if (null != e.getMessage()) {
                return e.getMessage();
            }
            return null != e.getCause() ? e.getCause().toString() : e.toString();
        }

        /**
         * Closes every non-null resource in order. A failure to close one
         * resource is printed and does not prevent the remaining ones from
         * being closed.
         *
         * @param acs resources to close; the array and its elements may be null
         */
        protected static void close(AutoCloseable... acs) {
            if (null == acs) {
                return;
            }
            for (AutoCloseable ac : acs) {
                if (null != ac) {
                    try {
                        ac.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    **重点类** ConPool

    package cn.kgc.kb08.jdbc.dao3.impl;
    
    
    import cn.kgc.kb08.jdbc.dao3.SelRtn;
    import cn.kgc.kb08.jdbc.dao3.Dao;
    import cn.kgc.kb08.jdbc.dao3.Pool;
    
    import java.lang.reflect.Method;
    import java.sql.*;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 弹性连接池:生产和管理对象的
     */
    public final  class ConPool implements Pool {
    
    //    pool.maxIdel=30是什么 => 请看文档,官方会写
    //    pool.retryInterval=50
    //    pool.maxRetryCount=8
    
        /**
         * One entry of the pool: a JDBC connection plus the bookkeeping the
         * pool keeps for it.
         */
        class PoolCon {
            /** {@code true} while the connection is not handed out. */
            boolean free;
            /** {@code true} for a core connection, {@code false} for a temporary one. */
            boolean core;
            /** The wrapped JDBC connection. */
            Connection con;
            /** Start of the current idle period in epoch milliseconds; only maintained for temporary connections. */
            long idleBegin;

            public PoolCon(boolean core, Connection con) {
                this.free = true;
                this.con = con;
                this.core = core;
                restIdle();
            }

            /**
             * Restarts the idle clock of a temporary connection; does nothing
             * for a core connection.
             */
            public void restIdle() {
                if (core) {
                    return;
                }
                idleBegin = System.currentTimeMillis();
            }
        }
    
        /** Pooled connections, keyed by an integer slot number. */
        private ConcurrentMap<Integer, PoolCon> pool;
        /** Pool settings, read as integers under the "pool" prefix by initCnf(). */
        private Map<String, Integer> cnfPool;
        /** Data-source settings, read as strings under the "mysql01" prefix by initCnf(). */
        private Map<String, String> cnfCon;
        /**
         * Runs the periodic cleanup of the connection pool:
         * checks that free core connections are still valid and replaces invalid ones;
         * checks whether free temporary connections have expired and removes them.
         */
    
        private ScheduledExecutorService schedule;
        /** Executor on which the DAO operations created by newDao() are run. */
        private ExecutorService service;
        /** Lock taken by the cleanup task and by fetch() while they scan the pool. */
        private Lock lock;
        /** Condition of {@link #lock}; signalled when a cleanup pass ends. */
        private Condition cond;
        /** Set to true for the duration of a cleanup pass. */
        private boolean clearing;
    
        /**
         * Builds the pool in three ordered steps: load the configuration,
         * create the executors and core connections, then schedule the
         * periodic cleanup task. Later steps read what the earlier ones set up.
         */
        public ConPool() {
            initCnf();
            initPool();
            startClear();
        }
    
    // 塞进pool和mysql的配置:比如Map中driver:xxx的键值对
        /**
         * Loads both configuration maps: the pool settings (prefix "pool",
         * parsed as integers) and the data-source settings (prefix "mysql01",
         * kept as strings).
         */
        private void initCnf() {
            List<String> poolItems = Arrays.asList(PoolConstant.POOL);
            List<String> sourceItems = Arrays.asList(PoolConstant.MYSQL);

            this.cnfPool = PoolUtil.parse(Integer.class, "pool", poolItems);
            this.cnfCon = PoolUtil.parse(String.class, "mysql01", sourceItems);
        }
    
    
        /**
         * Creates the executors, the lock and the pool map, then opens the
         * core connections.
         *
         * <p>Exactly {@code coreCount} connections are attempted (the loop used
         * to run one time too many). Connections that could be opened are
         * stored under consecutive keys starting at 1. The JVM is terminated
         * with status -1 when no connection could be opened, or when
         * {@code exitOnErr} is 1 and at most half of the core connections
         * could be opened.
         */
        private void initPool() {
            final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT);
            final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT);

            service = Executors.newFixedThreadPool(MAX_COUNT * 2);
            schedule = Executors.newSingleThreadScheduledExecutor();
            // Fair lock: waiting threads acquire it in arrival order.
            lock = new ReentrantLock(true);
            cond = lock.newCondition();
            pool = new ConcurrentHashMap<>(MAX_COUNT);

            // Keys are only consumed by connections that were actually opened,
            // so the core entries stay numbered 1..n without gaps.
            int nextKey = 1;
            for (int i = 0; i < CORE_COUNT; i++) {
                PoolCon pc = makePoolCon(true);
                if (null != pc) {
                    pool.put(nextKey++, pc);
                }
            }

            if (pool.isEmpty()) {
                System.err.println("连接池初始化失败,系统强制退出");
                System.exit(-1);
            }
            // Configured to exit on error, and no more than half of the core
            // connections are available.
            if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1
                    && pool.size() <= CORE_COUNT / 2) {
                System.err.println("连接池初始化过半异常,系统强制退出");
                System.exit(-1);
            }
        }
    
    
        /**
         * Opens one JDBC connection and wraps it as a pool entry.
         *
         * <p>The first attempt plus up to {@code maxRetryCount} retries are
         * made, pausing {@code retryInterval} milliseconds between attempts
         * (the same unit the other users of that setting in this class apply).
         * The method returns as soon as one attempt succeeds; previously it
         * kept looping after a success and leaked every connection but the
         * last one.
         *
         * @param core entry type, true: core connection, false: temporary connection
         * @return the new entry, or {@code null} if every attempt failed or the
         *         thread was interrupted while waiting to retry
         */

        private PoolCon makePoolCon(boolean core) {
            final int maxRetry = cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT);
            final int retryInterval = cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL);
            for (int attempt = 0; attempt <= maxRetry; attempt++) {
                try {
                    Connection con = DriverManager.getConnection(
                            cnfCon.get(PoolConstant.MYSQL_URI),
                            cnfCon.get(PoolConstant.MYSQL_USER),
                            cnfCon.get(PoolConstant.MYSQL_PASS)
                    );
                    return new PoolCon(core, con);
                } catch (SQLException e) {
                    if (attempt >= maxRetry) {
                        // Out of attempts: report the last failure, no pointless sleep.
                        e.printStackTrace();
                        break;
                    }
                    try {
                        // Rest for a moment before trying again.
                        TimeUnit.MILLISECONDS.sleep(retryInterval);
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to the caller and stop retrying.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            return null;
        }
    
        /**
         * Checks whether a pooled connection is still usable by running
         * {@code select 1} on it.
         *
         * <p>The probe statement and its result set are closed again; they
         * used to be left open on every call.
         *
         * @param pc pool entry to probe; {@code null} or an entry without a
         *           connection counts as invalid
         * @return true if the probe query succeeded, false otherwise
         */
        private boolean isPCValid(PoolCon pc) {
            if (null == pc || null == pc.con) {
                return false;
            }
            try (Statement st = pc.con.createStatement();
                 ResultSet rs = st.executeQuery("select 1")) {
                return true;
            } catch (SQLException e) {
                return false;
            }
        }
    
        /**
         * Tells whether a pool entry has been idle for at least the configured
         * maximum idle time (whole seconds).
         *
         * @param pc pool entry whose {@code idleBegin} is used as the start of the idle period
         * @return true: expired, false: not expired
         */
        private boolean isExpired(PoolCon pc) {
            long idleMillis = System.currentTimeMillis() - pc.idleBegin;
            long idleSeconds = TimeUnit.MILLISECONDS.toSeconds(idleMillis);
            int maxIdleSeconds = cnfPool.get(PoolConstant.POOL_MAX_IDELE);
            return idleSeconds >= maxIdleSeconds;
        }
    
    
        /**
         * Tells whether a caller has been waiting for at least the configured
         * maximum wait time (whole seconds).
         *
         * @param waitBegin epoch milliseconds at which the wait started
         * @return true if the wait limit has been reached, false otherwise
         */
        private boolean isWaitExpired(long waitBegin) {
            long waitedMillis = System.currentTimeMillis() - waitBegin;
            long waitedSeconds = TimeUnit.MILLISECONDS.toSeconds(waitedMillis);
            int maxWaitSeconds = cnfPool.get(PoolConstant.POOL_MAX_WAIT);
            return waitedSeconds >= maxWaitSeconds;
        }
    
        /**
         * Schedules the periodic cleanup task. Both the initial delay and the
         * delay between passes are the configured maximum idle time, in seconds.
         *
         * <p>Each pass holds {@code lock} and only looks at free entries:
         * <ul>
         *   <li>an invalid core connection is replaced by a freshly opened one
         *       and the old connection is closed; if no replacement can be
         *       opened the stale entry is left in place for the next pass;</li>
         *   <li>a temporary connection that is expired or invalid is removed
         *       and closed.</li>
         * </ul>
         *
         * <p>The lock is released in a {@code finally} block and runtime
         * exceptions are caught inside the task. Previously a failed
         * reconnect put {@code null} into the map, which threw, left the lock
         * held and let the executor cancel all further passes.
         */
        private void startClear() {
            int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE);
            schedule.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        clearing = true;
                        for (Integer key : pool.keySet()) {
                            PoolCon pc = pool.get(key);
                            if (null == pc || !pc.free) {
                                continue;
                            }
                            if (pc.core) {
                                if (!isPCValid(pc)) {
                                    PoolCon fresh = makePoolCon(true);
                                    if (null != fresh) {
                                        pool.put(key, fresh);
                                        PoolUtil.close(pc.con);
                                    }
                                }
                            } else if (isExpired(pc) || !isPCValid(pc)) {
                                pool.remove(key);
                                PoolUtil.close(pc.con);
                            }
                        }
                    } catch (RuntimeException e) {
                        // An exception escaping run() would suppress every later pass.
                        e.printStackTrace();
                    } finally {
                        clearing = false;
                        cond.signalAll();
                        lock.unlock();
                    }
                }
            }, delay, delay, TimeUnit.SECONDS);
        }
    
    
        /**
         * Destroys the pool.
         *
         * <p>Both executors are shut down first: the cleanup task stops being
         * scheduled and no new DAO work is accepted, while work that was
         * already submitted still runs and returns its connection. Free
         * connections are then closed and removed under {@code lock}, the same
         * lock {@code fetch()} holds while handing connections out, and the
         * method polls every {@code retryInterval} milliseconds until
         * connections that are still in use have been given back.
         *
         * <p>If the calling thread is interrupted while waiting, the interrupt
         * flag is restored and the method returns; connections still in use
         * at that point are left open.
         */
        @Override
        public void destory() {
            schedule.shutdown();
            service.shutdown();
            final int pollInterval = cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL);
            while (!pool.isEmpty()) {
                lock.lock();
                try {
                    for (Integer key : pool.keySet()) {
                        PoolCon pc = pool.get(key);
                        if (null != pc && pc.free) {
                            pc.free = false;
                            PoolUtil.close(pc.con);
                            pool.remove(key);
                        }
                    }
                } finally {
                    lock.unlock();
                }
                if (pool.isEmpty()) {
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pollInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    
    
        /**
         * Obtains a connection for exclusive use by the caller.
         *
         * <p>Phase 1: up to {@code maxRetryCount + 1} scans of the pool are
         * made for a free, valid entry. Each scan runs under {@code lock};
         * the pause of {@code retryInterval} milliseconds between scans is
         * taken after the lock has been released so other threads are not
         * blocked while this one sleeps. The method gives up with
         * {@code null} once {@code maxWait} has elapsed.
         *
         * <p>Phase 2: if no entry became free and the pool holds fewer than
         * {@code maxCount} entries, a temporary connection is opened. It is
         * registered under the lowest unused key in {@code 1..maxCount} with
         * {@code putIfAbsent}, so a live entry is never overwritten (the old
         * {@code size() + 1} key could collide after a removal). If the pool
         * filled up in the meantime the new connection is closed again.
         *
         * @return a pool entry marked as in use, or {@code null} if none could
         *         be obtained or the thread was interrupted
         */
        private PoolCon fetch() {
            final long waitBegin = System.currentTimeMillis();
            final int maxRetry = cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT);
            final int retryInterval = cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL);
            final int maxCount = cnfPool.get(PoolConstant.POOL_MAX_COUNT);

            try {
                for (int i = 0; i <= maxRetry; i++) {
                    lock.lock();
                    try {
                        // Loop, not if: await() may return spuriously.
                        while (clearing) {
                            cond.await();
                        }
                        for (Integer key : pool.keySet()) {
                            PoolCon pc = pool.get(key);
                            if (null != pc && pc.free && isPCValid(pc)) {
                                pc.free = false;
                                return pc;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                    if (isWaitExpired(waitBegin)) {
                        return null;
                    }
                    TimeUnit.MILLISECONDS.sleep(retryInterval);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }

            if (pool.size() >= maxCount) {
                return null;
            }
            // Connecting can be slow, so it is done without holding the lock;
            // the size limit is re-checked under the lock before registering.
            PoolCon pc = makePoolCon(false);
            if (null == pc) {
                return null;
            }
            pc.free = false;
            lock.lock();
            try {
                if (pool.size() < maxCount) {
                    for (int key = 1; key <= maxCount; key++) {
                        if (null == pool.putIfAbsent(key, pc)) {
                            return pc;
                        }
                    }
                }
            } finally {
                lock.unlock();
            }
            // No room left: do not leak the connection that was just opened.
            PoolUtil.close(pc.con);
            return null;
        }
    
    
    
    
        /**
         * Returns a connection to the pool: restarts the idle clock of a
         * temporary connection and marks the entry as free again.
         *
         * @param pc entry to give back; {@code null} is ignored
         */
        private void giveback(PoolCon pc) {
            if (pc != null) {
                boolean temporary = !pc.core;
                if (temporary) {
                    pc.restIdle();
                }
                pc.free = true;
            }
        }
    
    
    
    /**
     * Creates a {@link Dao} whose operations run on the pool's worker executor
     * using a connection borrowed through {@code fetch()} and returned through
     * {@code giveback()}. Every public operation blocks until its task has
     * finished.
     *
     * <p>Error reporting: {@code exeUpd} returns -1, the query methods return
     * {@code SelRtn.fail()}. Not being able to obtain a connection counts as
     * an error in all three operations.
     */
    @Override
        public Dao newDao() {
            return new Dao() {
                /**
                 * Prepares {@code SQL} and binds {@code params} to its
                 * placeholders in order (1-based). The statement is closed
                 * again if binding fails, so it cannot leak.
                 */
                private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException {
                    PreparedStatement pst = con.prepareStatement(SQL);
                    try {
                        if (null != params) {
                            for (int i = 0; i < params.length; i++) {
                                pst.setObject(i + 1, params[i]);
                            }
                        }
                    } catch (SQLException e) {
                        PoolUtil.close(pst);
                        throw e;
                    }
                    return pst;
                }

                /**
                 * Maps property names to the single-argument setters declared
                 * by {@code c}: {@code setFooBar} is stored under {@code fooBar}.
                 */
                private Map<String, Method> parseMethod(Class<?> c) {
                    Map<String, Method> mapMethod = new HashMap<>();
                    final String PREFIX = "set";
                    for (Method method : c.getDeclaredMethods()) {
                        String name = method.getName();
                        // A bare "set" has no property part; setters take one argument.
                        if (!name.startsWith(PREFIX)
                                || name.length() == PREFIX.length()
                                || method.getParameterTypes().length != 1) {
                            continue;
                        }
                        name = name.substring(PREFIX.length());
                        name = name.substring(0, 1).toLowerCase() + name.substring(1);
                        mapMethod.put(name, method);
                    }
                    return mapMethod;
                }

                /** Returns the column labels of a result set, in column order. */
                private String[] parseStruct(ResultSetMetaData md) throws SQLException {
                    String[] names = new String[md.getColumnCount()];
                    for (int i = 0; i < names.length; i++) {
                        names[i] = md.getColumnLabel(i + 1);
                    }
                    return names;
                }

                /**
                 * Executes an insert/update/delete statement.
                 *
                 * @return the affected row count, or -1 if no connection was
                 *         available or the statement failed
                 */
                @Override
                public int exeUpd(final String SQL, final Object... params) {
                    try {
                        return service.submit(new Callable<Integer>() {
                            @Override
                            public Integer call() throws Exception {
                                PoolCon pc = null;
                                PreparedStatement pst = null;
                                try {
                                    pc = fetch();
                                    if (null == pc) {
                                        // No connection is a failure, not "0 rows changed".
                                        return -1;
                                    }
                                    pst = getPst(pc.con, SQL, params);
                                    return pst.executeUpdate();
                                } catch (SQLException e) {
                                    e.printStackTrace();
                                    return -1;
                                } finally {
                                    PoolUtil.close(pst);
                                    giveback(pc);
                                }
                            }
                        }).get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return -1;
                    } catch (Exception e) {
                        return -1;
                    }
                }

                /**
                 * Runs a query and converts the first column of the first row
                 * to {@code c} through its {@code (String)} constructor.
                 *
                 * @return success carrying the value, success carrying
                 *         {@code null} when there is no row or the column is
                 *         SQL NULL, or a failure result
                 */
                @Override
                public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) {
                    try {
                        return service.submit(new Callable<SelRtn>() {
                            @Override
                            public SelRtn call() throws Exception {
                                PoolCon pc = null;
                                PreparedStatement pst = null;
                                ResultSet rst = null;
                                try {
                                    pc = fetch();
                                    if (null == pc) {
                                        return SelRtn.fail();
                                    }
                                    pst = getPst(pc.con, SQL, params);
                                    rst = pst.executeQuery();
                                    if (null == rst || !rst.next()) {
                                        return SelRtn.succeed(null);
                                    }
                                    Object value = rst.getObject(1);
                                    if (null == value) {
                                        return SelRtn.succeed(null);
                                    }
                                    // Wrapper types and String all offer a (String) constructor.
                                    return SelRtn.succeed(
                                            c.getConstructor(String.class).newInstance(value.toString()));
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    return SelRtn.fail();
                                } finally {
                                    PoolUtil.close(rst, pst);
                                    giveback(pc);
                                }
                            }
                        }).get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return SelRtn.fail();
                    } catch (Exception e) {
                        return SelRtn.fail();
                    }
                }

                /**
                 * Runs a query and maps every row to a new instance of
                 * {@code c}: for each column label the setter of the same
                 * property name is invoked with the column value.
                 *
                 * @return success carrying a {@code List<T>}, success carrying
                 *         {@code null} when there is no row, or a failure
                 *         result (including when a column has no matching setter)
                 */
                @Override
                public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) {
                    try {
                        return service.submit(new Callable<SelRtn>() {
                            @Override
                            public SelRtn call() throws Exception {
                                PoolCon pc = null;
                                PreparedStatement pst = null;
                                ResultSet rst = null;
                                try {
                                    // This borrow was missing, so every query failed on a null connection.
                                    pc = fetch();
                                    if (null == pc) {
                                        return SelRtn.fail();
                                    }
                                    pst = getPst(pc.con, SQL, params);
                                    rst = pst.executeQuery();
                                    if (null == rst || !rst.next()) {
                                        return SelRtn.succeed(null);
                                    }
                                    List<T> list = new ArrayList<>();
                                    Map<String, Method> setters = parseMethod(c);
                                    String[] names = parseStruct(rst.getMetaData());
                                    do {
                                        T t = c.getDeclaredConstructor().newInstance();
                                        for (String name : names) {
                                            Method setter = setters.get(name);
                                            if (null == setter) {
                                                throw new NoSuchMethodException(
                                                        c.getName() + " has no setter for column " + name);
                                            }
                                            setter.invoke(t, rst.getObject(name));
                                        }
                                        list.add(t);
                                    } while (rst.next());
                                    return SelRtn.succeed(list);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    return SelRtn.fail();
                                } finally {
                                    PoolUtil.close(rst, pst);
                                    giveback(pc);
                                }
                            }
                        }).get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return SelRtn.fail();
                    } catch (Exception e) {
                        return SelRtn.fail();
                    }
                }
            };
        }
    }

    Pool

    /**
     * Contract of a connection pool: it hands out DAO objects and can be
     * destroyed when no longer needed.
     */
    public interface Pool {
        /** Destroys the pool. (The method name's spelling is part of the interface.) */
        void destory();
        /** Returns a DAO that performs its database work through this pool. */
        Dao newDao();
    }

    PoolFactory

    package cn.kgc.kb08.jdbc.dao3;
    
    import cn.kgc.kb08.jdbc.dao3.impl.ConPool;
    
    /**
     * Lazily creates a single pool-backed {@link Dao} and hands it out to
     * every caller.
     */
    public abstract class PoolFactory {
        /**
         * The shared instance. Declared {@code volatile} because {@link #get()}
         * reads it without holding the lock taken by {@link #init()}; without
         * it a thread could observe a reference to a not yet fully
         * constructed object.
         */
        private static volatile Dao dao;

        /** Creates the shared instance if it does not exist yet; at most one thread at a time. */
        private static synchronized void init() {
            if (null == dao) {
                dao = new ConPool().newDao();
            }
        }

        /**
         * Returns the shared DAO, creating the underlying pool on first use.
         *
         * @return the shared DAO instance
         */
        public static Dao get() {
            // Fast path: skip the lock once the instance has been published.
            if (null == dao) {
                init();
            }
            return dao;
        }
    }

    SelRtn

    package cn.kgc.kb08.jdbc.dao3;
    
    /**
     * Result of a query operation: either a success carrying a payload (which
     * may be {@code null}) or a failure. This lets callers tell "no data"
     * apart from "the query went wrong".
     */
    public final class SelRtn {
        private final boolean err;
        private final Object rtn;

        private SelRtn(boolean err, Object rtn) {
            this.err = err;
            this.rtn = rtn;
        }

        /**
         * Creates a successful result.
         *
         * @param rtn payload, may be {@code null}
         */
        public static SelRtn succeed(Object rtn) {
            return new SelRtn(false, rtn);
        }

        /** Creates a failed result; it carries no payload. */
        public static SelRtn fail() {
            return new SelRtn(true, null);
        }

        /** @return {@code true} if this result represents a failure */
        public boolean isErr() {
            return err;
        }

        /**
         * Returns the payload cast to the type the caller expects. The cast is
         * unchecked: asking for the wrong type fails at the use site.
         */
        @SuppressWarnings("unchecked")
        public <T> T getRtn() {
            return (T) rtn;
        }
    }
  • 相关阅读:
    Coursera self-driving2, State Estimation and Localization Week2, kalman filter 卡尔曼滤波
    Coursera Self-driving1, introduction
    Coursera, Big Data 5, Graph Analytics for Big Data, Week 5
    初创电商公司Drop的数据湖实践
    什么是LakeHouse?
    Apache Hudi 0.5.1版本重磅发布
    Apache Hudi使用问题汇总(一)
    ApacheHudi常见问题汇总
    写入Apache Hudi数据集
    Hudi基本概念
  • 原文地址:https://www.cnblogs.com/sabertobih/p/14011825.html
Copyright © 2011-2022 走看看