zoukankan      html  css  js  c++  java
  • 如何构建“高性能”“大小无限”(磁盘)队列?

    假设场景:

      1. 针对一个高并发的应用,你是否会选择打印访问日志?
      2. 针对分布式的应用,你是否会选择将所有日志打印到日志中心?

    解决方案:

      1. 如果如果你选择为了性能,不打印日志,那无可厚非。但是你得考虑清楚,出问题的时候是否能够做到快速排查?
      2. 你觉得日志分布在各台机器上很方便,那不用日志中心也行!

      如果,你还是会选择打印大量的访问日志,如果你还是会选择打印日志到日志中心,那么本文对你有用!

      如果自己实现一个日志中心,不说很难吧,也还是要费很大力气的,比如性能,比如容量大小!

      所以,本文选择阿里云的 loghub 作为日志中心,收集所有日志!

    loghub 常规操作:

      在提出本文主题之前,咱们要看看loghub自己的方式,以及存在的问题!
      在官方接入文档里,就建议咱们使用 logProducer 接入。

      其实 logProducer 已经做了太多的优化,比如当日志包数据达到一定数量,才统一进行发送,异步发送等等!

      至于为什么还会存在本篇文章,则是由于这些优化还不够,比如 这些日志发送仍然会影响业务性能,仍然会受到内存限制,仍然会抢占大量cpu。。。

      好吧,接入方式:

      1. 引入maven依赖:

            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>aliyun-log-logback-appender</artifactId>
                <version>0.1.13</version>
            </dependency>

      2. logback中添加appender:

        <appender name="LOGHUB-APPENDER" class="appender:com.aliyun.openservices.log.logback.LoghubAppender">
            <endpoint>${loghub.endpoint}</endpoint>
            <accessKeyId>${loghub.accessKeyId}</accessKeyId>
            <accessKey>${loghub.accessKey}</accessKey>
            <projectName>${loghub.projectName}</projectName>
            <logstore>test-logstore</logstore>
            <topic>${loghub.topic}</topic>
            <packageTimeoutInMS>1500</packageTimeoutInMS>
            <logsCountPerPackage>4096</logsCountPerPackage>
            <!-- 4718592=4M, 3145728=3M, 2097152=2M -->
            <logsBytesPerPackage>3145728</logsBytesPerPackage>
            <!-- 17179869184=2G(溢出丢弃) , 104857600=12.5M, 2147483647=2G, 536870912=512M-->
            <memPoolSizeInByte>536870912</memPoolSizeInByte>
            <retryTimes>1</retryTimes>
            <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
        </appender>
        <root level="${logging.level}">
            <appender-ref ref="STDOUT"/>
            <appender-ref ref="LOGHUB-APPENDER" />
        </root>

      3. 在代码中进行日志打印:

        private static Logger logger = LoggerFactory.getLogger(MyClass.class);
        logger.warn("give me five: {}", name);

    看似高效接入,存在的问题:

      1. loghub日志的发送是异步的没错,但是当发送网络很慢时,将会出现大量内存堆积;
      2. 堆积也不怕,如上配置,当堆积内存达到一定限度时,就不会再大了。他是怎么办到的?其实就是通过一个锁,将后续所有请求全部阻塞了,这想想都觉得可怕;
      3. 网络慢我们可以多开几个发送线程嘛,是的,这样能在一定程度上缓解发送问题,但是基本也无补,另外,日志发送线程开多之后,线程的调度将会更可怕,而这只是一个可有可无的功能而已啊;

    针对以上问题,我们能做什么?

      1. 去除不必要的日志打印,这不是废话嘛,能这么干早干了!
      2. 在网络慢的时候,减少日志打印;这有点牵强,不过可以试试!
      3. 直接使用异步线程进行日志接收和发送,从根本上解决问题!
      4. 如果使用异步线程进行发送,那么当日志大量堆积怎么办?
      5. 使用本地文件存储需要进行发送的日志,解决大量日志堆积问题,待网络畅通后,快速发送!

      考虑到使用异步线程发送日志、使用本地磁盘存储大量日志堆积,问题应该基本都解决了!
      但是具体怎么做呢?
      如何异步?
      如何存储磁盘?

      这些都是很现实的问题!

      如果看到这里,觉得很low的同学,基本可以撤了!

    下面我们来看看具体实施方案:

    1. 如何异步?

      能想像到的,基本就是使用一个队列来接收日志写请求,然后,开另外的消费线程进行消费即可!

      但是,这样会有什么问题?因为外部请求访问进来,都是并发的,这个队列得线程安全吧!用 synchronized ? 用阻塞队列?

      总之,看起来都会有一个并行转串行的问题,这会给应用并发能力带去打击的!

      所以,我们得减轻这锁的作用。我们可以使用多个队列来解决这个问题,类似于分段锁!如果并发能力不够,则增加锁数量即可!

      说起来还是很抽象吧,现成的代码撸去吧!

      1. 覆盖原来的 logProducer 的 appender, 使用自己实现的appender, 主要就是解决异步问题:

        <appender name="LOGHUB-APPENDER" class="com.test.AsyncLoghubAppender">
            <endpoint>${loghub.endpoint}</endpoint>
            <accessKeyId>${loghub.accessKeyId}</accessKeyId>
            <accessKey>${loghub.accessKey}</accessKey>
            <projectName>${loghub.projectName}</projectName>
            <logstore>apollo-alarm</logstore>
            <topic>${loghub.topic}</topic>
            <packageTimeoutInMS>1500</packageTimeoutInMS>
            <logsCountPerPackage>4096</logsCountPerPackage>
            <!-- 4718592=4M, 3145728=3M, 2097152=2M -->
            <logsBytesPerPackage>3145728</logsBytesPerPackage>
            <!-- 17179869184=2G(溢出丢弃) , 104857600=12.5M, 2147483647=2G, 536870912=512M-->
            <memPoolSizeInByte>536870912</memPoolSizeInByte>
            <retryTimes>1</retryTimes>
            <maxIOThreadSizeInPool>6</maxIOThreadSizeInPool>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
        </appender>

      2. 接下来就是核心的异步实现: AsyncLoghubAppender

    import ch.qos.logback.classic.spi.IThrowableProxy;
    import ch.qos.logback.classic.spi.LoggingEvent;
    import ch.qos.logback.classic.spi.StackTraceElementProxy;
    import ch.qos.logback.classic.spi.ThrowableProxyUtil;
    import ch.qos.logback.core.CoreConstants;
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.util.IOUtils;
    import com.aliyun.openservices.log.common.LogItem;
    import com.aliyun.openservices.log.logback.LoghubAppender;
    import com.aliyun.openservices.log.logback.LoghubAppenderCallback;
    import com.test.biz.cache.LocalDiskEnhancedQueueManager;
    import com.test.biz.cache.LocalDiskEnhancedQueueManagerFactory;
    import com.test.model.LoghubItemsWrapper;
    import com.taobao.notify.utils.threadpool.NamedThreadFactory;
    import org.joda.time.DateTime;
    
    import java.time.LocalDateTime;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * 异步写loghub appender, 解决框架的appender 无法承受高并发写的问题
     *
     */
    public class AsyncLoghubAppender<E> extends LoghubAppender<E> {
    
        /**
         * put 线程,从业务线程接收消息过来
         */
        private ExecutorService puterExecutor;
    
        /**
         * 队列搬运线程执行器
         */
        private ExecutorService takerExecutor;
    
        /**
         * mapdb 操作脚手架
         */
        private LocalDiskEnhancedQueueManager localDiskEnhancedQueueManager;
    
        /**
         * 日志消息传球手
         */
        private List<LinkedBlockingQueue<LoghubItemsWrapper>> distributeLogItemPoster;
    
        // puter 的线程数,与cpu核数保持一致
        private final int puterThreadNum = 4;
    
        // taker 的线程数,可以稍微少点
        private final int takerThreadNum = 1;
    
        @Override
        public void start() {
            super.start();
            // 开启单个put 线程
            doStart();
        }
    
        private void doStart() {
            initMapDbQueue();
            initPosterQueue();
            startPutterThread();
            startTakerThread();
        }
    
        /**
         * 初始化 mapdb 数据库
         */
        private void initMapDbQueue() {
            localDiskEnhancedQueueManager = LocalDiskEnhancedQueueManagerFactory.newMapDbQueue();
        }
    
        /**
         * 初始化消息传球手数据
         */
        private void initPosterQueue() {
            distributeLogItemPoster = new ArrayList<>();
            for(int i = 0; i < puterThreadNum; i++) {
                distributeLogItemPoster.add(new LinkedBlockingQueue<>(10000000));
            }
        }
    
        /**
         * 开启 putter 线程组,此线程组不应慢于业务线程太多,否则导致内存溢出
         */
        private void startPutterThread() {
            puterExecutor = Executors.newFixedThreadPool(puterThreadNum,
                    new NamedThreadFactory("Async-LoghubItemPoster"));
            for(int i = 0; i < puterThreadNum; i++) {
                puterExecutor.execute(new InnerQueuePuterThread(distributeLogItemPoster.get(i)));
            }
        }
    
        /**
         * 初始化取数线程组,此线程组可以运行慢
         */
        private void startTakerThread() {
            takerExecutor = Executors.newFixedThreadPool(takerThreadNum,
                    new NamedThreadFactory("Async-LoghubAppender"));
            for(int i = 0; i < takerThreadNum; i++) {
                takerExecutor.execute(new InnerQueueTakerThread());
            }
        }
    
        @Override
        public void stop() {
            super.stop();
            localDiskEnhancedQueueManager.close();
        }
        
        // copy from parent
        @Override
        public void append(E eventObject) {
            try {
                appendEvent(eventObject);
            } catch (Exception e) {
                addError("Failed to append event.", e);
            }
        }
    
        /**
         * 优雅停机
         */
        public void shutdown() {
            puterExecutor.shutdown();
            try {
                puterExecutor.awaitTermination(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                addError("【日志appender】loghub shutdown interupt", e);
                Thread.currentThread().interrupt();
            }
        }
    
        // modify from parent
        private void appendEvent(E eventObject) {
            //init Event Object
            if (!(eventObject instanceof LoggingEvent)) {
                return;
            }
            LoggingEvent event = (LoggingEvent) eventObject;
    
            List<LogItem> logItems = new ArrayList<>();
            LogItem item = new LogItem();
            logItems.add(item);
            item.SetTime((int) (event.getTimeStamp() / 1000));
    
            DateTime dateTime = new DateTime(event.getTimeStamp());
            item.PushBack("time", dateTime.toString(formatter));
            item.PushBack("level", event.getLevel().toString());
            item.PushBack("thread", event.getThreadName());
    
            StackTraceElement[] caller = event.getCallerData();
            if (caller != null && caller.length > 0) {
                item.PushBack("location", caller[0].toString());
            }
    
            String message = event.getFormattedMessage();
            item.PushBack("message", message);
    
            IThrowableProxy iThrowableProxy = event.getThrowableProxy();
            if (iThrowableProxy != null) {
                String throwable = getExceptionInfo(iThrowableProxy);
                throwable += fullDump(event.getThrowableProxy().getStackTraceElementProxyArray());
                item.PushBack("throwable", throwable);
            }
    
            if (this.encoder != null) {
                // 框架也未处理好该问题,暂时忽略
    //            item.PushBack("log", new String(this.encoder.encode(eventObject)));
            }
    
            LoghubItemsWrapper itemWrapper = new LoghubItemsWrapper();
            itemWrapper.setLogItemList(logItems);
            putItemToPoster(itemWrapper);
    
        }
    
        /**
         * 将队列放入 poster 中
         *
         * @param itemsWrapper 日志信息
         */
        private void putItemToPoster(LoghubItemsWrapper itemsWrapper) {
            try {
                LinkedBlockingQueue<LoghubItemsWrapper> selectedQueue = getLoadBalancedQueue();
                selectedQueue.put(itemsWrapper);
            } catch (InterruptedException e) {
                addError("【日志appender】放入队列中断");
                Thread.currentThread().interrupt();
            }
        }
    
        /**
         * 选择一个队列进行日志放入
         *
         * @return 队列容器
         */
        private LinkedBlockingQueue<LoghubItemsWrapper> getLoadBalancedQueue() {
            long selectQueueIndex = System.nanoTime() % distributeLogItemPoster.size();
            return distributeLogItemPoster.get((int) selectQueueIndex);
        }
    
        // copy from parent
        private String fullDump(StackTraceElementProxy[] stackTraceElementProxyArray) {
            StringBuilder builder = new StringBuilder();
            for (StackTraceElementProxy step : stackTraceElementProxyArray) {
                builder.append(CoreConstants.LINE_SEPARATOR);
                String string = step.toString();
                builder.append(CoreConstants.TAB).append(string);
                ThrowableProxyUtil.subjoinPackagingData(builder, step);
            }
            return builder.toString();
        }
    
        // copy from parent
        private String getExceptionInfo(IThrowableProxy iThrowableProxy) {
            String s = iThrowableProxy.getClassName();
            String message = iThrowableProxy.getMessage();
            return (message != null) ? (s + ": " + message) : s;
        }
    
        class InnerQueuePuterThread implements Runnable {
    
            private LinkedBlockingQueue<LoghubItemsWrapper> queue;
    
            public InnerQueuePuterThread(LinkedBlockingQueue<LoghubItemsWrapper> queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                // put the item to mapdb
                while (!Thread.interrupted()) {
                    LoghubItemsWrapper itemsWrapper = null;
                    try {
                        itemsWrapper = queue.take();
                    } catch (InterruptedException e) {
                        addError("【日志appender】poster队列中断");
                        Thread.currentThread().interrupt();
                    }
                    if(itemsWrapper != null) {
                        flushLogItemToMapDb(itemsWrapper);
                    }
                }
            }
    
            /**
             * 将内存队列存储到 mapdb 中, 由消费线程获取
             *
             * @param itemsWrapper 日志信息
             */
            private void flushLogItemToMapDb(LoghubItemsWrapper itemsWrapper) {
                byte[] itemBytes = JSONObject.toJSONBytes(itemsWrapper.getLogItemList());
                localDiskEnhancedQueueManager.push(itemBytes);
            }
        }
    
        /**
         * for debug, profiler for mapdb
         */
        private static final AtomicLong takerCounter = new AtomicLong(0);
    
        class InnerQueueTakerThread implements Runnable {
    
            @Override
            public void run() {
                long startTime = System.currentTimeMillis();
                while (!Thread.interrupted()) {
                    //item = fullLogQueues.take();      // take items without lock
                    try {
                        while (localDiskEnhancedQueueManager.isEmpty()) {
                            Thread.sleep(100L);
                        }
                    }
                    catch (InterruptedException e) {
                        addError("【日志appender】中断异常", e);
                        Thread.currentThread().interrupt();
                        break;
                    }
                    byte[] itemBytes = localDiskEnhancedQueueManager.pollFirstItem();
                    try {
                        if(itemBytes != null
                                && itemBytes != localDiskEnhancedQueueManager.EMPTY_VALUE_BYTE_ARRAY) {
                            List<LogItem> itemWrapper = JSONObject.parseArray(
                                    new String(itemBytes, IOUtils.UTF8),
                                    LogItem.class);
                            if(itemWrapper != null) {
                                doSend(itemWrapper);
                            }
                        }
                        else {
                            // 如果数据不为空,且一直在循环,说明存在异常,暂时处理为重置队列,但应从根本上解决问题
                            localDiskEnhancedQueueManager.reset();
                        }
                    }
                    catch (Exception e) {
                        addError("【日志appender】json解析异常", e);
                    }
                    // for debug test, todo: 上线时去除该代码
                    if(takerCounter.incrementAndGet() % 1000 == 0) {
                        System.out.println(LocalDateTime.now() + " - "
                                + Thread.currentThread().getName() + ": per 1000 items took time: "
                                + (System.currentTimeMillis() - startTime) + " ms.");
                        startTime = System.currentTimeMillis();
                    }
                }
            }
    
            /**
             * 发送数据逻辑,主要为 loghub
             *
             * @param item logItem
             */
            private void doSend(List<LogItem> item) {
                AsyncLoghubAppender.this.doSendToLoghub(item);
            }
        }
    
        /**
         * 发送数据逻辑,loghub
         *
         * @param item logItem
         */
        private void doSendToLoghub(List<LogItem> item) {
            producer.send(projectConfig.projectName, logstore, topic, source, item,
                    new LoghubAppenderCallback<>(AsyncLoghubAppender.this,
                            projectConfig.projectName, logstore, topic, source, item));
        }
    
    }

      如上实现,简单说明下:

      1. 开启n个消费线程的 distributeLogItemPoster 阻塞队列,用于接收业务线程发来的日志请求;
      2. 开启n个消费线程, 将从业务线程接收过来的请求队列,放入磁盘队列中,从而避免可能内存溢出;
      3. 开启m个taker线程,从磁盘队列中取出数据,进行loghub的发送任务;

      如上,我们已经完全将日志的发送任务转移到异步来处理了!

      但是,这样真的就ok了吗?磁盘队列是什么?可靠吗?性能如何?线程安全吗?

    2. 如何存储磁盘队列?

      好吧。咱们这里使用的是 mapdb 来实现的磁盘队列, mapdb 的 github star数超3k, 应该还是不错了!

      但是,它更多的是用来做磁盘缓存,队列并没有过多关注,不管怎么样,我们还是可以选择的!

      mapdb项目地址: https://github.com/jankotek/mapdb

      其实mapdb有几个现成的队列可用: IndexTreeList, TreeSet. 但是我们仔细看下他的官宣,看到这些数据结构只支持少量数据时的存储,在数据量巨大之后,性能完全无法保证,甚至 poll 一个元素需要1s+ 。

      所以,还得抛弃其队列实现,只是自己实现一个了,其 HashTree 是个不错的选择, 使用 HashTree 来实现队列,唯一的难点在于,如何进行元素迭代;(大家不仿先自行思考下)

      下面我们来看下我的一个实现方式:

    import com.test.biz.cache.LocalDiskEnhancedQueueManager;
    import com.taobao.notify.utils.threadpool.NamedThreadFactory;
    import org.mapdb.BTreeMap;
    import org.mapdb.DB;
    import org.mapdb.DBMaker;
    import org.mapdb.Serializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.NavigableSet;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * MapDb 实现的内存队列工具类
     *
     */
    public class LocalDiskEnhancedQueueManagerMapDbImpl implements LocalDiskEnhancedQueueManager {
    
        private static final Logger logger = LoggerFactory.getLogger(LocalDiskEnhancedQueueManagerMapDbImpl.class);
    
        /**
         * 默认存储文件
         */
        private final String DEFAULT_DB_FILE = "/opt/mapdb/logappender.db";
    
        /**
         * 队列名
         */
        private final String LOG_ITEM_LIST_TABLE = "hub_log_appender";
        private final String LOG_ITEM_TREE_SET_TABLE = "hub_log_appender_tree_set";
        private final String LOG_ITEM_HASH_MAP_TABLE = "hub_log_appender_hash_map";
        private final String LOG_ITEM_BTREE_TABLE = "hub_log_appender_btree";
    
        private final String QUEUE_OFFSET_HOLDER_BTREE_TABLE = "queue_offset_holder_btree_table";
    
        /**
         * db 实例
         */
        private final DB mapDb;
    
    //    private IndexTreeList<byte[]> indexTreeListQueue;
    
        /**
         * 假装是个队列
         */
        private NavigableSet<byte[]> treeSetQueue;
        private BTreeMap<byte[], Byte> bTreeQueue;
        private ConcurrentMap<Long, byte[]> concurrentMapQueue;
    
        /**
         * 队列偏移量持有器, 对于小容量的节点使用 btree 处理很好
         */
        private BTreeMap<String, Long> queueOffsetDiskHolder;
    
        /**
         * 读队列偏移器, jvm 运行时使用该值, 该值被定时刷新到 mapdb 中
         *
         *      会有部分数据重复情况
         *
         */
        private AtomicLong readerOfQueueOffsetJvmHolder;
    
        /**
         * 写队列偏移器, jvm 运行时使用该值, 该值被定时刷新到 mapdb 中
         *
         *      会有部分数据重复情况
         */
        private AtomicLong writerOfQueueOffsetJvmHolder;
    
        private final String readerOffsetCacheKeyName = "loghub_appender_queue_key_read_offset";
        private final String writerOffsetCacheKeyName = "loghub_appender_queue_key_write_offset";
    
        /**
         * mapdb 构造方法,给出队列持有者
         *
         */
        public LocalDiskEnhancedQueueManagerMapDbImpl() {
            mapDb = DBMaker.fileDB(getDbFilePath())
                    .checksumHeaderBypass()
                    .closeOnJvmShutdown()
                    .fileChannelEnable()
                    .fileMmapEnableIfSupported()
                    // 尝试修复删除元素后磁盘文件大小不变化的bug
                     .cleanerHackEnable()
                    .concurrencyScale(128)
                    .make();
            initQueueOffsetHolder();
            initQueueOwner();
            initCleanUselessSpaceJob();
        }
    
        /**
         * 初始化队列偏移器
         */
        private void initQueueOffsetHolder() {
            queueOffsetDiskHolder = mapDb.treeMap(QUEUE_OFFSET_HOLDER_BTREE_TABLE,
                                        Serializer.STRING, Serializer.LONG)
                                        .createOrOpen();
            initQueueReaderOffset();
            initQueueWriterOffset();
        }
    
        /**
         * 初始化读偏移数据
         */
        private void initQueueReaderOffset() {
            Long readerQueueOffsetFromDisk = queueOffsetDiskHolder.get(readerOffsetCacheKeyName);
            if(readerQueueOffsetFromDisk == null) {
                readerOfQueueOffsetJvmHolder = new AtomicLong(1);
            }
            else {
                readerOfQueueOffsetJvmHolder = new AtomicLong(readerQueueOffsetFromDisk);
            }
        }
    
        /**
         * 初始化写偏移数据
         */
        private void initQueueWriterOffset() {
            Long writerQueueOffsetFromDisk = queueOffsetDiskHolder.get(writerOffsetCacheKeyName);
            if(writerQueueOffsetFromDisk == null) {
                writerOfQueueOffsetJvmHolder = new AtomicLong(1);
            }
            else {
                writerOfQueueOffsetJvmHolder = new AtomicLong(writerQueueOffsetFromDisk);
            }
        }
    
        /**
         * 刷入最新的读偏移
         */
        private void flushQueueReaderOffset() {
            queueOffsetDiskHolder.put(readerOffsetCacheKeyName, readerOfQueueOffsetJvmHolder.get());
        }
    
        /**
         * 刷入最新的读偏移
         */
        private void flushQueueWriterOffset() {
            queueOffsetDiskHolder.put(writerOffsetCacheKeyName, writerOfQueueOffsetJvmHolder.get());
        }
    
        /**
         * 初始化队列容器
         */
        private void initQueueOwner() {
    //        indexTreeListQueue = db.indexTreeList(LOG_ITEM_LIST_TABLE, Serializer.BYTE_ARRAY).createOrOpen();
    //        bTreeQueue = mapDb.treeMap(LOG_ITEM_BTREE_TABLE,
    //                                            Serializer.BYTE_ARRAY, Serializer.BYTE)
    //                                            .counterEnable()
    //                                            .valuesOutsideNodesEnable()
    //                                            .createOrOpen();
    //        treeSetQueue = mapDb.treeSet(LOG_ITEM_TREE_SET_TABLE, Serializer.BYTE_ARRAY)
    //                                            .createOrOpen();
            concurrentMapQueue = mapDb.hashMap(LOG_ITEM_HASH_MAP_TABLE, Serializer.LONG, Serializer.BYTE_ARRAY)
                                            .counterEnable()
                                            // 当处理能力很差时,就将该日志打印丢掉
                                            .expireMaxSize(100 * 10000 * 10000L)
                                            // 3小时后还没消费就过期了
                                            .expireAfterCreate(3L, TimeUnit.HOURS)
                                            .expireAfterGet()
                                            .createOrOpen();
        }
    
        /**
         * 清理无用空间,如磁盘文件等等
         */
        private void initCleanUselessSpaceJob() {
            ScheduledExecutorService scheduledExecutorService =
                    Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Async-MapDbSpaceCleaner"));
            // 每过10分钟清理一次无用空间,看情况调整
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                mapDb.getStore().compact();
            }, 0L, 10L, TimeUnit.MINUTES);
    
            // 每过10s刷入一次读写偏移,允许重复和丢失
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                flushQueueWriterOffset();
                flushQueueReaderOffset();
            }, 30L, 10L, TimeUnit.SECONDS);
        }
    
        /**
         * 获取文件存储位置,考虑后续扩展被子类覆盖
         *
         * @return db文件地址
         */
        protected String getDbFilePath() {
            return DEFAULT_DB_FILE;
        }
    
        /**
         * 获取下一个队列读编号 (确保准确性可承受,性能可承受)
         *
         * @return 队列编号
         */
        private long getNextReaderId() {
            return readerOfQueueOffsetJvmHolder.incrementAndGet();
        }
    
        /**
         * 获取下一个队列写编号 (确保准确性可承受,性能可承受)
         *
         * @return 队列编号
         */
        private long getNextWriterId() {
            return writerOfQueueOffsetJvmHolder.incrementAndGet();
        }
    
        @Override
        public boolean push(byte[] itemBytes) {
    //        return indexTreeListQueue.add(itemBytes);
    //        bTreeQueue.put(itemBytes, (byte)1 );
    //        treeSetQueue.add(itemBytes);
            concurrentMapQueue.put(getNextWriterId(), itemBytes);
            return true;
        }
    
        @Override
        public byte[] pollFirstItem() {
            // 使用时不得使用修改元素方法
    //        return indexTreeListQueue.remove(index);
    //         Map.Entry<byte[], Byte> entry = bTreeQueue.pollFirstEntry();
    //        return treeSetQueue.pollFirst();
            return concurrentMapQueue.remove(getNextReaderId());
        }
    
        @Override
        public boolean isEmpty() {
            // 队列为空,不一定代表就没有可供读取的元素了,因为 counter 可能落后于并发写操作了
            // 队列不为空,不一定代表就一定有可供读取的元素,因为 counter 可能落后于并发 remove 操作了
            // 当读指针等于写指针时,则代表所有元素已被读取完成,应该是比较准确的空判定标准
            return concurrentMapQueue.isEmpty()
                        || readerOfQueueOffsetJvmHolder.get() == writerOfQueueOffsetJvmHolder.get();
        }
    
        @Override
        public void close() {
            flushQueueWriterOffset();
            flushQueueReaderOffset();
            mapDb.close();
        }
    
        @Override
        public void reset() {
            concurrentMapQueue.clear();
            // 同步两个值,非准确的
            readerOfQueueOffsetJvmHolder.set(writerOfQueueOffsetJvmHolder.get());
            logger.error("【mapdb缓存】读写指针冲突,强制重置指针,请注意排查并发问题. reader:{}, writer:{}",
                                readerOfQueueOffsetJvmHolder.get(), writerOfQueueOffsetJvmHolder.get());
        }
    
    }

      如上,就是使用 mapdb的hashMap 实现了磁盘队列功能,主要思路如下:

      1. 使用一个long的自增数据作为 hashMap 的key,将队列存入value中;
      2. 使用另一个 long 的自增指针做为读key, 依次读取数据;
      3. 读写指针都定期刷入磁盘,以防出异常crash时无法恢复;
      4. 当实在出现了未预料的bug时,允许直接丢弃冲突日志,从一个新的读取点开始新的工作;

      最后,再加一个工厂类,生成mapdb队列实例: LocalDiskEnhancedQueueManagerFactory

    import com.test.biz.cache.impl.LocalDiskEnhancedQueueManagerMapDbImpl;
    
    /**
     * 本地磁盘队列等实例工厂类
     *
     */
    public class LocalDiskEnhancedQueueManagerFactory {
    
        /**
         * 生产一个mapDb实现的队列实例
         *
         * @return mapdb 队列实例
         */
        public static LocalDiskEnhancedQueueManager newMapDbQueue() {
            return new LocalDiskEnhancedQueueManagerMapDbImpl();
        }
    
        /**
         * 生产一个使用 ehcache 实现的队列实例
         *
         * @return ehcache 队列实例
         */
        public static LocalDiskEnhancedQueueManager newEhcacheQueue() {
            // 有兴趣的同学可以实现下
            return null;
        }
    
        /**
         * 生产一个使用 fqueue 实现的队列实例
         *
         * @return fqueue 队列实例
         */
        public static LocalDiskEnhancedQueueManager newFQueueQueue() {
            // 有兴趣的同学可以实现下, 不过不太建议
            return null;
        }
    
    
        /**
         * 生产一个使用 自己直接写磁盘文件 实现的队列实例
         *
         * @return file 队列实例
         */
        public static LocalDiskEnhancedQueueManager newOwnFileQueue() {
            // 有兴趣的同学可以挑战下
            return null;
        }
    
    }

      这样,我们就实现了一个既能满足高并发场景下的日志打印需求了吧。业务线程优先,日志线程异步化、可丢弃、cpu占用少、内存不限制。

    老话: 优化之路,道阻且长!

  • 相关阅读:
    iOS 项目中的NSNotification简单使用
    IOS开发之格式化日期时间的使用 && 编程中常见问题
    linker command failed with exit code 1 (use -v to see invocation),经典Xcode编译错误的出现和解决!
    CocoaPods安装和使用教程
    机器学习算法--贝叶斯分类器(二)
    机器学习算法--贝叶斯分类器(一)
    Linux系统初始化过程及运行级别简介
    Linux基本符号
    索引节点inode详解
    Linux文件类型介绍
  • 原文地址:https://www.cnblogs.com/yougewe/p/10988194.html
Copyright © 2011-2022 走看看