zoukankan      html  css  js  c++  java
  • commons-pool与commons-pool2连接池(Hadoop连接池)

    commons-pool和commons-pool2是用来建立对象池的框架,提供了一些将对象池化必须要实现的接口和一些默认动作。对象池化之后可以通过pool的概念去管理其生命周期,例如对象的创建,使用,销毁等。例如我们通常使用的连接池,连接池可以有效管理连接的数量和状态,保证连接资源的情况而且避免并发场景下连接的频繁建立和释放。

    我们这里来讲述如何使用commons-pool2来池化对象。我们以池化hadoop连接为例。

     

    1、先解决依赖

         <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.3</version>
            </dependency>

    2、如何使用连接池

    我们是在spingboot框架中池化hadoop集群连接,先看一下池化之后的效果。

    下面是我们池化之后的hadoop集群客户端。可以看到我们可以通过连接池的方式管理hadoo集群的链接。

    1)配置连接池

    最大连接数maxTotal

    最大空闲连接数maxIdle

    最小空闲连接数minIdle

    获取连接的最大等待时间maxWait

    可以看到传入这些配置的时候我们使用了一个config对象JHadoopPoolConfig,后面我们将说明这个config对象如何实现。

    2)管理连接池

    我们以三个函数说明了如何去连接池中申请连接,使用连接和释放链接资源。

    申请资源pool.getResource()

    释放资源pool.returnBrokenResource()和pool.returnResource()

    这里要注意的是,一定要在catch和finally中成功释放资源,不然会导致could not get a Resource from the Pool的异常

    package com.xiaoju.dqa.jazz.hadoop.client;
    
    import org.apache.hadoop.fs.FileStatus;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    
    public class JHadoopClient {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        private JHadoopPool jHadoopPool;
        private String coreResource;
        private String hdfsResource;
        private int maxTotal;
        private int maxIdle;
        private int minIdle;
        private int maxWaitMillis;
    
        public String getCoreResource() {
            return coreResource;
        }
        public void setCoreResource(String coreResource) {
            this.coreResource = coreResource;
        }
        public String getHdfsResource() {
            return hdfsResource;
        }
        public void setHdfsResource(String hdfsResource) {
            this.hdfsResource = hdfsResource;
        }
        public int getMaxTotal() {
            return maxTotal;
        }
        public void setMaxTotal(int maxTotal) {
            this.maxTotal = maxTotal;
        }
        public int getMaxIdle() {
            return maxIdle;
        }
        public void setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
        }
        public int getMaxWaitMillis() {
            return maxWaitMillis;
        }
        public void setMaxWaitMillis(int maxWaitMillis) {
            this.maxWaitMillis = maxWaitMillis;
        }
        public int getMinIdle() {
            return minIdle;
        }
        public void setMinIdle(int minIdle) {
            this.minIdle = minIdle;
        }
    
        public void init() {
            try {
                JHadoopPoolConfig conf = new JHadoopPoolConfig();
                conf.setMaxTotal(maxTotal);
                conf.setMaxIdle(maxIdle);
                conf.setMinIdle(minIdle);
                conf.setMaxWaitMillis(maxWaitMillis);
                jHadoopPool = new JHadoopPool(conf, coreResource, hdfsResource);
                logger.info("[HDFS]初始化JHadoopClient成功");
            } catch (Exception ex) {
                logger.error("[HDFS]初始化JHadoopClient失败", ex);
            }
        }
    
        public void stop() {
            try {
                jHadoopPool.destroy();
            } catch(Exception e) {
            }
        }
    
        public boolean exists(String path) throws Exception {
            JHadoop jHadoop = null;
            boolean broken = false;
            try {
                jHadoop = jHadoopPool.getResource();
                return jHadoop.exists(path);
            } catch (Exception e) {
                broken = true;
                jHadoopPool.returnBrokenResource(jHadoop);
                logger.error("[HDFS]判断文件是否存在失败", e);
                throw e;
            } finally {
                if (null != jHadoop && !broken) {
                    jHadoopPool.returnResource(jHadoop);
                }
            }
        }
    
        public String getModificationTime(String path) throws Exception {
            JHadoop jHadoop = null;
            boolean broken = false;
            try {
                jHadoop = jHadoopPool.getResource();
                FileStatus fileStatus = jHadoop.getFileStatus(path);
                long modifyTimestamp = fileStatus.getModificationTime();
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                Date date = new Date(modifyTimestamp);
                return simpleDateFormat.format(date);
            } catch (Exception e) {
                broken = true;
                jHadoopPool.returnBrokenResource(jHadoop);
                logger.error("[HDFS]获取最近修改时间失败", e);
                throw e;
            } finally {
                if (null != jHadoop && !broken) {
                    jHadoopPool.returnResource(jHadoop);
                }
            }
        }
    
        public long getPathSize(String path) throws Exception {
            JHadoop jHadoop = null;
            boolean broken = false;
            try {
                jHadoop = jHadoopPool.getResource();
                return jHadoop.getContentSummary(path).getLength();
            } catch (Exception e) {
                broken = true;
                jHadoopPool.returnBrokenResource(jHadoop);
                logger.error("[HDFS]获取路径大小失败", e);
                throw e;
            } finally {
                if (null != jHadoop && !broken) {
                    jHadoopPool.returnResource(jHadoop);
                }
            }
        }
    
    }

    3)注册成bean

    通过配置文件传入链接池相应的配置。

    package com.xiaoju.dqa.jazz.hadoop.configuration;
    
    import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class HadoopConfig {
    
        @Value("${hadoop.core.resource}")
        private String coreResource;
        @Value("${hadoop.hdfs.resource}")
        private String hdfsResource;
        @Value("${hadoop.pool.maxTotal}")
        private int maxTotal;
        @Value("${hadoop.pool.maxIdle}")
        private int maxIdle;
        @Value("${hadoop.pool.minIdle}")
        private int minIdle;
        @Value("${hadoop.pool.maxWaitMillis}")
        private int maxWaitMillis;
    
        @Bean(initMethod = "init", destroyMethod = "stop")
        public JHadoopClient jHadoopClient() {
            JHadoopClient jHadoopClient = new JHadoopClient();
            jHadoopClient.setMaxTotal(maxTotal);
            jHadoopClient.setMaxIdle(maxIdle);
            jHadoopClient.setMinIdle(minIdle);
            jHadoopClient.setMaxWaitMillis(maxWaitMillis);
            jHadoopClient.setCoreResource(coreResource);
            jHadoopClient.setHdfsResource(hdfsResource);
            return jHadoopClient;
        }
    }

    4)config对象如何实现

    我们这里要说明一下下面这些参数的含义:

    1)setTestWhileConfig - 在空闲时检查有效性, 默认false

    2)setMinEvictableIdleTimeMillis - 逐出连接的最小空闲时间

    3)setTimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1

    4)setNumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目
    package com.xiaoju.dqa.jazz.hadoop.client;
    
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    
    public class JHadoopPoolConfig extends GenericObjectPoolConfig {
        public JHadoopPoolConfig() {
            this.setTestWhileIdle(true);
            this.setMinEvictableIdleTimeMillis(60000L);
            this.setTimeBetweenEvictionRunsMillis(30000L);
            this.setNumTestsPerEvictionRun(-1);
        }
    }

    3、连接池JHadoopPool

    这个类继承了Pool<JHadoop>,用来初始化连接池对象。而JHadoop是Pool要管理的连接对象。

    可以看到JHadoopPool在初始化的时候传入了一个JHadoopFactory的实例。这个实例将会以工厂模式来创建实际的JHadoop

    JHadoopPool代码

    package com.xiaoju.dqa.jazz.hadoop.client;
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    public class JHadoopPool extends Pool<JHadoop> {
        public JHadoopPool(GenericObjectPoolConfig poolConfig, String coreResource, String hdfsResource) {
            super(poolConfig, new JHadoopFactory(coreResource, hdfsResource));
        }
    
        public JHadoopPool(GenericObjectPoolConfig poolConfig) {
            super(poolConfig, new JHadoopFactory());
        }
    
    }

    JHadoop代码

    JHadoop实现了hadoop.fs中的方法调用。

    我这里只给出了几个函数的简单封装,你可以根据具体的需要进行增加。

    package com.xiaoju.dqa.jazz.hadoop.client;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    
    public class JHadoop {
        protected final Logger logger = LoggerFactory.getLogger(this.getClass());
        private FileSystem fs;
        private String coreResource;
        private String hdfsResource;
    
        public JHadoop(String coreResource, String hdfsResource) {
            this.coreResource = coreResource;
            this.hdfsResource = hdfsResource;
        }
    
        public String getCoreResource() {
            return coreResource;
        }
        public void setCoreResource(String coreResource) {
            this.coreResource = coreResource;
        }
        public String getHdfsResource() {
            return hdfsResource;
        }
        public void setHdfsResource(String hdfsResource) {
            this.hdfsResource = hdfsResource;
        }
    
    
        public void open() {
            try {
                Configuration conf = new Configuration();
                conf.addResource(coreResource);
                conf.addResource(hdfsResource);
                fs = FileSystem.get(conf);
                logger.info("[JHadoop]创建实例成功");
            } catch (Exception e) {
                logger.error("[JHadoop]创建实例失败", e);
            }
        }
    
        public void close() {
            try {
                if (null != fs) {
                    fs.close();
                    logger.info("[JHadoop]关闭实例成功");
                }
            } catch(Exception e) {
                logger.error("[JHadoop]关闭实例失败", e);
            }
        }
    
        public boolean isConnected() throws IOException {
            fs.exists(new Path("/forTest"));
            return true;
        }
    
        public boolean exists(String path) throws IOException {
            Path hdfsPath = new Path(path);
            return fs.exists(hdfsPath);
        }
    
        public FileStatus getFileStatus(String path) throws IOException {
            Path hdfsPath = new Path(path);
            return fs.getFileStatus(hdfsPath);
        }
    
        public ContentSummary getContentSummary(String path) throws IOException {
            ContentSummary contentSummary = null;
            Path hdfsPath = new Path(path);
            if (fs.exists(hdfsPath)) {
                contentSummary = fs.getContentSummary(hdfsPath);
            }
            return contentSummary;
        }
    
    }

    4、连接工厂类JHadoopFactory

    JHadoopFactory这个类管理着连接对象的创建,销毁,验证等动作

    package com.xiaoju.dqa.jazz.hadoop.client;
    
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.PooledObjectFactory;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    
    
    public class JHadoopFactory implements PooledObjectFactory<JHadoop> {
        private final String coreResource;
        private final String hdfsResource;
    
        public JHadoopFactory() {
            this.coreResource = "core-site.xml";
            this.hdfsResource = "hdfs-site.xml";
        }
        public JHadoopFactory(String coreResource, String hdfsResource) {
            this.coreResource = coreResource;
            this.hdfsResource = hdfsResource;
        }
    
        @Override
        public PooledObject<JHadoop> makeObject() throws Exception {
            JHadoop jHadoop = new JHadoop(coreResource, hdfsResource);
            jHadoop.open();
            return new DefaultPooledObject<JHadoop>(jHadoop);
        }
    
        @Override
        public void destroyObject(PooledObject<JHadoop> pooledJHadoop) throws Exception {
            JHadoop jHadoop = pooledJHadoop.getObject();
            jHadoop.close();
        }
    
        @Override
        public boolean validateObject(PooledObject<JHadoop> pooledJHadoop) {
            JHadoop jHadoop = pooledJHadoop.getObject();
            try {
                return jHadoop.isConnected();
            } catch (Exception e) {
                return false;
            }
        }
    
        @Override
        public void activateObject(PooledObject<JHadoop> pooledObject) throws Exception {
    
        }
    
        @Override
        public void passivateObject(PooledObject<JHadoop> pooledObject) throws Exception {
    
        }
    }
  • 相关阅读:
    的地方d'fe'w
    日期
    equals方法
    接口作为成员变量类型
    匿名内部类注意事项
    匿名内部类
    局部内部类注意问题
    可以
    C中的volatile关键字
    继承权限问题
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7674480.html
Copyright © 2011-2022 走看看