zoukankan      html  css  js  c++  java
  • 美团Leaf——全局序列生成器

    Leaf的Github地址:

    https://github.com/Meituan-Dianping/Leaf

    Leaf美团技术团队博客地址:

    https://tech.meituan.com/2017/04/21/mt-leaf.html

    关于Leaf的使用手册、架构说明、Segment和Snowflake的特点和时钟回拨解决办法,参考上面的链接内容都能获得到答案。拒绝重复搬砖。

    在本篇博客里我想就Leaf的Segment模式的源码实现做个简单的注释。代码分支:master。

    1.Segment

    Segment是SegmentBuffer的成员属性,cache中存储的是SegmentBuffer,Segment是双buffer的实现。

    2.初始化Segment

    在初始化Segment时,主要做两件事情。1是根据数据库表中配置的busi_tag更新缓存;2是添加定时任务,定时(一分钟间隔)更新缓存。

        @Override
        public boolean init() {
            logger.info("Init ...");
            // 确保加载到kv后才初始化成功
            updateCacheFromDb();
            initOK = true;
            updateCacheFromDbAtEveryMinute();
            return initOK;
        }

    updateCacheFromDb():

    private void updateCacheFromDb() {
            logger.info("update cache from db");
            StopWatch sw = new Slf4JStopWatch();
            try {
                //从配置的数据源中加载biz_tag
                List<String> dbTags = dao.getAllTags();
                if (dbTags == null || dbTags.isEmpty()) {
                    return;
                }
                //cache中的biz_tag.初始为空.
                List<String> cacheTags = new ArrayList<String>(cache.keySet());
                //存储本次更新操作,要从DB中加载进cache的biz_tag.
                Set<String> insertTagsSet = new HashSet<>(dbTags);
                //存储失效的biz_tag:存在于cache,不存在于DB.
                Set<String> removeTagsSet = new HashSet<>(cacheTags);
                //过滤去重,得到需要存入进cache的biz_tag
                for(int i = 0; i < cacheTags.size(); i++){
                    String tmp = cacheTags.get(i);
                    if(insertTagsSet.contains(tmp)){
                        insertTagsSet.remove(tmp);
                    }
                }
                //存储进cache
                for (String tag : insertTagsSet) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setKey(tag);
                    Segment segment = buffer.getCurrent();
                    segment.setValue(new AtomicLong(0));
                    //这里的max、step均为0.所以这一步仅仅将biz_tag存储进了cache,并没有对SegmentBuffer执行初始化操作.
                    segment.setMax(0);
                    segment.setStep(0);
                    cache.put(tag, buffer);
                    logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
                }
                //cache中已失效的biz_tag从cache删除
                for(int i = 0; i < dbTags.size(); i++){
                    String tmp = dbTags.get(i);
                    if(removeTagsSet.contains(tmp)){
                        removeTagsSet.remove(tmp);
                    }
                }
                for (String tag : removeTagsSet) {
                    cache.remove(tag);
                    logger.info("Remove tag {} from IdCache", tag);
                }
            } catch (Exception e) {
                logger.warn("update cache from db exception", e);
            } finally {
                sw.stop("updateCacheFromDb");
            }
        }
    View Code

    3.获取到下一个序列号

        @Override
        public Result get(final String key) {
            if (!initOK) {
                return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
            }
            //判断cache中是否有这个key——bizTag
            if (cache.containsKey(key)) {
                SegmentBuffer buffer = cache.get(key);
                //未初始化,
                if (!buffer.isInitOk()) {
                    synchronized (buffer) {
                        //再次判断,以防重复初始化.
                        if (!buffer.isInitOk()) {
                            try {
                                //对SegmentBuffer当前的Segment进行初始化
                                updateSegmentFromDb(key, buffer.getCurrent());
                                logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                                buffer.setInitOk(true);
                            } catch (Exception e) {
                                logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                            }
                        }
                    }
                }
                return getIdFromSegmentBuffer(cache.get(key));
            }
            return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
        }

    updateSegmentFromDb();

    public void updateSegmentFromDb(String key, Segment segment) {
            StopWatch sw = new Slf4JStopWatch();
            SegmentBuffer buffer = segment.getBuffer();
            LeafAlloc leafAlloc;
            //1.SegmentBuffer尚未初始化,则SegmentBuffer的step等于数据库中的step;
            if (!buffer.isInitOk()) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else if (buffer.getUpdateTimestamp() == 0) {
                //2.SegmentBuffer已经初始化完成:
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else {
                //2.SegmentBuffer已经初始化完成:
                //a)    SegmentBuffer的存活时间小于15分钟:
                //   i.    如果SegmentBuffer当前的step*2大于最大值(一百万),则什么也不做,不再扩大step
                //  ii.    否则,SegmentBuffer的step扩大为原来的2倍。
                //b)    SegmentBuffer的存活时间小于30分钟:什么也不做.
                //c)    SegmentBuffer的存活时间在15至30分钟之间:
                //   i.    如果SegmentBuffer的step/2 大于等于数据库中的step,那么就将SegmentBuffer的step值变为原来的二分之一,否则什么也不做。
                long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
                int nextStep = buffer.getStep();
                if (duration < SEGMENT_DURATION) {
                    if (nextStep * 2 > MAX_STEP) {
                        //do nothing
                    } else {
                        nextStep = nextStep * 2;
                    }
                } else if (duration < SEGMENT_DURATION * 2) {
                    //do nothing with nextStep
                } else {
                    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
                }
                logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
                LeafAlloc temp = new LeafAlloc();
                temp.setKey(key);
                temp.setStep(nextStep);
                leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(nextStep);
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
            }
            // must set value before set max
            long value = leafAlloc.getMaxId() - buffer.getStep();
            segment.getValue().set(value);
            segment.setMax(leafAlloc.getMaxId());
            segment.setStep(buffer.getStep());
            sw.stop("updateSegmentFromDb", key + " " + segment);
        }
    View Code

    getIdFromSegmentBuffer();

        public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
            while (true) {
                //读锁
                buffer.rLock().lock();
                try {
                    //获取到当前正在使用的Segment.
                    final Segment segment = buffer.getCurrent();
                    //如果下一个Segment没有初始化完成,当前Segment的ID闲置数量小于9成,并且没有在执行Segment初始化操作,就去执行对下一个Segment的初始化.
                    //只要当前号段消费数量达到了10%,就对下一个号段进行初始化.
                    if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                        service.execute(new Runnable() {
                            @Override
                            public void run() {
                                Segment next = buffer.getSegments()[buffer.nextPos()];
                                boolean updateOk = false;
                                try {
                                    //根据biz_tag,根据DB对Segment初始化
                                    updateSegmentFromDb(buffer.getKey(), next);
                                    updateOk = true;
                                    logger.info("update segment {} from db {}", buffer.getKey(), next);
                                } catch (Exception e) {
                                    logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                                } finally {
                                    if (updateOk) {
                                        //获取写锁
                                        buffer.wLock().lock();
                                        buffer.setNextReady(true);
                                        buffer.getThreadRunning().set(false);
                                        buffer.wLock().unlock();
                                    } else {
                                        buffer.getThreadRunning().set(false);
                                    }
                                }
                            }
                        });
                    }
                    //Segment的value记录了当前已分配的ID最大值,加一后返回.原子性操作.
                    long value = segment.getValue().getAndIncrement();
                    //如果超过了当前Segment缓存的最大值(当前Segment号段已经消费完毕),就进入阻塞等待.
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                } finally {
                    buffer.rLock().unlock();
                }
                //当前号段已经消费完了,下个号段正在有别的线程在执行初始化,进入等待.
                //做空转.
                waitAndSleep(buffer);
                buffer.wLock().lock();
                //下面这段代码在做空转之前已经执行过,这里为什么还要再次执行呢?
                //在当前线程空转期间,可能已经有别的线程执行完毕了对下个Segment的初始化操作,并进行了切换.防止出现多次切换.
                try {
                    final Segment segment = buffer.getCurrent();
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                    //当前Segment缓存的号段已经消费完了,下一个Segment初始化好了就进行切换
                    if (buffer.isNextReady()) {
                        //将Segment切换为下一个.
                        buffer.switchPos();
                        buffer.setNextReady(false);
                    } else {
                        //ERROR.当前Segment号段满了,下一个号段还未准备好.
                        logger.error("Both two segments in {} are not ready!", buffer);
                        return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                    }
                } finally {
                    buffer.wLock().unlock();
                }
            }
        }
    View Code
  • 相关阅读:
    unexpected inconsistency;run fsck manually esxi断电后虚拟机启动故障
    centos 安装mysql 5.7
    centos 7 卸载mysql
    centos7 在线安装mysql5.6,客户端远程连接mysql
    ubuntu 14.04配置ip和dns
    centos7 上搭建mqtt服务
    windows eclipse IDE打开当前类所在文件路径
    git 在非空文件夹clone新项目
    eclipse中java build path下 allow output folders for source folders 无法勾选,该如何解决 eclipse中java build path下 allow output folders for source folders 无法勾选,
    Eclipse Kepler中配置JadClipse
  • 原文地址:https://www.cnblogs.com/monument/p/13038131.html
Copyright © 2011-2022 走看看