zoukankan      html  css  js  c++  java
  • 应用监控CAT之cat-consumer源码阅读(二)

      之前讲了 cat-client 进行cat埋点上报,那么上报给谁呢?以及后续故事如何?让我们来看看 cat-consumer 是如何接收处理的?

      由cat-client发送数据,cat-consumer进行接收请求处理,开始了处理问题之旅!

    首先,让我们来回顾一下 TcpSocketSender 是如何发送数据的:

    // TcpSocketSender 往channel中写入数据,此处有兴趣的同学可以延伸下 netty 的源码!
        private void sendInternal(MessageTree tree) {
            ChannelFuture future = m_manager.channel();
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    
            m_codec.encode(tree, buf);
    
            int size = buf.readableBytes();
            Channel channel = future.channel();
    
            // 以 ByteBuf 形式发送数据
            channel.writeAndFlush(buf);
            // 更新统计数据
            if (m_statistics != null) { 
                m_statistics.onBytes(size);
            }
        }

     

    // TcpSocketReceiver, 接收发送过来的数据,默认端口 2280, 注册服务,线上为分布式部署,即为接口调用式。

        public void init() {
            try {
                startServer(m_port);
            } catch (Throwable e) {
                m_logger.error(e.getMessage(), e);
            }
        }
    
        public synchronized void startServer(int port) throws InterruptedException {
            boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
            int threads = 24;
            ServerBootstrap bootstrap = new ServerBootstrap();
    
            m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
            m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
            bootstrap.group(m_bossGroup, m_workerGroup);
            bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
    
            // 添加处理handler, 进行请求逻辑处理
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    
                    // 此处仅为一个解码器,实际功能在该解码器中完成
                    pipeline.addLast("decode", new MessageDecoder());
                }
            });
    
            bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
            try {
                m_future = bootstrap.bind(port).sync();
                m_logger.info("start netty server!");
            } catch (Exception e) {
                m_logger.error("Started Netty Server Failed:" + port, e);
            }
        }
    
    // 消息解码器,并处理具体业务逻辑,先确认数据已上传完成,再进行逻辑处理
        public class MessageDecoder extends ByteToMessageDecoder {
    
            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
                if (buffer.readableBytes() < 4) {
                    return;
                }
                buffer.markReaderIndex();
                int length = buffer.readInt();
                buffer.resetReaderIndex();
                if (buffer.readableBytes() < length + 4) {
                    return;
                }
                try {
                    if (length > 0) {
                        ByteBuf readBytes = buffer.readBytes(length + 4);
                        readBytes.markReaderIndex();
                        readBytes.readInt();
                // 消息解码,获取头信息,消息体
                        DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);
    
                        readBytes.resetReaderIndex();
                        tree.setBuffer(readBytes);
                        // 交由handler处理实际逻辑
                        m_handler.handle(tree);
                        m_processCount++;
    
                        long flag = m_processCount % CatConstants.SUCCESS_COUNT;
    
                        if (flag == 0) {
                            m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
                        }
                    } else {
                        // client message is error
                        buffer.readBytes(length);
                    }
                } catch (Exception e) {
                    m_serverStateManager.addMessageTotalLoss(1);
                    m_logger.error(e.getMessage(), e);
                }
            }
        }
        

    // 消息解码,为后续处理提供基础设施

        //PlainTextMessageCodec.decode() 消息解码
        @Override
        public MessageTree decode(ByteBuf buf) {
            MessageTree tree = new DefaultMessageTree();
    
            // 使用一个默认消息树,用于接收消息
            decode(buf, tree);
            return tree;
        }
    
        @Override
        public void decode(ByteBuf buf, MessageTree tree) {
            Context ctx = m_ctx.get().setBuffer(buf);
    
            // 解析头信息
            decodeHeader(ctx, tree);
    
            // 解析消息体
            if (buf.readableBytes() > 0) {
                decodeMessage(ctx, tree);
            }
        }
        // 解析头信息,以tab='	' 和 lf='
    ', 进行分割
        protected void decodeHeader(Context ctx, MessageTree tree) {
            BufferHelper helper = m_bufferHelper;
            String id = helper.read(ctx, TAB);
            String domain = helper.read(ctx, TAB);
            String hostName = helper.read(ctx, TAB);
            String ipAddress = helper.read(ctx, TAB);
            String threadGroupName = helper.read(ctx, TAB);
            String threadId = helper.read(ctx, TAB);
            String threadName = helper.read(ctx, TAB);
            String messageId = helper.read(ctx, TAB);
            String parentMessageId = helper.read(ctx, TAB);
            String rootMessageId = helper.read(ctx, TAB);
            String sessionToken = helper.read(ctx, LF);
    
            if (VERSION.equals(id)) {
                tree.setDomain(domain);
                tree.setHostName(hostName);
                tree.setIpAddress(ipAddress);
                tree.setThreadGroupName(threadGroupName);
                tree.setThreadId(threadId);
                tree.setThreadName(threadName);
                tree.setMessageId(messageId);
                tree.setParentMessageId(parentMessageId);
                tree.setRootMessageId(rootMessageId);
                tree.setSessionToken(sessionToken);
            } else {
                throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
            }
        }
        // 解析消息体
        protected void decodeMessage(Context ctx, MessageTree tree) {
            Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
            Message parent = decodeLine(ctx, null, stack);
    
            tree.setMessage(parent);
    
            // 循环读取消息体,直到读取完成
            while (ctx.getBuffer().readableBytes() > 0) {
                Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);
    
                if (message instanceof DefaultTransaction) {
                    parent = message;
                } else {
                    break;
                }
            }
        }
        // 解析内容栈出来
        protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) {
            BufferHelper helper = m_bufferHelper;
            byte identifier = ctx.getBuffer().readByte();
            String timestamp = helper.read(ctx, TAB);
            String type = helper.read(ctx, TAB);
            String name = helper.read(ctx, TAB);
    
            switch (identifier) {
            // t: transaction 类型消息, T: pop结束, E:Event, M: Metrics, L: Trace, H: heartbeat 消息
            case 't':
                DefaultTransaction transaction = new DefaultTransaction(type, name, null);
    
                helper.read(ctx, LF); // get rid of line feed
                transaction.setTimestamp(m_dateHelper.parse(timestamp));
    
                if (parent != null) {
                    parent.addChild(transaction);
                }
    
                stack.push(parent);
                return transaction;
            case 'A':
                DefaultTransaction tran = new DefaultTransaction(type, name, null);
                String status = helper.read(ctx, TAB);
                String duration = helper.read(ctx, TAB);
                String data = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                tran.setTimestamp(m_dateHelper.parse(timestamp));
                tran.setStatus(status);
                tran.addData(data);
    
                long d = Long.parseLong(duration.substring(0, duration.length() - 2));
                tran.setDurationInMicros(d);
    
                if (parent != null) {
                    parent.addChild(tran);
                    return parent;
                } else {
                    return tran;
                }
            case 'T':
                String transactionStatus = helper.read(ctx, TAB);
                String transactionDuration = helper.read(ctx, TAB);
                String transactionData = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                parent.setStatus(transactionStatus);
                parent.addData(transactionData);
    
                long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));
    
                parent.setDurationInMicros(transactionD);
    
                return stack.pop();
            case 'E':
                DefaultEvent event = new DefaultEvent(type, name);
                String eventStatus = helper.read(ctx, TAB);
                String eventData = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                event.setTimestamp(m_dateHelper.parse(timestamp));
                event.setStatus(eventStatus);
                event.addData(eventData);
    
                if (parent != null) {
                    parent.addChild(event);
                    return parent;
                } else {
                    return event;
                }
            case 'M':
                DefaultMetric metric = new DefaultMetric(type, name);
                String metricStatus = helper.read(ctx, TAB);
                String metricData = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                metric.setTimestamp(m_dateHelper.parse(timestamp));
                metric.setStatus(metricStatus);
                metric.addData(metricData);
    
                if (parent != null) {
                    parent.addChild(metric);
                    return parent;
                } else {
                    return metric;
                }
            case 'L':
                DefaultTrace trace = new DefaultTrace(type, name);
                String traceStatus = helper.read(ctx, TAB);
                String traceData = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                trace.setTimestamp(m_dateHelper.parse(timestamp));
                trace.setStatus(traceStatus);
                trace.addData(traceData);
    
                if (parent != null) {
                    parent.addChild(trace);
                    return parent;
                } else {
                    return trace;
                }
            case 'H':
                DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
                String heartbeatStatus = helper.read(ctx, TAB);
                String heartbeatData = helper.read(ctx, TAB);
    
                helper.read(ctx, LF); // get rid of line feed
                heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
                heartbeat.setStatus(heartbeatStatus);
                heartbeat.addData(heartbeatData);
    
                if (parent != null) {
                    parent.addChild(heartbeat);
                    return parent;
                } else {
                    return heartbeat;
                }
            default:
                m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: "
                      + ctx.getBuffer().toString(Charset.forName("utf-8")));
                throw new RuntimeException("Unknown identifier int name");
            }
        }
    View Code

    // handler 处理流程,由DefaultMessageHandler接手,安排后续工作。

        // DefaultMessageHandler, 接过处理器的第一棒, 交由另一实际的consumer(RealtimeConsumer) handler处理
        @Override
        public void handle(MessageTree tree) {
            if (m_consumer == null) {
                m_consumer = lookup(MessageConsumer.class);
            }
    
            try {
                m_consumer.consume(tree);
            } catch (Throwable e) {
                m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
            }
        }
        // RealtimeConsumer, 进行消费数据
        @Override
        public void consume(MessageTree tree) {
            String domain = tree.getDomain();
            String ip = tree.getIpAddress();
    
            // 进行权限检测,ip,domain
            if (!m_blackListManager.isBlack(domain, ip)) {
                long timestamp = tree.getMessage().getTimestamp();
                Period period = m_periodManager.findPeriod(timestamp);
    
                // 找到period, 再将消息分配过去,否则算作网络异常
                if (period != null) {
                    period.distribute(tree);
                } else {
                    m_serverStateManager.addNetworkTimeError(1);
                }
            } else {
                m_black++;
    
                if (m_black % CatConstants.SUCCESS_COUNT == 0) {
                    Cat.logEvent("Discard", domain);
                }
            }
        }

    // Period.distribute, 将消息依次取出,进行分发到队列

        public void distribute(MessageTree tree) {
            // 统计进行数进行加1
            m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
            boolean success = true;
            String domain = tree.getDomain();
    
            // 将各种类型的监控数据分别取出进行处理
            for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
                List<PeriodTask> tasks = entry.getValue();
                int length = tasks.size();
                int index = 0;
                boolean manyTasks = length > 1;
    
                if (manyTasks) {
                    index = Math.abs(domain.hashCode()) % length;
                }
                PeriodTask task = tasks.get(index);
                // 如果有金条消息,将task重新入队
                boolean enqueue = task.enqueue(tree);
    
                if (enqueue == false) {
                    if (manyTasks) {
                        task = tasks.get((index + 1) % length);
                        enqueue = task.enqueue(tree);
    
                        if (enqueue == false) {
                            success = false;
                        }
                    } else {
                        success = false;
                    }
                }
            }
    
            if (!success) {
                m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
            }
        }
        // PeriodTask.enqueue, 重新入队消息,让消费线程自行消费 LinkedBlockingQueue.offer(..)
        public boolean enqueue(MessageTree tree) {
            boolean result = m_queue.offer(tree);
    
            if (!result) { // trace queue overflow, 记录入队失败日志
                m_queueOverflow++;
    
                if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
                    m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow);
                }
            }
            return result;
        }

    到此,一条消费线路就完成了。


    // PeriodTask 线程,作为第二个消费线路

        @Override
        public void run() {
            try {
                // 分析各消息数据,做后台消费处理
                m_analyzer.analyze(m_queue);
            } catch (Exception e) {
                Cat.logError(e);
            }
        }
        // 调用统一的抽象类的模板方法,由各类进行具体的 process 处理
        @Override
        public void analyze(MessageQueue queue) {
            while (!isTimeout() && isActive()) {
                MessageTree tree = queue.poll();
    
                if (tree != null) {
                    try {
                        // 调用具体类的process 
                        process(tree);
                    } catch (Throwable e) {
                        m_errors++;
    
                        if (m_errors == 1 || m_errors % 10000 == 0) {
                            Cat.logError(e);
                        }
                    }
                }
            }
    
            // 如果出现超时或者停止动作,则把剩余队列处理完成再退出线程
            while (true) {
                MessageTree tree = queue.poll();
    
                if (tree != null) {
                    try {
                        process(tree);
                    } catch (Throwable e) {
                        m_errors++;
    
                        if (m_errors == 1 || m_errors % 10000 == 0) {
                            Cat.logError(e);
                        }
                    }
                } else {
                    break;
                }
            }
        }
        // 超时规则,当前时间 > 开始时间+1小时+设置额外超时时间
        protected boolean isTimeout() {
            long currentTime = System.currentTimeMillis();
            long endTime = m_startTime + m_duration + m_extraTime;
    
            return currentTime > endTime;
        }
        

    // 具体的 Anlalyzer示例: DumpAnlalyzer.process

    // 具体的 Anlalyzer示例: DumpAnlalyzer.process
        @Override
        public void process(MessageTree tree) {
            String domain = tree.getDomain();
    
            if ("PhoenixAgent".equals(domain)) {
                return;
            } else {
                MessageId messageId = MessageId.parse(tree.getMessageId());
    
                if (messageId.getVersion() == 2) {
                    // 计算出当前时间范围,
                    long time = tree.getMessage().getTimestamp();
                    long fixedTime = time - time % (TimeHelper.ONE_HOUR);
                    long idTime = messageId.getTimestamp();
                    long duration = fixedTime - idTime;
    
                    if (duration == 0 || duration == ONE_HOUR || duration == -ONE_HOUR) {
                        m_bucketManager.storeMessage(tree, messageId);
                    } else {
                        m_serverStateManager.addPigeonTimeError(1);
                    }
                }
            }
        }
    // 存储log消息到本地文件,并后续上传到hdfs
        @Override
        public void storeMessage(final MessageTree tree, final MessageId id) {
            boolean errorFlag = true;
            int hash = Math.abs((id.getDomain() + '-' + id.getIpAddress()).hashCode());
            int index = (int) (hash % m_gzipThreads);
            MessageItem item = new MessageItem(tree, id);
            LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1));
            boolean result = queue.offer(item);
    
            if (result) {
                errorFlag = false;
            } else {
                if (m_last.offer(item)) {
                    errorFlag = false;
                }
            }
    
            if (errorFlag) {
                m_serverStateManager.addMessageDumpLoss(1);
            }
            logStorageState(tree);
        }
        // 每1000个消息添加一个messageDump=1000
        protected void logStorageState(final MessageTree tree) {
            String domain = tree.getDomain();
            int size = ((DefaultMessageTree) tree).getBuffer().readableBytes();
    
            m_serverStateManager.addMessageSize(domain, size);
            if ((++m_total) % CatConstants.SUCCESS_COUNT == 0) {
                m_serverStateManager.addMessageDump(CatConstants.SUCCESS_COUNT);
            }
        }

    // EventAnalyzer.process 处理event消息

        @Override
        public void process(MessageTree tree) {
            String domain = tree.getDomain();
    
            if (m_serverFilterConfigManager.validateDomain(domain)) {
                EventReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
                Message message = tree.getMessage();
                String ip = tree.getIpAddress();
    
                if (message instanceof Transaction) {
                    processTransaction(report, tree, (Transaction) message, ip);
                } else if (message instanceof Event) {
                    processEvent(report, tree, (Event) message, ip);
                }
            }
        }
        // 循环处理多个transation
        private void processTransaction(EventReport report, MessageTree tree, Transaction t, String ip) {
            List<Message> children = t.getChildren();
    
            for (Message child : children) {
                if (child instanceof Transaction) {
                    processTransaction(report, tree, (Transaction) child, ip);
                } else if (child instanceof Event) {
                    processEvent(report, tree, (Event) child, ip);
                }
            }
        }
    // StateAnalyzer.process 对cat的机器作展示
        @Override
        protected void process(MessageTree tree) {
            String domain = tree.getDomain();
    
            if (m_serverFilterConfigManager.validateDomain(domain)) {
                StateReport report = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
                String ip = tree.getIpAddress();
                Machine machine = report.findOrCreateMachine(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
    
                machine.findOrCreateProcessDomain(domain).addIp(ip);
            }
        }

    // 所有分析线程,由 Period 进行初始化启动所有的Analyzer备用

        public void start() {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
            m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(),
                  df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));
    
            for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
                List<PeriodTask> taskList = tasks.getValue();
    
                for (int i = 0; i < taskList.size(); i++) {
                    PeriodTask task = taskList.get(i);
    
                    task.setIndex(i);
    
                    Threads.forGroup("Cat-RealtimeConsumer").start(task);
                }
            }
        }

    // 为保证高可用,使用 ChannelManager, 专门检查channel通道是否仍然存活,如果出问题,则发起重连。

        @Override
        public void run() {
            while (m_active) {
                // make save message id index asyc
                m_idfactory.saveMark();
                checkServerChanged();
    
                ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
                List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();
    
                doubleCheckActiveServer(activeFuture);
                reconnectDefaultServer(activeFuture, serverAddresses);
    
                try {
                    Thread.sleep(10 * 1000L); // check every 10 seconds
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
        

    // 最后一个关键点也是很重要的一个点,PeriodManager, 用于滚动式处理每小时的监控数据,保存数据到磁盘

    // PeriodManager, 用于滚动式处理每小时的监控数据
    
    public class PeriodManager implements Task {
        private PeriodStrategy m_strategy;
    
        private List<Period> m_periods = new ArrayList<Period>();
    
        private boolean m_active;
    
        @Inject
        private MessageAnalyzerManager m_analyzerManager;
    
        @Inject
        private ServerStatisticManager m_serverStateManager;
    
        @Inject
        private Logger m_logger;
    
        public static long EXTRATIME = 3 * 60 * 1000L;
    
        public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,
              ServerStatisticManager serverStateManager, Logger logger) {
            m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
            m_active = true;
            m_analyzerManager = analyzerManager;
            m_serverStateManager = serverStateManager;
            m_logger = logger;
        }
    
        private void endPeriod(long startTime) {
            int len = m_periods.size();
    
            for (int i = 0; i < len; i++) {
                Period period = m_periods.get(i);
    
                if (period.isIn(startTime)) {
                    period.finish();
                    m_periods.remove(i);
                    break;
                }
            }
        }
    
        public void init() {
            long startTime = m_strategy.next(System.currentTimeMillis());
    
            startPeriod(startTime);
        }
    
        @Override
        public void run() {
            while (m_active) {
                try {
                    long now = System.currentTimeMillis();
                    long value = m_strategy.next(now);
    
                    if (value > 0) {
                        startPeriod(value);
                    } else if (value < 0) {
                        // 上个运行周期,即1小时已完成后,启用一个结束线程进行规划原来的数据
                        Threads.forGroup("cat").start(new EndTaskThread(-value));
                    }
                } catch (Throwable e) {
                    Cat.logError(e);
                }
    
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    
        private void startPeriod(long startTime) {
            long endTime = startTime + m_strategy.getDuration();
            Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
    
            m_periods.add(period);
            period.start();
        }
    
        private class EndTaskThread implements Task {
    
            private long m_startTime;
    
            public EndTaskThread(long startTime) {
                m_startTime = startTime;
            }
    
            @Override
            public void run() {
                // 调用外部类的结束方法
                endPeriod(m_startTime);
            }
    
        }
    }
        // Period.finish(), 结束
        public void finish() {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date startDate = new Date(m_startTime);
            Date endDate = new Date(m_endTime - 1);
    
            m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
                  df.format(endDate)));
    
            try {
                for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
                    for (PeriodTask task : tasks.getValue()) {
                        task.finish();
                    }
                }
            } catch (Throwable e) {
                Cat.logError(e);
            } finally {
                m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
                      df.format(endDate)));
            }
        }
        // PeriodTask.finish(), 真正处理上一周期数据
        public void finish() {
            try {
                // 调用各自分析器的 doCheckpoint 检查,后续处理
                m_analyzer.doCheckpoint(true);
                // 销毁分析器, help gc
                m_analyzer.destroy();
            } catch (Exception e) {
                Cat.logError(e);
            }
        }
        // 举例 EventAnalyzer.doCheckpoint, 需加锁处理
        @Override
        public synchronized void doCheckpoint(boolean atEnd) {
            if (atEnd && !isLocalMode()) {
                m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
            } else {
                m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
            }
        }
        // DefaultReportManager.storeHourlyReports, 存储logview, 存在统计数据到db
        @Override
        public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
            Transaction t = Cat.newTransaction("Checkpoint", m_name);
            Map<String, T> reports = m_reports.get(startTime);
            ReportBucket bucket = null;
    
            try {
                t.addData("reports", reports == null ? 0 : reports.size());
    
                if (reports != null) {
                    Set<String> errorDomains = new HashSet<String>();
    
                    for (String domain : reports.keySet()) {
                        if (!m_validator.validate(domain)) {
                            errorDomains.add(domain);
                        }
                    }
                    for (String domain : errorDomains) {
                        reports.remove(domain);
                    }
                    if (!errorDomains.isEmpty()) {
                        m_logger.info("error domain:" + errorDomains);
                    }
    
                    m_reportDelegate.beforeSave(reports);
    
                    if (policy.forFile()) {
                        bucket = m_bucketManager.getReportBucket(startTime, m_name, index);
    
                        try {
                            storeFile(reports, bucket);
                        } finally {
                            m_bucketManager.closeBucket(bucket);
                        }
                    }
    
                    if (policy.forDatabase()) {
                        storeDatabase(startTime, reports);
                    }
                }
                t.setStatus(Message.SUCCESS);
            } catch (Throwable e) {
                Cat.logError(e);
                t.setStatus(e);
                m_logger.error(String.format("Error when storing %s reports of %s!", m_name, new Date(startTime)), e);
            } finally {
                cleanup(startTime);
                t.complete();
    
                if (bucket != null) {
                    m_bucketManager.closeBucket(bucket);
                }
            }
        }
        // DefaultReportManager.storeDatabase, 存储    db
        private void storeDatabase(long startTime, Map<String, T> reports) {
            Date period = new Date(startTime);
            String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
    
            for (T report : reports.values()) {
                try {
                    String domain = m_reportDelegate.getDomain(report);
                    HourlyReport r = m_reportDao.createLocal();
    
                    r.setName(m_name);
                    r.setDomain(domain);
                    r.setPeriod(period);
                    r.setIp(ip);
                    r.setType(1);
    
                    m_reportDao.insert(r);
    
                    int id = r.getId();
                    byte[] binaryContent = m_reportDelegate.buildBinary(report);
                    HourlyReportContent content = m_reportContentDao.createLocal();
    
                    content.setReportId(id);
                    content.setContent(binaryContent);
                    m_reportContentDao.insert(content);
                    m_reportDelegate.createHourlyTask(report);
                } catch (Throwable e) {
                    Cat.getProducer().logError(e);
                }
            }
        }
        // DefaultReportManager.storeFile, 存在file, xml
        private void storeFile(Map<String, T> reports, ReportBucket bucket) {
            for (T report : reports.values()) {
                try {
                    String domain = m_reportDelegate.getDomain(report);
                    String xml = m_reportDelegate.buildXml(report);
    
                    bucket.storeById(domain, xml);
                } catch (Exception e) {
                    Cat.logError(e);
                }
            }
        }

    总结起来就几个东西:
      1. 使用netty开启高性能的接收服务;

      2. 使用队列进行保存消息;

      3. 使用单独线程检测channel有效性,保证高可用;

      4. 所有单小时的数据,保存在内存中,速度特别快;

      5. 多线程技术发挥得很好;

      6. 模板模式的应用,阻塞队列的应用;

      7. hdfs的应用,优雅停机的应用;


    等等,来个图展示下:

    task 运行过程:

    周期报告,汇总:

  • 相关阅读:
    my ReadBook_dianzishangwu / 2020216 / dianzishangwuwuliu
    my ReadBook_shichangyingxiao / 2020208
    C#编写的clock
    Java建立JProgressBar
    java基本类型byte的取值范围
    正则表达式整理大全
    (C#)如何利用Graphics画出一幅图表
    设置鼠标可以移动窗体
    Java编程提高性能的26个方法
    数据加密工具设计经验
  • 原文地址:https://www.cnblogs.com/yougewe/p/9494904.html
Copyright © 2011-2022 走看看