zoukankan      html  css  js  c++  java
  • pinpoint web报警机制源码解析

    背景:(简述)

    Pinpoint 是一套APM (Application Performance Management)工具,主要用于帮助分析系统的总体结构和组件如何相互调用,也可用于追踪线上性能问题,方便定位出现问题的点。

    Pinpoint主要有如下几个组成部分:

    • Pinpoint Agent :通过字节码增强技术,附加到 用户的java 应用来做采样,程序启动时指定javaagent以及agentId,pplicationName。
    • HBase :用于存储agent采样的数据。
    • Pinpoint Collector :信息的收集者,部署在tomcat中,由于收集agent采样的数据并存入hbase。
    • Pinpoint Web :提供WEB_UI界面,部署在tomcat中,提供可视化页面,并且提供监控报警功能。(需要自行实现)

    博文的主要内容与主要目的:

      pinpoint的报警功能是需要自行拓展实现的,网上有很多的实现方法,但是没有一个对于此报警机制的源码分析,本文旨在填补此处空白,让读者有一些基本的了解,如此调试报警的时候才能够得心应手。

    背景知识:(详情可以百度一下,后面也会介绍相关内容)

      Spring Task:Spring内置的定时任务。

      Spring Batch: 一个大数据量的并行处理框架。

    概述:

       通过Spring Task的定时任务,每分钟做一次通过SpringBatch的批处理检查。

    该模块github源码地址:

      https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm

    源码解析:(本文重点)

      1.定时任务入口: src/main/resources/batch/applicationContext-batch-schedule.xml

    <task:scheduled-tasks scheduler="scheduler">
           <task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" />
           <!--省略-->
    </task:scheduled-tasks>
    <task:scheduler id="scheduler" pool-size="5"/>

    此段代码标签使用的是SpringTask的标签,大概意思为,定义了一个线程池大小为5的调度器scheduler。

    scheduler执行的任务有三个,batchJobLauncher的alarmJob方法,每一分钟执行一次,alarmJob,顾名思义,报警的任务。

      2.批处理任务入口: src/main/resources/batch/applicationContext-alarmJob.xml

    前置说明-SpringBatch批处理框架中与此相关的解释:

    如果是批处理的话,自然有批处理任务(对应下面的Job标签),每个任务自然有一个或者多个步骤(对应下面的Step标签)。

    每个步骤有三个操作,读取数据(对应reader),处理数据(对应processor),回写数据(对应writer)。这三者中的参数是按照顺序传递的。

    大家可能会想,报警机制和读取数据,处理数据,回写数据有什么关系吗?下面我说明一下pinpoint相关的对应的业务关系:

    reader:读取数据 => 通过用户配置的规则提供Checker,即异常校验器。

    processor:处理数据 => 用Checker进行校验,标记异常状态。

    writer:回写数据 => 判断Checker是否有异常情况,有则报警。

    下面的配置的源码:
    <!--定义了一个alramJob的批处理任务-->
      <batch:job id="alarmJob">    
        <batch:step id="alarmPartitionStep">        
          <!--此alarmJob只有一个Step-->
            <batch:partition step="alarmStep" partitioner="alarmPartitioner">
                <!--设置执行的线程池-->
                <batch:handler task-executor="alarmPoolTaskExecutorForPartition" />
            </batch:partition>
        </batch:step>
        <batch:listeners>
           <batch:listener ref="jobFailListener"/>
        </batch:listeners>
    </batch:job>
    
    <batch:step id="alarmStep">
        <!--代表step的一种处理策略-->
        <batch:tasklet>
            <!--批处理流程-->
            <!-- 顺序执行
            reader:读取数据 => 提供Checker,即异常校验器,见下面的bean
            processor:处理数据 => 用Checker进行校验,见下面的bean
            writer:回写数据 => 判断Checker是否有异常情况,有则报警,见下面的bean
            -->
            <batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/>
        </batch:tasklet>
    </batch:step>
    
    <bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/>
    
    <bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/>
    <bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/>
    <bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>

      3.通过对AlarmReader、AlarmProcessor、AlarmWriter的解析,梳理报警原理

    (1) com.navercorp.pinpoint.web.alarm.AlarmReader:实现了ItemReader接口 
    // StepExecutionListener监听器,可以定义Step开始前后的操作
    public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener {
        ...
       // 报警所用的Checker在内存里
        private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>();
        ...
    
        // Checker出队,供processor使用,
       @Override
        public AlarmChecker read() {
            return checkers.poll();
        }
    
        // 批处理之前,将应用的报警规则加入Checker之中
        @Override
        public void beforeStep(StepExecution stepExecution) {
         // 查询所有的应用
            List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();
         // 根据应用用户配置的规则,添加Checker到队列当中
            for (Application application : applicationList) {
                addChecker(application);
            }
        }
    
        private void addChecker(Application application) {
         // 根据应用名称获取所有的规则,应用名称就是配置agent的时候,指定的applicationName
            List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
            long timeSlotEndTime = System.currentTimeMillis();
            Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();
            // 遍历规则
            for (Rule rule : rules) {
            // CheckerCategory是一个枚举类,预置了所有的报警规则模版,比如失败请求次数、慢请求次数等
                CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
            // 数据收集器是为检验规则准备的,例如Rule是失败请求次数,但是次数从哪里来,就是从这个收集器来的
    
                DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
           // 这里是一个基于Map的缓存
                if (collector == null) {
                    collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
                    collectorMap.put(collector.getDataCollectorCategory(), collector);
                }
                // 创建Checker,有兴趣的读者可以看看CheckerCategroy的源码,设计的还是很不错的。
            // AlaramChecker是一个抽象类,具体的功能由子类实现
                AlarmChecker checker = checkerCategory.createChecker(collector, rule);
           // 加入队列
                checkers.add(checker);
            }
            
        }
        ...
    }
    
     (2) com.navercorp.pinpoint.web.alarm.AlarmProcessor:实现了ItemProcessor接口
    public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> {
       // 此处的AlarmChecker是上面的read()方法传递过来的
    @Override public AlarmChecker process(AlarmChecker checker) { // check,顾名思义,检验,标记是否有异常情况,check()方法见下 checker.check(); return checker; } }
    com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
    protected abstract boolean decideResult(T value);
    
        public void check() {
            // 收集数据
            dataCollector.collect();
            // 标记是否有异常情况,意为是否满足报警的阀值,decideResult是一个抽象方法
         // detected字段在后续的Writter中会被检查
            detected = decideResult(getDetectedValue()); }
    (3)com.navercorp.pinpoint.web.alarm.AlarmWriter:实现了ItemWriter接口
    public class AlarmWriter implements ItemWriter<AlarmChecker> {
    
        // 需要用户自定义配置在Spring中的AlarmMessageSender,如果不配置,则是一个空实现
        @Autowired(required = false)
        private AlarmMessageSender alarmMessageSender = new EmptyMessageSender();
    
        @Autowired
        private AlarmService alarmService;
    
       // 实现的接口的方法,主要内容 @Override public void write(List<? extends AlarmChecker> checkers) throws Exception { Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId()); // 遍历上面传递的Checker for (AlarmChecker checker : checkers) { CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName()); if (beforeCheckerResult == null) { beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1); } // 对上面的Processor标记的detected值进行检查 if (checker.isDetected()) { sendAlarmMessage(beforeCheckerResult, checker); } // 记录报警历史 alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker); } } private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) { if (isTurnToSendAlarm(beforeCheckerResult)) { // 是否配置了发送报警短信 if (checker.isSMSSend()) { alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1); } // 是否配置了发送报警邮件 if (checker.isEmailSend()) { alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1); } } }   // ... }

    总结:

        通过SpringTask定时任务,alarmJob任务每分钟执行一次,alarmJob分为三个步骤来执行,1.首先AlarmReader根据已有的应用以及配置的报警名称,通过实现read()方法,传递给AlaramProcessor;2.AlaramProcessor再调用check()方法,通过收集器去收集数据,查看是否满足报警规则,并通过detected字段进行标示,传递给AlarmWriter;3.AlarmWriter再根据用户配置的报警信息,进行报警并记录报警记录。

    Tips:

    即使满足报警规则,也不一定报警:因为是否报警还是有算法来处理,用于防止可能发生的每分钟都有报警导致的邮件轰炸,我将在下一章节介绍。
  • 相关阅读:
    Java生鲜电商平台-SpringCloud微服务架构中核心要点和实现原理
    Java生鲜电商平台-SpringCloud微服务架构中网络请求性能优化与源码解析
    Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战
    超硬核Java学习路线图+学习资源+实战项目汇总,看完以后不用再问我怎么学Java了!
    还没使用过消息队列?这一份书单值得你好好看看!
    学习Spring,看这几本书就够了
    这份书单,想要晋级高级Java工程师的朋友值得一看!
    搞定JVM基本原理和面试题,看看这几本书妥妥够了!
    《自拍教程16》cmd的常用技巧
    《自拍教程15》命令行软件的通用技巧
  • 原文地址:https://www.cnblogs.com/langshiquan/p/9497464.html
Copyright © 2011-2022 走看看