  • Storing Log4j Log Data in HBase: HbaseAppender

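    Business requirement:

    The requirement is simple: store the log data from several systems in a single HBase database so that it can be viewed and monitored in one place.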

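    Approach:

    Write a Log4j Appender that targets HBase, with a simple storage strategy: decouple Log4j's event handling from the HBase write, and write to HBase in batches once enough events have accumulated.

    Log4j events are held temporarily in a queue. A scheduled task checks at a fixed interval whether the queue has reached the configured size; when it has, the task writes the batch to HBase and empties the queue.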
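
    The queue-and-timer strategy can be sketched with the JDK alone. This is a minimal illustration, not the appender itself: `BatchingBufferSketch` and its in-memory `written` list are hypothetical stand-ins for the Log4j event queue and the HBase table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the appender: strings instead of LoggingEvents,
// an in-memory list instead of an HBase table.
class BatchingBufferSketch implements Runnable {
    private final int batchSize;
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<List<String>> written = new ArrayList<>();

    BatchingBufferSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called by logging threads: only enqueues, never touches the store.
    void append(String line) {
        pending.add(line);
    }

    // Called by the timer: writes one batch if enough lines are queued.
    @Override
    public void run() {
        if (pending.size() < batchSize) {
            return; // below the threshold, wait for the next tick
        }
        List<String> batch = new ArrayList<>();
        String line;
        while ((line = pending.poll()) != null) {
            batch.add(line);
        }
        synchronized (written) {
            written.add(batch);
        }
    }

    // Snapshot of the batches written so far.
    List<List<String>> written() {
        synchronized (written) {
            return new ArrayList<>(written);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchingBufferSketch buffer = new BatchingBufferSketch(3);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(buffer, 50, 50, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 5; i++) {
            buffer.append("line " + i);
        }
        Thread.sleep(300);
        timer.shutdown();
        System.out.println("batches written: " + buffer.written().size());
    }
}
```

    The logging threads only pay for a queue insert; the cost of the write is carried by the single timer thread.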

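    One drawback: events logged just before shutdown may never reach the batch size, so they are lost when the program exits. If the logs are important, enable file logging as well!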
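
    One possible way to narrow that window is to write whatever is still queued when the appender is closed, ignoring the threshold. The sketch below is an assumption about how that could look, not part of the original appender: the names are hypothetical, an in-memory list stands in for HBase, and it only helps if close() is actually called before the JVM exits (with Log4j 1.x, for example, through LogManager.shutdown()). A killed process still loses its queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: threshold-based flushing plus a forced flush on close.
class DrainOnCloseSketch {
    private final int batchSize;
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<String> sink = new ArrayList<>(); // stand-in for the HBase table
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    DrainOnCloseSketch(int batchSize, long periodMillis) {
        this.batchSize = batchSize;
        executor.scheduleWithFixedDelay(() -> flush(false),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void append(String line) {
        pending.add(line);
    }

    // force = false: respect the threshold; force = true: write everything queued.
    synchronized void flush(boolean force) {
        if (!force && pending.size() < batchSize) {
            return;
        }
        String line;
        while ((line = pending.poll()) != null) {
            sink.add(line);
        }
    }

    void close() {
        executor.shutdown(); // no further periodic runs are started
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS); // let a running flush finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(true); // final drain, regardless of batchSize
    }

    synchronized List<String> sink() {
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        DrainOnCloseSketch appender = new DrainOnCloseSketch(10, 1000);
        appender.append("only");
        appender.append("three");
        appender.append("lines");
        appender.close();
        System.out.println("written on close: " + appender.sink().size());
    }
}
```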

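    The code

    The listings use the Log4j 1.x API and the older HBase client API (HTablePool, HTableInterface). Any import statements a listing does not show must be added by hand.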

    log4j.properties

    log4j.rootLogger=INFO,HbaseAppender
    
    #HbaseAppender
    log4j.appender.HbaseAppender=cn.bg.log.HbaseAppender
    log4j.appender.HbaseAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.HbaseAppender.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
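
    The appender's setters (setBatchSize, setPeriod, setHbLogName, setHbLogFamily) can also be driven from the same file, since Log4j 1.x maps keys of the form log4j.appender.<name>.<property> onto bean-style setters before it calls activateOptions(). The values below are illustrative assumptions, and `app_logs` is a hypothetical table name. There is no setter for hbPools, so the pool size cannot be set this way.

```properties
# Illustrative values; each key maps to a setter on HbaseAppender
log4j.appender.HbaseAppender.batchSize=100
log4j.appender.HbaseAppender.period=5000
log4j.appender.HbaseAppender.hbLogName=app_logs
log4j.appender.HbaseAppender.hbLogFamily=bg
```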
    

    HbaseAppender

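    package cn.bg.log;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.log4j.AppenderSkeleton;
    import org.apache.log4j.spi.LoggingEvent;
    
    public class HbaseAppender extends AppenderSkeleton implements Runnable {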
    
        private int batchSize = 10;
        private int period = 1000;
        private String hbLogName = "test";
        private String hbLogFamily = "bg";
        private int hbPools = 2;
        private Queue<LoggingEvent> loggingEvents;
        private ScheduledExecutorService executor;
        private ScheduledFuture<?> task;
        private Configuration conf;
        private HTablePool hTablePool;
        private HTableInterface htable;
    
        /**
         * Log4j initialization: starts the scheduled log-flushing task.
         */
        @Override
        public void activateOptions() {
            try {
                super.activateOptions();
                // Create the scheduler with a custom thread name
                executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HbaseAppender"));
                // Queue that buffers log events until the next flush
                loggingEvents = new ConcurrentLinkedQueue<LoggingEvent>();
                // Start the scheduled task; an exception escaping run() would stop all further runs!
                task = executor.scheduleWithFixedDelay(this, period, period, TimeUnit.MILLISECONDS);
                System.out.println("ActivateOptions ok!");
            } catch (Exception e) {
                System.err.println("Error during activateOptions: " + e);
            }
        }
    
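        /**
         * Initializes the HBase connection on first use.
         *
         * @return true if the table handle is ready, false if initialization failed
         */
        private boolean initHbase() {
            try {
                if (htable == null) {
                    // Build the connection from hbase-site.xml on the classpath (ZooKeeper-based)
                    conf = HBaseConfiguration.create();
                    // HTable connection pool
                    hTablePool = new HTablePool(conf, hbPools);
                    htable = hTablePool.getTable(hbLogName);
                    System.out.println("Init Hbase OK!");
                }
                return true;
            } catch (Exception e) {
                // Stop the scheduled task: without HBase there is nothing to write to
                task.cancel(false);
                executor.shutdown();
                System.err.println("Init Hbase fail: " + e);
                return false;
            }
        }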
    
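        @Override
        public void run() {
            // Connect lazily; skip this run if HBase is not available
            if ((conf == null || htable == null) && !initHbase()) {
                return;
            }
            try {
                // Flush only once the queue holds at least batchSize events
                if (batchSize <= loggingEvents.size()) {
                    LoggingEvent event;
                    List<Put> logs = new ArrayList<Put>();
                    // Drain the queue
                    while ((event = loggingEvents.poll()) != null) {
                        try {
                            // Row key: thread name + level + event time + position in this batch.
                            // System.currentTimeMillis() at flush time would give events from the
                            // same thread and level, drained in the same millisecond, identical
                            // keys, so they would overwrite each other.
                            Put log = new Put((event.getThreadName() + event.getLevel().toString()
                                    + event.timeStamp + "-" + logs.size()).getBytes());
                            // Store the formatted message in column <family>:log
                            log.add(hbLogFamily.getBytes(), "log".getBytes(), layout.format(event).getBytes());
                            logs.add(log);
                        } catch (Exception e) {
                            System.err.println("Error logging put " + e);
                        }
                    }
                    // Write the whole batch to HBase
                    if (logs.size() > 0) htable.put(logs);
                }
            } catch (Exception e) {
                System.err.println("Error run " + e);
            }
        }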
    
        /**
         * Receives each logging event from Log4j and queues it for the next batch.
         *
         * @param loggingEvent the event to buffer
         */
        @Override
        protected void append(LoggingEvent loggingEvent) {
            try {
                populateEvent(loggingEvent);
                // Add to the log queue
                loggingEvents.add(loggingEvent);
            } catch (Exception e) {
                System.err.println("Error populating event and adding to queue " + e);
            }
        }
    
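        /**
         * Pre-populates the event. These getters make the event capture its thread name,
         * rendered message, NDC, MDC, throwable and location information now, on the
         * logging thread, so the values are still available when the event is formatted
         * later on the flush thread.
         *
         * @param event the event to populate
         */
        protected void populateEvent(LoggingEvent event) {
            event.getThreadName();
            event.getRenderedMessage();
            event.getNDC();
            event.getMDCCopy();
            event.getThrowableStrRep();
            event.getLocationInformation();
        }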
    
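        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            try {
                // Events still queued at this point are not written
                if (task != null) task.cancel(false);
                if (executor != null) executor.shutdown();
                // Close the table handle before the pool that owns it
                if (htable != null) htable.close();
                if (hTablePool != null) hTablePool.close();
            } catch (IOException e) {
                System.err.println("Error close " + e);
            }
        }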
    
        @Override
        public boolean requiresLayout() {
            return true;
        }
    
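        /**
         * Sets the number of queued events that triggers a batch write.
         *
         * @param batchSize minimum queue size before a flush
         */
        public void setBatchSize(int batchSize) {
            this.batchSize = batchSize;
        }
    
        /**
         * Sets the interval between runs of the scheduled task, in milliseconds.
         *
         * @param period interval in milliseconds
         */
        public void setPeriod(int period) {
            this.period = period;
        }
    
        /**
         * Sets the name of the HBase table that stores the logs.
         *
         * @param hbLogName table name
         */
        public void setHbLogName(String hbLogName) {
            this.hbLogName = hbLogName;
        }
    
        /**
         * Sets the column family of the log table.
         *
         * @param hbLogFamily column family name
         */
        public void setHbLogFamily(String hbLogFamily) {
            this.hbLogFamily = hbLogFamily;
        }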
    }
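
    A row key built from thread name, level and a millisecond timestamp is only as unique as the timestamp, and two events that share all three end up in the same row. One way to keep keys distinct within a process is to append a counter. The sketch below is an assumption, not the author's scheme: `RowKeySketch` and its `rowKey` helper are hypothetical, and keys from different JVMs writing to the same table could still collide unless something like a host name is added.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: appends a process-wide sequence number to the key.
class RowKeySketch {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    // Same inputs never yield the same key twice within one JVM.
    static String rowKey(String threadName, String level, long eventTimeMillis) {
        return threadName + level + eventTimeMillis + "-" + SEQUENCE.incrementAndGet();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(rowKey("main", "INFO", now));
        System.out.println(rowKey("main", "INFO", now));
    }
}
```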
    

    NamedThreadFactory

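    package cn.bg.log;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class NamedThreadFactory implements ThreadFactory {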
        private final String prefix;
        private final ThreadFactory threadFactory;
        private final AtomicInteger atomicInteger = new AtomicInteger();
    
        public NamedThreadFactory(final String prefix){
            this(prefix, Executors.defaultThreadFactory());
        }
    
        public NamedThreadFactory(final String prefix, final ThreadFactory threadFactory){
            this.prefix = prefix;
            this.threadFactory = threadFactory;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = this.threadFactory.newThread(r);
            t.setName(this.prefix + this.atomicInteger.incrementAndGet());
            return t;
        }
    }
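
    The effect of the factory is easiest to see by asking a worker thread for its own name. The sketch below repeats a compact, package-private copy of the class so that it compiles on its own; `NamedThreadFactoryDemo` and `workerName` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Compact copy of the factory from the listing above.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final ThreadFactory threadFactory;
    private final AtomicInteger atomicInteger = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this(prefix, Executors.defaultThreadFactory());
    }

    NamedThreadFactory(String prefix, ThreadFactory threadFactory) {
        this.prefix = prefix;
        this.threadFactory = threadFactory;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = threadFactory.newThread(r);
        t.setName(prefix + atomicInteger.incrementAndGet());
        return t;
    }
}

class NamedThreadFactoryDemo {
    // Runs one task on a fresh single-thread executor and returns the worker's name.
    static String workerName(String prefix) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(new NamedThreadFactory(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // The first thread created by a factory gets the suffix 1.
        System.out.println(workerName("HbaseAppender")); // prints "HbaseAppender1"
    }
}
```

    A recognizable thread name makes the flush thread easy to find in thread dumps and log output.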
    

    The end!

  • Original article: https://www.cnblogs.com/xguo/p/3149917.html