zoukankan      html  css  js  c++  java
  • Storm-源码分析-LocalState (backtype.storm.utils)

    LocalState

    A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.

    基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景.

        public synchronized Map<Object, Object> snapshot() throws IOException {
            int attempts = 0;
            while(true) {
                String latestPath = _vs.mostRecentVersionPath();
                if(latestPath==null) return new HashMap<Object, Object>();
                try {
                    return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
                } catch(IOException e) {
                    attempts++;
                    if(attempts >= 10) {
                        throw e;
                    }
                }
            }

    读写操作都是基于map的操作, get和put, 但是put需要做persist操作.
    这里使用synchronized来做对象的线程间同步, 对于一个LocalState对象, 所有synchronized标有的函数只能被串行操作.

        public Object get(Object key) throws IOException {
            return snapshot().get(key);
        }
        public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
            Map<Object, Object> curr = snapshot();
            curr.put(key, val);
            persist(curr, cleanup);
        }

    当然不止这么简单, 为了达到atomic, 还使用了VersionedStore, 参考下一章
    persist不会去update现有的文件, 而是不断的产生递增version的文件, 故每一批更新都会产生一个新的文件

    把需要写入的数据序列化
    创建新的versionfile的path
    把数据写入versionfile
    调用succeedVersion, 创建tokenfile以标志versionfile的写入完成
    清除旧版本, 只保留4个版本

        private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
            byte[] toWrite = Utils.serialize(val);
            String newPath = _vs.createVersion();
            FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
            _vs.succeedVersion(newPath);
            if(cleanup) _vs.cleanup(4);
        }

     

    VersionedStore

        public VersionedStore(String path) throws IOException {
          _root = path;
          mkdirs(_root);
        }

    这个store, 其实就是_root目录下的一堆文件
    文件分两种,
    VersionFile, _root + version, 真正的数据存储文件
    TokenFile, _root + version + “.version”, 标志位文件, 标志version文件是否完成写操作, 以避免读到正在更新的文件

    getAllVersions就是读出所有_root目录下的所有完成写操作的文件, 读出version, 并做从大到小的排序

        public List<Long> getAllVersions() throws IOException {
            List<Long> ret = new ArrayList<Long>();
            for(String s: listDir(_root)) {
                if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
                    ret.add(validateAndGetVersion(s));
                }
            }
            Collections.sort(ret);
            Collections.reverse(ret);
            return ret;
        }

     

    找到最新的版本文件

        public Long mostRecentVersion() throws IOException {
            List<Long> all = getAllVersions();
            if(all.size()==0) return null;
            return all.get(0);

     

    创建新版本号, 用当前时间作为version

        public String createVersion() throws IOException {
            Long mostRecent = mostRecentVersion();
            long version = Time.currentTimeMillis();
            if(mostRecent!=null && version <= mostRecent) {
                version = mostRecent + 1;
            }
            return createVersion(version);
        }
    
        public String createVersion(long version) throws IOException {
            String ret = versionPath(version);
            if(getAllVersions().contains(version))
                throw new RuntimeException("Version already exists or data already exists");
            else
                return ret;
        }
     

    创建tokenfile, 以标记versionfile写完成

        public void succeedVersion(String path) throws IOException {
            long version = validateAndGetVersion(path);
            // should rewrite this to do a file move
            createNewFile(tokenPath(version));
        }
     

    清除旧的版本, 只保留versionsToKeep个, 清除操作就是删除versionfile和tokenfile

        public void cleanup(int versionsToKeep) throws IOException {
            List<Long> versions = getAllVersions();
            if(versionsToKeep >= 0) {
                versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
            }
            HashSet<Long> keepers = new HashSet<Long>(versions);
    
            for(String p: listDir(_root)) {
                Long v = parseVersion(p);
                if(v!=null && !keepers.contains(v)) {
                    deleteVersion(v);
                }
            }
        }
  • 相关阅读:
    Java-IO流-简介
    Java-异常处理-自定义异常
    致橡树-舒婷
    js字符串/数组常用方法总结
    使用vue-cli4快速搭建vue项目demo
    使用vue-cli4快速搭建vue项目demo
    小白第一次用MacOS
    文字背景对比度contrast ratio的计算公式
    如何使用 v-model 绑定一个 computed 属性?
    Java基础--数组
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3184305.html
Copyright © 2011-2022 走看看