zoukankan      html  css  js  c++  java
  • 利用bdb实现持久化队列

    一、BDB数据库环境,可以缓存StoredClassCatalog并共享--BdbEnvironment

    import java.io.File;
    
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.Environment;
    import com.sleepycat.je.EnvironmentConfig;
    public class BdbEnvironment extends Environment {
        StoredClassCatalog classCatalog;
        Database classCatalogDB;
    
        /**
         * Constructor
         *
         * @param envHome 数据库环境目录
         * @param envConfig config options  数据库换纪念馆配置
         * @throws DatabaseException
         */
        public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
            super(envHome, envConfig);
        }
    
        /**
         * 返回StoredClassCatalog
         * @return the cached class catalog
         */
        public StoredClassCatalog getClassCatalog() {
            if(classCatalog == null) {
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                try {
                    //事务、数据库名、配置项
                    classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                    classCatalog = new StoredClassCatalog(classCatalogDB);
                } catch (DatabaseException e) {
                    // TODO Auto-generated catch block
                    throw new RuntimeException(e);
                }
            }
            return classCatalog;
        }
    
        @Override
        public synchronized void close() throws DatabaseException {
            if(classCatalogDB!=null) {
                classCatalogDB.close();
            }
            super.close();
        }
    }

    二、持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭 。相比一般的内存Queue,插入和获取值需要多消耗一定的时间。这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,  其他如remove,addAll,AbstractQueue会基于这几个方法去实现 。

    import java.io.File;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.AbstractQueue;
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.commons.io.FileUtils;
    
    import com.sleepycat.bind.EntryBinding;
    import com.sleepycat.bind.serial.SerialBinding;
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.bind.tuple.TupleBinding;
    import com.sleepycat.collections.StoredMap;
    import com.sleepycat.collections.StoredSortedMap;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.DatabaseExistsException;
    import com.sleepycat.je.DatabaseNotFoundException;
    import com.sleepycat.je.EnvironmentConfig;
    /**
     * @contributor
     * @param <E>
     */
    public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
            Serializable {
        private static final long serialVersionUID = 3427799316155220967L;
        private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化
        private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化
        private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化
        private transient String dbDir;                 // 数据库所在目录
        private transient String dbName;                // 数据库名字
        //AtomicLong:元子类型,线程安全
        //i++线程不安全
        private AtomicLong headIndex;                   // 头部指针
        private AtomicLong tailIndex;                   // 尾部指针
        private transient E peekItem=null;              // 当前获取的值
    
        /**
         * 构造函数,传入BDB数据库
         *
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
            this.queueDb=db;
            this.dbName=db.getDatabaseName();
            headIndex=new AtomicLong(0);
            tailIndex=new AtomicLong(0);
            bindDatabase(queueDb,valueClass,classCatalog);
        }
        /**
         * 构造函数,传入BDB数据库位置和名字,自己创建数据库
         *
         * @param dbDir
         * @param dbName
         * @param valueClass
         */
        public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
            //headIndex=new AtomicLong(0);
            //tailIndex=new AtomicLong(0);
            this.dbDir=dbDir;
            this.dbName=dbName;
            createAndBindDatabase(dbDir,dbName,valueClass);
        }
        /**
         * 绑定数据库
         *
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
            EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
            if(valueBinding == null) {
                valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定
            }
            queueDb = db;
            queueMap = new StoredSortedMap<Long,E>(
                    db,                                             // db
                    TupleBinding.getPrimitiveBinding(Long.class),   //Key 序列化类型
                    valueBinding,                                   // Value
                    true);                                // allow write
            //todo
            Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey();
            Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey();
    
            headIndex=new AtomicLong(firstKey == null ? 0 : firstKey);
            tailIndex=new AtomicLong(lastKey==null?0:lastKey+1);
        }
        /**
         * 创建以及绑定数据库
         *
         * @param dbDir
         * @param dbName
         * @param valueClass
         * @throws DatabaseNotFoundException
         * @throws DatabaseExistsException
         * @throws DatabaseException
         * @throws IllegalArgumentException
         */
        private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
                DatabaseExistsException,DatabaseException,IllegalArgumentException{
            File envFile = null;
            EnvironmentConfig envConfig = null;
            DatabaseConfig dbConfig = null;
            Database db=null;
    
            try {
                // 数据库位置
                envFile = new File(dbDir);
    
                // 数据库环境配置
                envConfig = new EnvironmentConfig();
                envConfig.setAllowCreate(true);
                //不支持事务
                envConfig.setTransactional(false);
    
                // 数据库配置
                dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                dbConfig.setTransactional(false);
                //是否要延迟写
                dbConfig.setDeferredWrite(true);
    
                // 创建环境
                dbEnv = new BdbEnvironment(envFile, envConfig);
                // 打开数据库
                db = dbEnv.openDatabase(null, dbName, dbConfig);
                // 绑定数据库
                bindDatabase(db,valueClass,dbEnv.getClassCatalog());
    
            } catch (DatabaseNotFoundException e) {
                throw e;
            } catch (DatabaseExistsException e) {
                throw e;
            } catch (DatabaseException e) {
                throw e;
            } catch (IllegalArgumentException e) {
                throw e;
            }
    
    
        }
    
        /**
         * 值遍历器
         */
        @Override
        public Iterator<E> iterator() {
            return queueMap.values().iterator();
        }
        /**
         * 大小
         */
        @Override
        public int size() {
            synchronized(tailIndex){
                synchronized(headIndex){
                    return (int)(tailIndex.get()-headIndex.get());
                }
            }
        }
    
        /**
         * 插入值
         */
        @Override
        public boolean offer(E e) {
            synchronized(tailIndex){
                queueMap.put(tailIndex.getAndIncrement(), e);// 从尾部插入
    //            if (tailIndex.get()==0){
    //                //i++:先将值赋给再加1
    //                queueMap.put(tailIndex.get(), e);// 从0插入
    //            }else {
    //                //增加并获取++i;先增加再返回
    //                queueMap.put(tailIndex.incrementAndGet(), e);// 从尾部插入
    //            }
                //将数据不保存在缓冲区,直接存入磁盘
                dbEnv.sync();
            }
            return true;
        }
    
        /**
         * 获取值,从头部获取
         */
        @Override
        public E peek() {
            synchronized(headIndex){
                if(peekItem!=null){
                    return peekItem;
                }
                E headItem=null;
                while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
                    headItem=queueMap.get(headIndex.get());
                    if(headItem!=null){
                        peekItem=headItem;
                        continue;
                    }
                    headIndex.incrementAndGet();    // 头部指针后移
                }
                return headItem;
            }
        }
    
        /**
         * 移出元素,移出头部元素
         */
        @Override
        public E poll() {
            synchronized(headIndex){
                E headItem=peek();
                if(headItem!=null){
                    queueMap.remove(headIndex.getAndIncrement());
                    //从磁盘上移除
                    dbEnv.sync();
                    peekItem=null;
                    return headItem;
                }
            }
            return null;
        }
    
        /**
         * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
         */
        public void close(){
            try {
                if(queueDb!=null){
                    queueDb.sync();
                    queueDb.close();
                }
    
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (UnsupportedOperationException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
         */
        @Override
        public void clear() {
            try {
                close();
                if(dbEnv!=null&&queueDb!=null){
                    dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);
                    dbEnv.close();
                }
            } catch (DatabaseNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                try {
                    if(this.dbDir!=null){
                        FileUtils.deleteDirectory(new File(this.dbDir));
                    }
    
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

    三、测试类-PersistenceQueueDemo

    public class PersistenceQueueDemo {
        public static void main(String[] args) {
            //String.class:value类型
            BdbPersistentQueue<String> queue = new BdbPersistentQueue<String>("D:\bdb", "test", String.class);
            queue.offer("first");
            queue.offer("double");
            queue.offer("String");
            //获取移除队列
            String p1 = queue.poll();
            String p2 = queue.poll();
            System.out.println(p1);
            System.out.println(p2);
            //获取不移除队列--每次取出的都是第一个元素
            //String p1 = queue.peek();
            //String p2 = queue.peek();
        }
    }

    1>第一遍执行在控制台中看到如下

    first
    double

    Process finished with exit code 0

    2>将插入数据库的方法注释掉,再执行一边

    String
    null

    Process finished with exit code 0

    3>再将上面注释的方法打开,再执行一遍,在控制台看到如下:

    first
    double

    Process finished with exit code 0

    由上述的三次操作,可以看出实现了队列的性质(FOFO-先进先出),并持久化到了磁盘中。

  • 相关阅读:
    KMP算法
    Python 正则表达式
    Python 装饰器
    C/C++ 之输入输出
    PAT(Basic Level)--个位数统计
    Java 接口与抽象类
    Java集合-01概述
    数据结构--红黑树
    数据结构--(AVL)平衡二叉树
    数据结构--二叉搜索树
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305577.html
Copyright © 2011-2022 走看看