zoukankan      html  css  js  c++  java
  • 美团Leaf——全局序列生成器

    Leaf的Github地址:

    https://github.com/Meituan-Dianping/Leaf

    Leaf美团技术团队博客地址:

    https://tech.meituan.com/2017/04/21/mt-leaf.html

    关于Leaf的使用手册、架构说明、Segment和Snowflake的特点和时钟回拨解决办法,参考上面的链接内容都能获得到答案。拒绝重复搬砖。

    在本篇博客里我想就Leaf的Segment模式的源码实现做个简单的注释。代码分支:master。

    1.Segment

    Segment是SegmentBuffer的成员属性,cache中存储的是SegmentBuffer,Segment是双buffer的实现。

    2.初始化Segment

    在初始化Segment时,主要做两件事情。1是根据数据库表中配置的busi_tag更新缓存;2是添加定时任务,定时(一分钟间隔)更新缓存。

        @Override
        public boolean init() {
            logger.info("Init ...");
            // 确保加载到kv后才初始化成功
            updateCacheFromDb();
            initOK = true;
            updateCacheFromDbAtEveryMinute();
            return initOK;
        }

    updateCacheFromDb():

    private void updateCacheFromDb() {
            logger.info("update cache from db");
            StopWatch sw = new Slf4JStopWatch();
            try {
                //从配置的数据源中加载biz_tag
                List<String> dbTags = dao.getAllTags();
                if (dbTags == null || dbTags.isEmpty()) {
                    return;
                }
                //cache中的biz_tag.初始为空.
                List<String> cacheTags = new ArrayList<String>(cache.keySet());
                //存储本次更新操作,要从DB中加载进cache的biz_tag.
                Set<String> insertTagsSet = new HashSet<>(dbTags);
                //存储失效的biz_tag:存在于cache,不存在于DB.
                Set<String> removeTagsSet = new HashSet<>(cacheTags);
                //过滤去重,得到需要存入进cache的biz_tag
                for(int i = 0; i < cacheTags.size(); i++){
                    String tmp = cacheTags.get(i);
                    if(insertTagsSet.contains(tmp)){
                        insertTagsSet.remove(tmp);
                    }
                }
                //存储进cache
                for (String tag : insertTagsSet) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setKey(tag);
                    Segment segment = buffer.getCurrent();
                    segment.setValue(new AtomicLong(0));
                    //这里的max、step均为0.所以这一步仅仅将biz_tag存储进了cache,并没有对SegmentBuffer执行初始化操作.
                    segment.setMax(0);
                    segment.setStep(0);
                    cache.put(tag, buffer);
                    logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
                }
                //cache中已失效的biz_tag从cache删除
                for(int i = 0; i < dbTags.size(); i++){
                    String tmp = dbTags.get(i);
                    if(removeTagsSet.contains(tmp)){
                        removeTagsSet.remove(tmp);
                    }
                }
                for (String tag : removeTagsSet) {
                    cache.remove(tag);
                    logger.info("Remove tag {} from IdCache", tag);
                }
            } catch (Exception e) {
                logger.warn("update cache from db exception", e);
            } finally {
                sw.stop("updateCacheFromDb");
            }
        }
    View Code

    3.获取到下一个序列号

        @Override
        public Result get(final String key) {
            if (!initOK) {
                return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
            }
            //判断cache中是否有这个key——bizTag
            if (cache.containsKey(key)) {
                SegmentBuffer buffer = cache.get(key);
                //未初始化,
                if (!buffer.isInitOk()) {
                    synchronized (buffer) {
                        //再次判断,以防重复初始化.
                        if (!buffer.isInitOk()) {
                            try {
                                //对SegmentBuffer当前的Segment进行初始化
                                updateSegmentFromDb(key, buffer.getCurrent());
                                logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                                buffer.setInitOk(true);
                            } catch (Exception e) {
                                logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                            }
                        }
                    }
                }
                return getIdFromSegmentBuffer(cache.get(key));
            }
            return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
        }

    updateSegmentFromDb();

    public void updateSegmentFromDb(String key, Segment segment) {
            StopWatch sw = new Slf4JStopWatch();
            SegmentBuffer buffer = segment.getBuffer();
            LeafAlloc leafAlloc;
            //1.SegmentBuffer尚未初始化,则SegmentBuffer的step等于数据库中的step;
            if (!buffer.isInitOk()) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else if (buffer.getUpdateTimestamp() == 0) {
                //2.SegmentBuffer已经初始化完成:
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else {
                //2.SegmentBuffer已经初始化完成:
                //a)    SegmentBuffer的存活时间小于15分钟:
                //   i.    如果SegmentBuffer当前的step*2大于最大值(一百万),则什么也不做,不再扩大step
                //  ii.    否则,SegmentBuffer的step扩大为原来的2倍。
                //b)    SegmentBuffer的存活时间小于30分钟:什么也不做.
                //c)    SegmentBuffer的存活时间在15至30分钟之间:
                //   i.    如果SegmentBuffer的step/2 大于等于数据库中的step,那么就将SegmentBuffer的step值变为原来的二分之一,否则什么也不做。
                long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
                int nextStep = buffer.getStep();
                if (duration < SEGMENT_DURATION) {
                    if (nextStep * 2 > MAX_STEP) {
                        //do nothing
                    } else {
                        nextStep = nextStep * 2;
                    }
                } else if (duration < SEGMENT_DURATION * 2) {
                    //do nothing with nextStep
                } else {
                    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
                }
                logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
                LeafAlloc temp = new LeafAlloc();
                temp.setKey(key);
                temp.setStep(nextStep);
                leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(nextStep);
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
            }
            // must set value before set max
            long value = leafAlloc.getMaxId() - buffer.getStep();
            segment.getValue().set(value);
            segment.setMax(leafAlloc.getMaxId());
            segment.setStep(buffer.getStep());
            sw.stop("updateSegmentFromDb", key + " " + segment);
        }
    View Code

    getIdFromSegmentBuffer();

        public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
            while (true) {
                //读锁
                buffer.rLock().lock();
                try {
                    //获取到当前正在使用的Segment.
                    final Segment segment = buffer.getCurrent();
                    //如果下一个Segment没有初始化完成,当前Segment的ID闲置数量小于9成,并且没有在执行Segment初始化操作,就去执行对下一个Segment的初始化.
                    //只要当前号段消费数量达到了10%,就对下一个号段进行初始化.
                    if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                        service.execute(new Runnable() {
                            @Override
                            public void run() {
                                Segment next = buffer.getSegments()[buffer.nextPos()];
                                boolean updateOk = false;
                                try {
                                    //根据biz_tag,根据DB对Segment初始化
                                    updateSegmentFromDb(buffer.getKey(), next);
                                    updateOk = true;
                                    logger.info("update segment {} from db {}", buffer.getKey(), next);
                                } catch (Exception e) {
                                    logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                                } finally {
                                    if (updateOk) {
                                        //获取写锁
                                        buffer.wLock().lock();
                                        buffer.setNextReady(true);
                                        buffer.getThreadRunning().set(false);
                                        buffer.wLock().unlock();
                                    } else {
                                        buffer.getThreadRunning().set(false);
                                    }
                                }
                            }
                        });
                    }
                    //Segment的value记录了当前已分配的ID最大值,加一后返回.原子性操作.
                    long value = segment.getValue().getAndIncrement();
                    //如果超过了当前Segment缓存的最大值(当前Segment号段已经消费完毕),就进入阻塞等待.
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                } finally {
                    buffer.rLock().unlock();
                }
                //当前号段已经消费完了,下个号段正在有别的线程在执行初始化,进入等待.
                //做空转.
                waitAndSleep(buffer);
                buffer.wLock().lock();
                //下面这段代码在做空转之前已经执行过,这里为什么还要再次执行呢?
                //在当前线程空转期间,可能已经有别的线程执行完毕了对下个Segment的初始化操作,并进行了切换.防止出现多次切换.
                try {
                    final Segment segment = buffer.getCurrent();
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                    //当前Segment缓存的号段已经消费完了,下一个Segment初始化好了就进行切换
                    if (buffer.isNextReady()) {
                        //将Segment切换为下一个.
                        buffer.switchPos();
                        buffer.setNextReady(false);
                    } else {
                        //ERROR.当前Segment号段满了,下一个号段还未准备好.
                        logger.error("Both two segments in {} are not ready!", buffer);
                        return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                    }
                } finally {
                    buffer.wLock().unlock();
                }
            }
        }
    View Code
  • 相关阅读:
    【图片加载大小优化】
    img标签实现和背景图一样的显示效果——object-fit和object-position
    【ios bug解决】 输入框聚焦时光标不显示
    service worker 实现页面通信
    【获取url 问号后参数】防中文乱码
    js去掉url后某参数【函数封装】
    ES6字符串模板
    ES6扩展运算符和rest运算符
    ES6变量的解构赋值
    ES6新的声明方式,var let const三种声明方式的区别
  • 原文地址:https://www.cnblogs.com/monument/p/13038131.html
Copyright © 2011-2022 走看看