zoukankan      html  css  js  c++  java
  • 一致性数据服务

    继承 ActualDataModelOperationHandler 类,调用 updateData 方法提交更新,调用 readData 方法读取数据;使用前需先调用 ActualDataServiceManager.initQueueNum 初始化更新队列。

    package com.yy.fastcustom.actualupdate;
    
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Template for one piece of data that lives both in a cache and in an original data source.
     *
     * <p>Subclasses implement the cache and original-source operations. {@link #updateData()}
     * hands this handler to the update queue chosen by its read/write flag, and
     * {@link #readData()} reads the value while taking into account whether an update for the
     * same flag is currently registered with {@code ActualDataServiceManager}.
     *
     * <p>Created by zzq on 2019/5/16.
     *
     * @param <Data> type of the value being read and cached
     */
    public abstract class ActualDataModelOperationHandler<Data> {
        /** Key that ties reads and updates of the same data together. */
        private String readWriteFlag;

        /** Upper bound, in milliseconds, that {@link #readData()} polls the cache during an update. */
        private long readWaitMillis;

        public final String getReadWriteFlag() {
            return readWriteFlag;
        }

        public final void setReadWriteFlag(String readWriteFlag) {
            this.readWriteFlag = readWriteFlag;
        }

        public final long getReadWaitMillis() {
            return readWaitMillis;
        }

        public final void setReadWaitMillis(long readWaitMillis) {
            this.readWaitMillis = readWaitMillis;
        }

        /**
         * Removes the cached value.
         *
         * @return true on success; the update worker only continues with
         *         {@link #updateOriginalDataAndCache()} when this returns true
         */
        public abstract boolean removeCache();

        /**
         * Updates the original data and refreshes the cache.
         *
         * @return true on success
         */
        public abstract boolean updateOriginalDataAndCache();

        /**
         * Loads the value from the original data source (presumably a database), bypassing the cache.
         *
         * @return the value, or null when there is none
         */
        public abstract Data getOriginalData();

        /**
         * Reads the value from the cache only.
         *
         * @return the cached value, or null on a cache miss
         */
        public abstract Data getCacheData();

        /**
         * Stores a value in the cache.
         *
         * @param data value to cache; {@link #readData()} never passes null
         */
        public abstract void setCacheData(Data data);

        /**
         * Enqueues this handler on the update queue selected by its read/write flag.
         *
         * @throws Exception if the update queues have not been initialised, or if the thread is
         *                   interrupted while handing the handler to the queue
         */
        public final void updateData() throws Exception {
            BlockingQueue<ActualDataModelOperationHandler> actualDataModels =
                    ActualDataServiceManager.getUpdateQueue(this.getReadWriteFlag());
            actualDataModels.put(this);
        }

        /**
         * Combined read: cache first, then the original data source.
         *
         * <p>While an update for this flag is registered, the cache is polled until it yields a
         * value, the update is deregistered, or {@link #getReadWaitMillis()} has elapsed. After
         * that the cache is tried once more; on a miss the value is loaded from the original
         * source and, when non-null, written back to the cache.
         *
         * @return the value, or null when neither the cache nor the original source has one
         */
        public final Data readData() {
            String flag = this.getReadWriteFlag();
            if (ActualDataServiceManager.contains(flag)) {
                // nanoTime is monotonic, so the timeout is immune to wall-clock adjustments.
                long startNanos = System.nanoTime();
                // NOTE(review): this loop polls getCacheData() back to back with no pause.
                for (; ; ) {
                    if (ActualDataServiceManager.noContains(flag)) {
                        break; // the update has finished
                    }
                    long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
                    if (elapsedMillis > this.getReadWaitMillis()) {
                        break; // waited longer than the configured threshold
                    }
                    Data polled = this.getCacheData();
                    if (polled != null) {
                        return polled;
                    }
                }
            }
            Data retData = this.getCacheData();
            if (retData != null) {
                return retData;
            }
            retData = this.getOriginalData();
            if (retData != null) {
                this.setCacheData(retData);
            }
            return retData;
        }
    }
    package com.yy.fastcustom.actualupdate;
    
    import java.util.*;
    import java.util.concurrent.*;
    
    /**
     * Static registry of the update queues and of the flags whose update is currently running.
     *
     * <p>{@link #initQueueNum(int)} must be called once before {@link #getUpdateQueue(String)}.
     * Each queue is drained by its own worker submitted to a fixed-size thread pool, and a given
     * flag always maps to the same queue.
     *
     * <p>Created by zzq on 2019/5/16.
     */
    public class ActualDataServiceManager {
        /** Stand-in map key for a null flag, because ConcurrentHashMap rejects null keys. */
        private static final Object NULL_FLAG_KEY = new Object();

        /** Written last during initialisation, so a non-null value is always fully populated. */
        private static volatile List<DataUpdateQueueWrapper> queueList = null;

        /** Created eagerly so the flag methods work even before initQueueNum has been called. */
        private static final Map<Object, Object> updateOperationTag = new ConcurrentHashMap<>();

        private static ExecutorService executor = null;

        /**
         * Creates {@code size} update queues and starts one worker per queue. Only the first
         * successful call has any effect; later calls are ignored.
         *
         * @param size number of queues and worker threads; must be positive
         * @throws IllegalArgumentException if {@code size} is not positive
         */
        public static void initQueueNum(int size) {
            if (queueList == null) {
                synchronized (ActualDataServiceManager.class) {
                    if (queueList == null) {
                        initBasicObject(size);
                    }
                }
            }
        }

        /**
         * Builds the worker pool and the queues, publishing them only once everything is ready.
         * Must be called while holding the class lock.
         */
        private static void initBasicObject(int size) {
            if (size <= 0) {
                throw new IllegalArgumentException("size must be positive: " + size);
            }
            // core == max == size, so each of the `size` submissions below starts its own thread
            // and nothing is ever offered to the SynchronousQueue.
            ExecutorService workers = new ThreadPoolExecutor(
                    size,
                    size,
                    60L,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            List<DataUpdateQueueWrapper> wrappers = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                DataUpdateQueueWrapper dataUpdateQueueWrapper = new DataUpdateQueueWrapper();
                wrappers.add(dataUpdateQueueWrapper);
                workers.submit(dataUpdateQueueWrapper);
            }
            executor = workers;
            // Volatile write last: readers that see a non-null list see a complete one.
            queueList = wrappers;
        }

        /**
         * Returns the update queue responsible for the given flag.
         *
         * @param flag read/write flag; null is allowed and maps to the first queue
         * @return the queue that the flag hashes to
         * @throws Exception if {@link #initQueueNum(int)} has not been called yet
         */
        public static BlockingQueue<ActualDataModelOperationHandler> getUpdateQueue(String flag) throws Exception {
            List<DataUpdateQueueWrapper> queues = queueList;
            if (queues == null) {
                throw new Exception("在调用该方法前,调用initQueueNum初始化更新队列个数");
            }
            int hash = flag == null ? 0 : flag.hashCode();
            int spread = hash ^ (hash >>> 16);
            // A true modulo is required: the list length is not a power of two, so masking with
            // (length - 1) would skew the distribution across queues.
            return queues.get(Math.floorMod(spread, queues.size())).getQueue();
        }

        /** Marks the flag as having an update in progress. */
        public static void addFlag(String flag) {
            updateOperationTag.put(keyFor(flag), "flagValue");
        }

        /** Clears the in-progress mark of the flag. */
        public static void removeFlag(String flag) {
            updateOperationTag.remove(keyFor(flag));
        }

        /**
         * @return true while the flag is marked as having an update in progress
         */
        public static boolean contains(String flag) {
            return updateOperationTag.containsKey(keyFor(flag));
        }

        /**
         * @return the negation of {@link #contains(String)}
         */
        public static boolean noContains(String flag) {
            return !contains(flag);
        }

        /** Maps a possibly-null flag to a key the concurrent map accepts. */
        private static Object keyFor(String flag) {
            return flag == null ? NULL_FLAG_KEY : flag;
        }
    }

    更新操作容器

    package com.yy.fastcustom.actualupdate;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * One update queue together with the endless loop that drains it.
     *
     * <p>The loop is meant to be run by a single worker thread, so updates placed on the same
     * queue are applied one after another in arrival order.
     *
     * <p>Created by zzq on 2019/5/16.
     */
    public class DataUpdateQueueWrapper implements Callable<Object> {
        private final BlockingQueue<ActualDataModelOperationHandler> updateQueue = new LinkedBlockingQueue<>();

        /**
         * @return the queue that producers put update handlers on
         */
        public BlockingQueue<ActualDataModelOperationHandler> getQueue() {
            return updateQueue;
        }

        /**
         * Takes handlers off the queue forever and applies each update.
         *
         * <p>For every handler its flag is registered with {@code ActualDataServiceManager} for
         * the duration of the update. The cache is removed first, and the original data is
         * updated only when that removal reports success. A runtime failure of one update is
         * reported and skipped, so the queue keeps being served.
         *
         * @return never returns normally
         * @throws Exception only {@link InterruptedException}, when the worker is interrupted
         *                   while waiting for the next handler
         */
        @Override
        public Object call() throws Exception {
            for (; ; ) {
                ActualDataModelOperationHandler actualDataModel = updateQueue.take();
                String flag = actualDataModel.getReadWriteFlag();
                try {
                    // This loop runs on the queue's dedicated worker thread.
                    ActualDataServiceManager.addFlag(flag);
                    try {
                        // If removing the cache fails, skip the update and cache refresh. Once
                        // the cache is gone, even a failed update cannot leave readers with a
                        // stale cached value.
                        if (actualDataModel.removeCache()) {
                            actualDataModel.updateOriginalDataAndCache();
                        }
                    } finally {
                        ActualDataServiceManager.removeFlag(flag);
                    }
                } catch (RuntimeException e) {
                    // Letting this escape would end the only consumer of this queue, and every
                    // later update for its flags would pile up unprocessed.
                    System.err.println("Update failed for flag '" + flag + "', continuing with the next one");
                    e.printStackTrace();
                }
            }
        }
    }

    测试例子

    package com.yy.fastcustom.actualupdate;
    
    
    /**
     * Runnable demo of the update/read flow, using slow fake cache and database operations.
     *
     * <p>Created by zzq on 2019/5/27/027.
     */
    public class TestDataModel extends ActualDataModelOperationHandler<String> {
        /** Fake cache slot written by the update and by setCacheData. */
        volatile String uu = null;

        /**
         * Queues one update for flag "1" and then reads the same flag from a second thread.
         */
        public static void main(String[] args) {
            // ceshi_hash() is left disabled here, as in the original demo.

            final String flag = "1";
            ActualDataServiceManager.initQueueNum(3);

            final TestDataModel model = new TestDataModel();
            model.setReadWriteFlag(flag);
            model.setReadWaitMillis(20 * 1000);

            try {
                model.updateData();
            } catch (Exception e) {
                e.printStackTrace();
            }

            Thread reader = new Thread(() -> System.out.println("=====end out====" + model.readData()));
            reader.start();
        }

        /**
         * Scratch experiment: masks a few sample integers with 2 and with 2999. The results are
         * not used; only a marker line is printed.
         */
        private static void ceshi_hash() {
            int[] samples = {356349, 453546, 678345, 768678, 967897};
            int[] maskedWith2 = new int[samples.length];
            int[] maskedWith2999 = new int[samples.length];
            for (int i = 0; i < samples.length; i++) {
                maskedWith2[i] = samples[i] & 2;
                maskedWith2999[i] = samples[i] & 2999;
            }

            System.out.println("====");
        }

        /**
         * Pretends to remove the cache: prints a line and sleeps one second, five times over.
         *
         * @return always true
         */
        @Override
        public boolean removeCache() {
            int round = 0;
            while (round < 5) {
                System.out.println("====removeCache==缓存==");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                round++;
            }
            return true;
        }

        /**
         * Pretends to update the database: prints a line and sleeps one second, five times over,
         * writing the fake cache slot after the second round.
         *
         * @return always false
         */
        @Override
        public boolean updateOriginalDataAndCache() {
            for (int round = 1; round <= 5; round++) {
                System.out.println("====updateOriginalDataAndCache==数据库==");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (round == 2) {
                    uu = "获取到了缓存数据";
                }
            }
            return false;
        }

        /**
         * Stores the value in the fake cache slot and reports it.
         */
        @Override
        public void setCacheData(String s) {
            this.uu = s;
            System.out.println("====读取数据库成功,将其放入缓存==");
        }

        /**
         * @return a fixed string standing in for a value loaded from the database
         */
        @Override
        public String getOriginalData() {
            final String loaded = "没有找到缓存,从数据库加载!";
            return loaded;
        }

        /**
         * Simulates a cache miss on every call.
         *
         * @return always null
         */
        @Override
        public String getCacheData() {
            System.out.println("====读取缓存但没读取到==");
            return null;
        }
    }
  • 相关阅读:
    云原生 Serverless Database 使用体验
    如何画一张架构图(内含知识图谱)
    阿里云 Serverless Kubernetes 的落地实践分享
    【阿里云 CDP 公开课】 第二讲:CDH/HDP 何去何从
    阿里云数据治理系列(一):治理项目启动前的必答三问
    使用递归方法全选/反选TreeView中CheckBox子节点
    C# 安装部署项目 【转】
    HashTable存储树形数据
    Delphi中MessageBox用法【转】
    js nextSibling属性和previousSibling属性
  • 原文地址:https://www.cnblogs.com/zzq-include/p/11055252.html
Copyright © 2011-2022 走看看