zoukankan      html  css  js  c++  java
  • 基于Berkeley DB实现的持久化队列

    转自:http://guoyunsky.iteye.com/blog/1169912

       队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.比如爬虫,将要抓取的URL放到内存,而URL过多,内存肯定要爆.在读Heritrix源码中,发现Heritrix是基于Bdb实现了一个持久化队列,于是我就将这块代码独立出来,平时使用也蛮爽的,现在拿出来共享.同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.

          大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.

          这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue,这里贴上代码.以下代码需引用bdb-je,common-io,junit.请在附件中下载

    1.自定义的BDB环境类,可以缓存StoredClassCatalog并共享

    package com.guoyun.util;
    
    import java.io.File;
    
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.Environment;
    import com.sleepycat.je.EnvironmentConfig;
    /**
     * BDB数据库环境,可以缓存StoredClassCatalog并共享
     * 
     * @contributor guoyun
     */
    public class BdbEnvironment extends Environment {
        StoredClassCatalog classCatalog; 
        Database classCatalogDB;
        
        /**
         * Constructor
         * 
         * @param envHome 数据库环境目录
         * @param envConfig config options  数据库换纪念馆配置
         * @throws DatabaseException
         */
        public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
            super(envHome, envConfig);
        }
    
        /**
         * 返回StoredClassCatalog
         * @return the cached class catalog
         */
        public StoredClassCatalog getClassCatalog() {
            if(classCatalog == null) {
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                try {
                    classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                    classCatalog = new StoredClassCatalog(classCatalogDB);
                } catch (DatabaseException e) {
                    // TODO Auto-generated catch block
                    throw new RuntimeException(e);
                }
            }
            return classCatalog;
        }
    
        @Override
        public synchronized void close() throws DatabaseException {
            if(classCatalogDB!=null) {
                classCatalogDB.close();
            }
            super.close();
        }
    
    }

     2.  基于BDB实现的持久化队列

    package com.guoyun.util;
    
    import java.io.File;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.AbstractQueue;
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.commons.io.FileUtils;
    
    import com.sleepycat.bind.EntryBinding;
    import com.sleepycat.bind.serial.SerialBinding;
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.bind.tuple.TupleBinding;
    import com.sleepycat.collections.StoredMap;
    import com.sleepycat.collections.StoredSortedMap;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.DatabaseExistsException;
    import com.sleepycat.je.DatabaseNotFoundException;
    import com.sleepycat.je.EnvironmentConfig;
    /**
     * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭
     * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间
     * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,
     * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现
     * 
     * @contributor guoyun
     * @param <E>
     */
    public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
            Serializable {
        private static final long serialVersionUID = 3427799316155220967L;
        private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化
        private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化
        private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化
        private transient String dbDir;                 // 数据库所在目录
        private transient String dbName;                // 数据库名字
        private AtomicLong headIndex;                   // 头部指针
        private AtomicLong tailIndex;                   // 尾部指针
        private transient E peekItem=null;              // 当前获取的值
        
        /**
         * 构造函数,传入BDB数据库
         * 
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
            this.queueDb=db;
            this.dbName=db.getDatabaseName();
            headIndex=new AtomicLong(0);
            tailIndex=new AtomicLong(0);
            bindDatabase(queueDb,valueClass,classCatalog);
        }
        /**
         * 构造函数,传入BDB数据库位置和名字,自己创建数据库
         * 
         * @param dbDir
         * @param dbName
         * @param valueClass
         */
        public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
            headIndex=new AtomicLong(0);
            tailIndex=new AtomicLong(0);
            this.dbDir=dbDir;
            this.dbName=dbName;
            createAndBindDatabase(dbDir,dbName,valueClass);
        }
        /**
         * 绑定数据库
         * 
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
            EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
            if(valueBinding == null) {
                valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定
            }
            queueDb = db;
            queueMap = new StoredSortedMap<Long,E>(
                    db,                                             // db
                    TupleBinding.getPrimitiveBinding(Long.class),   //Key
                    valueBinding,                                   // Value
                    true);                                          // allow write
        }
        /**
         * 创建以及绑定数据库
         * 
         * @param dbDir
         * @param dbName
         * @param valueClass
         * @throws DatabaseNotFoundException
         * @throws DatabaseExistsException
         * @throws DatabaseException
         * @throws IllegalArgumentException
         */
        private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
        DatabaseExistsException,DatabaseException,IllegalArgumentException{
            File envFile = null;
            EnvironmentConfig envConfig = null;
            DatabaseConfig dbConfig = null;
            Database db=null;
    
            try {
                // 数据库位置
                envFile = new File(dbDir);
                
                // 数据库环境配置
                envConfig = new EnvironmentConfig();
                envConfig.setAllowCreate(true);
                envConfig.setTransactional(false);
                
                // 数据库配置
                dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                dbConfig.setTransactional(false);
                dbConfig.setDeferredWrite(true);
                
                // 创建环境
                dbEnv = new BdbEnvironment(envFile, envConfig);
                // 打开数据库
                db = dbEnv.openDatabase(null, dbName, dbConfig);
                // 绑定数据库
                bindDatabase(db,valueClass,dbEnv.getClassCatalog());
                 
            } catch (DatabaseNotFoundException e) {
                throw e;
            } catch (DatabaseExistsException e) {
                throw e;
            } catch (DatabaseException e) {
                throw e;
            } catch (IllegalArgumentException e) {
                throw e;
            }
    
            
        }
        
        /**
         * 值遍历器
         */
        @Override
        public Iterator<E> iterator() {
            return queueMap.values().iterator();
        }
        /**
         * 大小
         */
        @Override
        public int size() {
            synchronized(tailIndex){
                synchronized(headIndex){
                    return (int)(tailIndex.get()-headIndex.get());
                }
            }
        }
        
        /**
         * 插入值
         */
        @Override
        public boolean offer(E e) {
            synchronized(tailIndex){
                queueMap.put(tailIndex.getAndIncrement(), e);   // 从尾部插入
            }
            return true;
        }
        
        /**
         * 获取值,从头部获取
         */
        @Override
        public E peek() {
            synchronized(headIndex){
                if(peekItem!=null){
                    return peekItem;
                }
                E headItem=null;
                while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
                    headItem=queueMap.get(headIndex.get());
                    if(headItem!=null){
                        peekItem=headItem;
                        continue;
                    } 
                    headIndex.incrementAndGet();    // 头部指针后移
                }
                return headItem;
            }
        }
        
        /**
         * 移出元素,移出头部元素
         */
        @Override
        public E poll() {
            synchronized(headIndex){
                E headItem=peek();
                if(headItem!=null){
                    queueMap.remove(headIndex.getAndIncrement());
                    peekItem=null;
                    return headItem;
                }
            }
            return null;
        }
        
        /**
         * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
         */
        public void close(){
            try {
                if(queueDb!=null){
                    queueDb.sync();
                    queueDb.close();
                }
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (UnsupportedOperationException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        /**
         * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
         */
        @Override
        public void clear() {
           try {
               close();
               if(dbEnv!=null&&queueDb!=null){
                    dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName); 
                    dbEnv.close();
               }
            } catch (DatabaseNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                try {
                    if(this.dbDir!=null){
                        FileUtils.deleteDirectory(new File(this.dbDir));
                    }
                    
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        
    }

    3. 测试类,测试数据准确性和性能

    package com.guoyun.util;
    
    import java.io.File;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import junit.framework.TestCase;
    
    public class BdbPersistentQueueTest extends TestCase{
        Queue<String> memoryQueue;
        Queue<String> persistentQueue;
        
        @Override
        protected void setUp() throws Exception {
            super.setUp();
            memoryQueue=new LinkedBlockingQueue<String>();
            String dbDir="E:/java/test/bdbDir";
            File file=new File(dbDir);
            if(!file.exists()||!file.isDirectory()){
                file.mkdirs();
            }
            persistentQueue=new BdbPersistentQueue(dbDir,"pq",String.class);
        }
    
        @Override
        protected void tearDown() throws Exception {
            super.tearDown();
            memoryQueue.clear();
            memoryQueue=null;
            persistentQueue.clear();
            persistentQueue=null;
        }
        
        /**
         * 排放值
         * @param queue
         * @return      排放的数据个数
         */
        public int drain(Queue<String> queue){
            int count=0;
            while(true){
                try {
                    queue.remove();
                    count++;
                } catch (Exception e) {
                    return count;
                }
            }
           
        }
        /**
         * 
         * @param queue
         * @param size
         */
        public void fill(Queue<String> queue,int size){
            for(int i=0;i<size;i++){
                queue.add(i+"");
            }
        }
        
        public void checkTime(int size){
            System.out.println("1.内存Queue插入和排空数据所耗时间");
            long time=0;
            long start=System.nanoTime();
            fill(memoryQueue,size);
            time=System.nanoTime()-start;
            System.out.println("	填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");
            start=System.nanoTime();
            drain(memoryQueue);
            time=System.nanoTime()-start;
            System.out.println("	排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");
            
            System.out.println("2.持久化Queue插入和排空数据所耗时间");
            start=System.nanoTime();
            fill(persistentQueue,size);
            time=System.nanoTime()-start;
            System.out.println("	填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000000)+" 豪秒");
            start=System.nanoTime();
            drain(persistentQueue);
            time=System.nanoTime()-start;
            System.out.println("	排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000)+" 豪秒");
            
        }
        
        /**
         * 十万数量级测试
         */
        public void testTime_tenThousand(){
            System.out.println("========测试1000000(十万)条数据=================");
            checkTime(100000);
        }
        
        
        /**
         * 百万数量级测试
         */
        public void testTime_mil(){
            System.out.println("========测试1000000(百万)条数据=================");
            checkTime(1000000);
        }
        
    
        /**
         * 千万数量级测试,注意要防止内存溢出
         */
        public void testTime_tenMil(){
            System.out.println("========测试10000000(千万)条数据=================");
            checkTime(10000000);
        }
        
        /**
         * 测试队列数据准确性
         * @param queue
         * @param queueName
         * @param size
         */
        public void checkDataExact(Queue<String> queue,String queueName,int size){
            if(queue.size()!=size){
                System.err.println("Error size of "+queueName);
            }
            String value=null;
            for(int i=0;i<size;i++){
                value=queue.remove();
                if(!((i+"").equals(value))){
                    System.err.println("Error "+queueName+":"+i+"->"+value);
                }
            }
        }
        
        /**
         * 测试队列中数据的准确性,包括长度
         */
        public void testExact(){
            int size=100;
            fill(memoryQueue,size);
            fill(persistentQueue,size);
            
            checkDataExact(memoryQueue,"MemoryQueue",100);
            checkDataExact(persistentQueue,"PersistentQueue",100);
             
        }
        
    }

    4.测试性能

    ========测试1000000(十万)条数据=================
    1.内存Queue插入和排空数据所耗时间
     填充 100000 条数据耗时: 53.550787 毫秒,单条耗时: 535.50787 纳秒
     排空 100000 条数据耗时: 27.09901 毫秒,单条耗时: 270.9901 纳秒
    2.持久化Queue插入和排空数据所耗时间
     填充 100000 条数据耗时: 1399.644305 毫秒,单条耗时: 0.01399644305 豪秒
     排空 100000 条数据耗时: 2104.765179 毫秒,单条耗时: 21.04765179 豪秒

     持久化写入是内存写入的26倍,读取是77倍

    ========测试1000000(百万)条数据=================
    1.内存Queue插入和排空数据所耗时间
     填充 1000000 条数据耗时: 699.105888 毫秒,单条耗时: 699.105888 纳秒
     排空 1000000 条数据耗时: 158.792281 毫秒,单条耗时: 158.792281 纳秒
    2.持久化Queue插入和排空数据所耗时间
     填充 1000000 条数据耗时: 11978.132218 毫秒,单条耗时: 0.011978132218 豪秒
     排空 1000000 条数据耗时: 22355.617205 毫秒,单条耗时: 22.355617204999998 豪秒

     持久化写入是内存写入的17倍,读取是141倍

    ========测试10000000(千万)条数据=================
    1.内存Queue插入和排空数据所耗时间
     填充 10000000 条数据耗时: 9678.377046 毫秒,单条耗时: 967.8377046 纳秒
     排空 10000000 条数据耗时: 1473.416825 毫秒,单条耗时: 147.3416825 纳秒
    2.持久化Queue插入和排空数据所耗时间
     填充 10000000 条数据耗时: 151177.036391 毫秒,单条耗时: 0.0151177036391 豪秒
     排空 10000000 条数据耗时: 361642.655135 毫秒,单条耗时: 36.164265513500006 豪秒

     持久化写入是内存写入的15倍,读取是245倍

    可以看出写入和遍历一条都是在毫秒级别,还有千万级的数据,BDB的性能着实牛逼.而且随着数据的增多,写的时间在缩短,读的时间在增长.

    jar包:

    commons-io-1.4.jar

    junit-4.8.2.jar

    je-4.0.71.jar

  • 相关阅读:
    数组(2)
    数组(1)
    【探路者】第二周立会报告5(总第11次)
    【探路者】第二周立会报告4(总第10次)
    【探路者】第二周立会报告3(总第9次)
    【探路者】第二周立会报告2(总第8次)
    【探路者】第二周立会报告(总第7次)
    第五周作业1
    【探路者】10月19日立会报告(总第6次)
    【探路者】10月18日立会报告(团队第1周-第5次)
  • 原文地址:https://www.cnblogs.com/gisblogs/p/3953532.html
Copyright © 2011-2022 走看看