zoukankan      html  css  js  c++  java
  • 监控心跳实现

    监控主动轮询进程是否崩溃挂掉。采取的措施是通过心跳包的形式进行检测:被监控进程定时生成文件,由 jnotify 监听文件事件并发送到 Kafka,之后由消费者进行解析;若发现不符合规则的情况,则发邮件、短信报警。

    Java代码 
    1. package heartbeat.monitor;  
    2.   
    3. import java.io.FileInputStream;  
    4. import java.io.IOException;  
    5. import java.io.InputStream;  
    6. import java.util.ArrayList;  
    7. import java.util.List;  
    8. import java.util.Properties;  
    9. import java.util.concurrent.ExecutionException;  
    10.   
    11. import org.apache.commons.io.IOUtils;  
    12.   
    13. import heartbeat.monitor.sendmessage.SendMailUtil;  
    14. import heartbeat.monitor.util.HeartBeatMonitorConstant;  
    15.   
    16. /** 
    17.  * @author Switching 
    18.  * @version 1.0, 2017-01-01 
    19.  * @since heartbeat.monitor 1.0 
    20.  */  
    21. public class HeartBeatMonitorAccess {  
    22.     private List<HeartBeatMonitor> consumers;  
    23.     private static String heartbeatConsumerProperties;  
    24.   
    25.     public HeartBeatMonitorAccess() throws IOException {  
    26.   
    27.         Properties properties = new Properties();  
    28.         if (heartbeatConsumerProperties == null) {  
    29.             properties.load(ClassLoader.getSystemResourceAsStream("heartBeatMonitor.properties"));  
    30.         } else {  
    31.             String propurl = heartbeatConsumerProperties;  
    32.             InputStream in = new FileInputStream(propurl);  
    33.             properties.load(in);  
    34.             IOUtils.closeQuietly(in);  
    35.         }  
    36.         int num = 1;  
    37.         String topic = properties.getProperty(HeartBeatMonitorConstant.MONITOR_TOPIC);  
    38.         consumers = new ArrayList<HeartBeatMonitor>(num);  
    39.         for (int i = 0; i < num; i++) {  
    40.             consumers.add(new HeartBeatMonitor(properties, topic));  
    41.             new SendMailUtil(properties, topic);  
    42.         }  
    43.     }  
    44.   
    45.     public void exeute() throws InterruptedException, ExecutionException {  
    46.         for (HeartBeatMonitor consumer : consumers) {  
    47.             new Thread(consumer).start();  
    48.         }  
    49.     }  
    50.   
    51.     public static void main(String[] args) throws Exception {  
    52.         if (args.length > 0)  
    53.             heartbeatConsumerProperties = args[0];  
    54.   
    55.         HeartBeatMonitorAccess consumerGroup = new HeartBeatMonitorAccess();  
    56.         consumerGroup.exeute();  
    57.     }  
    58.   
    59. }  



    消费者抓取相应的规则进行解析判断 
    后台轮询消费kafka中消息 

    Java代码 
    1. package heartbeat.monitor;  
    2.   
    3. import java.io.IOException;  
    4. import java.text.DateFormat;  
    5. import java.text.ParseException;  
    6. import java.text.SimpleDateFormat;  
    7. import java.util.Collections;  
    8. import java.util.Date;  
    9. import java.util.Map;  
    10. import java.util.Properties;  
    11.   
    12. import org.apache.kafka.clients.consumer.ConsumerRecord;  
    13. import org.apache.kafka.clients.consumer.ConsumerRecords;  
    14. import org.apache.kafka.clients.consumer.KafkaConsumer;  
    15. import org.codehaus.jackson.map.ObjectMapper;  
    16.   
    17. import heartbeat.monitor.sendmessage.SendMailUtil;  
    18. import heartbeat.monitor.util.HeartBeatMonitorConstant;  
    19. import heartbeat.monitor.util.MessageUtil;  
    20.   
    21. /** 
    22.  * @author Switching 
    23.  * @version 1.0, 2017-01-01 
    24.  * @since heartbeat.monitor 1.0 
    25.  */  
    26. public class HeartBeatMonitor implements Runnable {  
    27.     private final KafkaConsumer<String, String> consumer;  
    28.     public static String lastMonitorTime;  
    29.     public static String lastMonitorInfo;  
    30.     public static DateFormat dfMonitorTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    31.   
    32.     private long timeMonitor = 20000;  
    33.     private long maxTriesTime = 3;  
    34.     private String monitorKeyWord = "FILE_WRITTEN_CLOSED";  
    35.     private String monitorLogLvls = "EXCEPTION";  
    36.   
    37.     public HeartBeatMonitor(Properties properties, String topic) throws IOException {  
    38.         if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_OFFSET_TIME)))  
    39.             timeMonitor = Long.parseLong(properties.getProperty(HeartBeatMonitorConstant.MONITOR_OFFSET_TIME));  
    40.         if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_TRIES)))  
    41.             maxTriesTime = Long.parseLong(properties.getProperty(HeartBeatMonitorConstant.MONITOR_TRIES));  
    42.         if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_KEY_WORD)))  
    43.             monitorKeyWord = properties.getProperty(HeartBeatMonitorConstant.MONITOR_KEY_WORD);  
    44.         if (!MessageUtil.isEmpty(properties.getProperty(HeartBeatMonitorConstant.MONITOR_LOG_LVLS)))  
    45.             monitorLogLvls = properties.getProperty(HeartBeatMonitorConstant.MONITOR_LOG_LVLS);  
    46.         consumer = new KafkaConsumer<String, String>(properties);  
    47.         consumer.subscribe(Collections.singletonList(topic));  
    48.     }  
    49.   
    50.     public void close() {  
    51.         consumer.close();  
    52.     }  
    53.   
    54.     public void run() {  
    55.         long triesTime = maxTriesTime - 1;  
    56.         System.out.println("Heartbeat Monitor Start!");  
    57.         if (lastMonitorTime == null) {  
    58.             resetTime();  
    59.         }  
    60.         String name = Thread.currentThread().getName();  
    61.         while (true) {  
    62.             Date dtMonitor;  
    63.             Date dtLocal = new Date();  
    64.   
    65.             ConsumerRecords<String, String> records = consumer.poll(2);  
    66.             for (ConsumerRecord<String, String> record : records) {  
    67.                 ObjectMapper objectMapper = new ObjectMapper();  
    68.                 try {  
    69.                     Map<String, String> recordMap = objectMapper.readValue(record.value(), Map.class);  
    70.                     if (recordMap.get(HeartBeatMonitorConstant.FILE_EVENT).toString().equals(monitorKeyWord)) {  
    71.                         lastMonitorTime = recordMap.get(HeartBeatMonitorConstant.FILE_TIME).toString();  
    72.                         lastMonitorInfo = recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_NAME) + ":"  
    73.                                 + recordMap.get(HeartBeatMonitorConstant.SFTP_HOST_IP);  
    74.                     }  
    75.                 } catch (Exception e) {  
    76.                     e.printStackTrace();  
    77.                 }  
    78.                 System.out.println(name + "---" + record.partition() + ":" + record.offset() + " = " + record.key()  
    79.                         + ":" + record.value());  
    80.             }  
    81.   
    82.             try {  
    83.                 dtMonitor = dfMonitorTime.parse(lastMonitorTime);  
    84.                 if ((dtLocal.getTime() - timeMonitor) > dtMonitor.getTime()) {  
    85.                     if (triesTime > 0) {  
    86.                         triesTime--;  
    87.                     } else {  
    88.                         resetTime();  
    89.                         triesTime = maxTriesTime - 1;  
    90.                     }  
    91.                     if (HeartBeatMonitorConstant.LOG_LVL_EXCEP.equals(monitorLogLvls)  
    92.                             || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {  
    93.                         String content = "[" + dtLocal + "] " + lastMonitorInfo  
    94.                                 + " Exception! manual intervention please! " + (triesTime + 1) + " times!  Now("  
    95.                                 + dtLocal + ") is beyond " + timeMonitor / 1000 + "(s) lastMonitorTime(" + dtMonitor  
    96.                                 + ").";  
    97.                         System.out.println(content);  
    98.                         if (triesTime == 0) {  
    99.                             SendMailUtil.sendMailAccess(lastMonitorInfo, content);  
    100.                         }  
    101.                     }  
    102.                 } else {  
    103.                     if (HeartBeatMonitorConstant.LOG_LVL_NOEXCEP.equals(monitorLogLvls)  
    104.                             || HeartBeatMonitorConstant.LOG_LVL_ALL.equals(monitorLogLvls)) {  
    105.                         System.out.println("[" + dtLocal + "] " + name + "NoException!  Now(" + dtLocal  
    106.                                 + ") is Range " + timeMonitor / 1000 + " (s) lastMonitorTime(" + dtMonitor + ").");  
    107.                     }  
    108.                 }  
    109.             } catch (ParseException e) {  
    110.                 e.printStackTrace();  
    111.             } catch (Exception e) {  
    112.                 e.printStackTrace();  
    113.             }  
    114.   
    115.         }  
    116.     }  
    117.   
    118.     public void resetTime() {  
    119.         lastMonitorTime = dfMonitorTime.format(new Date());  
    120.         if (lastMonitorInfo == null) {  
    121.             lastMonitorInfo = "Monitor";  
    122.         }  
    123.     }  
    124. }  




    通过配置文件形式,进行各种事件的监控 

    配置文件(heartBeatMonitor.properties)
      1. #Created by Switching  
      2. auto.commit.interval.ms=1000  
      3. auto.offset.reset=earliest  
      4. bootstrap.servers=192.168.102.10:9092  
      5. enable.auto.commit=true  
      6. group.id=monitor1  
      7. key.deserializer=org.apache.kafka.common.serialization.StringDeserializer  
      8. monitorTopic=monitor  
      9. value.deserializer=org.apache.kafka.common.serialization.StringDeserializer  
      10.   
      11. #heartBeat monitor exception notify times(3)  
      12. monitorTries=3  
      13. #heartBeat monitor: max allowed time since the last heartbeat before it counts as overdue, in ms (default 20000)  
      14. monitorOffsetTime=20000  
      15. #heartBeat monitor log lvls(EXCEPTION):ALL|EXCEPTION|NOEXCEPTION|NONE   
      16. monitorLogLvls=EXCEPTION  
      17.   
      18. #heartBeat Monitor mailInfo  
      19. sendEmailAccount=xxxx@jdongtech.com   
      20. sendEmailPassword=xxxxx  
      21. sendEmailSMTPHost=smtp.xxxx.com  
      22. receEMailAccount=xxx@jdongtech.com  
  • 相关阅读:
    Python 类的特性讲解
    Python 类的式列化过程解剖
    Python 面向对象介绍
    Python subprocess模块
    Python re模块
    Python configparser模块
    Python pyYAML模块
    Python logging模块
    Python hashlib模块
    OAuth2.0 错误码
  • 原文地址:https://www.cnblogs.com/huluyisheng/p/6866395.html
Copyright © 2011-2022 走看看