zoukankan      html  css  js  c++  java
  • ELK+Kafka+Beats实现海量日志收集平台(二)

                ELK+Kafka+Beats实现海量日志收集平台(二)

    三、环境搭建

      通过上一小节应用场景和实现原理的介绍,接下来实现所需环境搭建及说明

    架构图如下所示:

      

      环境说明:

      192.168.232.6 : 部署了demo项目(用于产生数据日志)

                 filebeat-6.6.0 

      192.168.232.3 : Kafka (单体)(Zookeeper:192.168.232.3~5)

      192.168.232.4 : Kibana

      192.168.232.7 : Logstash (单体)

       ES集群:192.168.232.8~10

       1、filebeat安装配置参考https://www.cnblogs.com/jhtian/p/13731230.html

       2、Kafka安装配置参考https://www.cnblogs.com/jhtian/p/13708679.html

       3、Logstash安装配置参考https://www.cnblogs.com/jhtian/p/13744753.html

       4、ES集群搭建可参考https://www.cnblogs.com/jhtian/p/12703651.html

       5、Kibana安装可参考https://www.cnblogs.com/jhtian/p/13785029.html

    四、部署demo工程项目:

      项目结构图如下,分别调用项目的 /index、/error两个方法分别打印正常、错误日志

    (warn及以上级别日志)到logs文件夹中,作为filebeat读取数据的来源。

         

    web访问类文件:indexAction.java

     1 package com.tianjh.demo.web;
     2 
     3 import com.tianjh.demo.util.SetMDC;
     4 import lombok.extern.slf4j.Slf4j;
     5 import org.springframework.web.bind.annotation.RequestMapping;
     6 import org.springframework.web.bind.annotation.RestController;
     7 
     8 @Slf4j
     9 @RestController
    10 public class indexAction {
    11 
    12     @RequestMapping(value = "/index")
    13     public String index() {
    14         SetMDC.putMDC();
    15         log.info("这是一条模拟error日志打印");
    16         log.info("这是一条模拟warn日志打印");
    17         log.info("这是一条模拟info日志打印");
    18         return "hello word";
    19     }
    20 
    21     @RequestMapping(value = "/err")
    22     public String error() {
    23         SetMDC.putMDC();
    24         try {
    25             int a = 5/0;
    26         } catch (Exception e) {
    27             log.error("算术异常", e);
    28         }
    29         return "error";
    30     }
    31 }
    indexAction.java

    工具类Utils

    FastJsonConvertUtil.java

      1 package com.tianjh.demo.util;
      2 
      3 import java.util.ArrayList;
      4 import java.util.List;
      5 
      6 import com.alibaba.fastjson.JSON;
      7 import com.alibaba.fastjson.JSONObject;
      8 import com.alibaba.fastjson.serializer.SerializerFeature;
      9 
     10 import lombok.extern.slf4j.Slf4j;
     11 
     12 /**
     13  * $FastJsonConvertUtil
     14  * @author hezhuo.bai
     15  * @since 2019年1月15日 下午4:53:28
     16  */
     17 @Slf4j
     18 public class FastJsonConvertUtil {
     19 
     20     private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
     21             SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
     22 
     23     /**
     24      * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
     25      * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
     26      * @author hezhuo.bai
     27      * @since 2019年1月15日 下午4:53:49 
     28      * @param data JSON字符串
     29      * @param clzss 转换对象
     30      * @return T
     31      */
     32     public static <T> T convertJSONToObject(String data, Class<T> clzss) {
     33         try {
     34             T t = JSON.parseObject(data, clzss);
     35             return t;
     36         } catch (Exception e) {
     37             log.error("convertJSONToObject Exception", e);
     38             return null;
     39         }
     40     }
     41     
     42     /**
     43      * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
     44      * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
     45      * @author hezhuo.bai
     46      * @since 2019年1月15日 下午4:54:32
     47      * @param data JSONObject对象
     48      * @param clzss 转换对象
     49      * @return T
     50      */
     51     public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
     52         try {
     53             T t = JSONObject.toJavaObject(data, clzss);
     54             return t;
     55         } catch (Exception e) {
     56             log.error("convertJSONToObject Exception", e);
     57             return null;
     58         }
     59     }
     60 
     61     /**
     62      * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
     63      * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
     64      * @author hezhuo.bai
     65      * @since 2019年1月15日 下午4:54:50
     66      * @param data JSON字符串数组
     67      * @param clzss 转换对象
     68      * @return List<T>集合对象
     69      */
     70     public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
     71         try {
     72             List<T> t = JSON.parseArray(data, clzss);
     73             return t;
     74         } catch (Exception e) {
     75             log.error("convertJSONToArray Exception", e);
     76             return null;
     77         }
     78     }
     79 
     80     /**
     81      * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
     82      * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
     83      * @author hezhuo.bai
     84      * @since 2019年1月15日 下午4:55:11
     85      * @param data List<JSONObject>
     86      * @param clzss 转换对象
     87      * @return List<T>集合对象
     88      */
     89     public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
     90         try {
     91             List<T> t = new ArrayList<T>();
     92             for (JSONObject jsonObject : data) {
     93                 t.add(convertJSONToObject(jsonObject, clzss));
     94             }
     95             return t;
     96         } catch (Exception e) {
     97             log.error("convertJSONToArray Exception", e);
     98             return null;
     99         }
    100     }
    101 
    102     /**
    103      * <B>方法名称:</B>将对象转为JSON字符串<BR>
    104      * <B>概要说明:</B>将对象转为JSON字符串<BR>
    105      * @author hezhuo.bai
    106      * @since 2019年1月15日 下午4:55:41
    107      * @param obj 任意对象
    108      * @return JSON字符串
    109      */
    110     public static String convertObjectToJSON(Object obj) {
    111         try {
    112             String text = JSON.toJSONString(obj);
    113             return text;
    114         } catch (Exception e) {
    115             log.error("convertObjectToJSON Exception", e);
    116             return null;
    117         }
    118     }
    119     
    120     /**
    121      * <B>方法名称:</B>将对象转为JSONObject对象<BR>
    122      * <B>概要说明:</B>将对象转为JSONObject对象<BR>
    123      * @author hezhuo.bai
    124      * @since 2019年1月15日 下午4:55:55
    125      * @param obj 任意对象
    126      * @return JSONObject对象
    127      */
    128     public static JSONObject convertObjectToJSONObject(Object obj){
    129         try {
    130             JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
    131             return jsonObject;
    132         } catch (Exception e) {
    133             log.error("convertObjectToJSONObject Exception", e);
    134             return null;
    135         }        
    136     }
    137 
    138     public static String convertObjectToJSONWithNullValue(Object obj) {
    139         try {
    140             String text = JSON.toJSONString(obj, featuresWithNullValue);
    141             return text;
    142         } catch (Exception e) {
    143             log.error("convertObjectToJSONWithNullValue Exception", e);
    144             return null;
    145         }
    146     }
    147 }
    FastJsonConvertUtil.java

    NetUtil.java

      1 package com.tianjh.demo.util;
      2 
      3 import java.lang.management.ManagementFactory;
      4 import java.lang.management.RuntimeMXBean;
      5 import java.net.InetAddress;
      6 import java.net.NetworkInterface;
      7 import java.net.SocketAddress;
      8 import java.net.UnknownHostException;
      9 import java.nio.channels.SocketChannel;
     10 import java.util.Enumeration;
     11 import java.util.regex.Matcher;
     12 import java.util.regex.Pattern;
     13 
     14 /**
     15  * $NetUtil
     16  * 获取本机地址 端口号的工具类
     17  *  * @author hezhuo.bai
     18  *  * @since 2019年1月15日 下午4:53:28
     19  */
     20 public class NetUtil {   
     21     
     22     public static String normalizeAddress(String address){
     23         String[] blocks = address.split("[:]");
     24         if(blocks.length > 2){
     25             throw new IllegalArgumentException(address + " is invalid");
     26         }
     27         String host = blocks[0];
     28         int port = 80;
     29         if(blocks.length > 1){
     30             port = Integer.valueOf(blocks[1]);
     31         } else {
     32             address += ":"+port; //use default 80
     33         } 
     34         String serverAddr = String.format("%s:%d", host, port);
     35         return serverAddr;
     36     }
     37     
     38     public static String getLocalAddress(String address){
     39         String[] blocks = address.split("[:]");
     40         if(blocks.length != 2){
     41             throw new IllegalArgumentException(address + " is invalid address");
     42         } 
     43         String host = blocks[0];
     44         int port = Integer.valueOf(blocks[1]);
     45         
     46         if("0.0.0.0".equals(host)){
     47             return String.format("%s:%d",NetUtil.getLocalIp(), port);
     48         }
     49         return address;
     50     }
     51     
     52     private static int matchedIndex(String ip, String[] prefix){
     53         for(int i=0; i<prefix.length; i++){
     54             String p = prefix[i];
     55             if("*".equals(p)){ //*, assumed to be IP
     56                 if(ip.startsWith("127.") ||
     57                    ip.startsWith("10.") ||    
     58                    ip.startsWith("172.") ||
     59                    ip.startsWith("192.")){
     60                     continue;
     61                 }
     62                 return i;
     63             } else {
     64                 if(ip.startsWith(p)){
     65                     return i;
     66                 }
     67             } 
     68         }
     69         
     70         return -1;
     71     }
     72     
     73     public static String getLocalIp(String ipPreference) {
     74         if(ipPreference == null){
     75             ipPreference = "*>10>172>192>127";
     76         }
     77         String[] prefix = ipPreference.split("[> ]+");
     78         try {
     79             Pattern pattern = Pattern.compile("[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+");
     80             Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
     81             String matchedIp = null;
     82             int matchedIdx = -1;
     83             while (interfaces.hasMoreElements()) {
     84                 NetworkInterface ni = interfaces.nextElement();
     85                 Enumeration<InetAddress> en = ni.getInetAddresses(); 
     86                 while (en.hasMoreElements()) {
     87                     InetAddress addr = en.nextElement();
     88                     String ip = addr.getHostAddress();  
     89                     Matcher matcher = pattern.matcher(ip);
     90                     if (matcher.matches()) {  
     91                         int idx = matchedIndex(ip, prefix);
     92                         if(idx == -1) continue;
     93                         if(matchedIdx == -1){
     94                             matchedIdx = idx;
     95                             matchedIp = ip;
     96                         } else {
     97                             if(matchedIdx>idx){
     98                                 matchedIdx = idx;
     99                                 matchedIp = ip;
    100                             }
    101                         }
    102                     } 
    103                 } 
    104             } 
    105             if(matchedIp != null) return matchedIp;
    106             return "127.0.0.1";
    107         } catch (Exception e) { 
    108             return "127.0.0.1";
    109         }
    110     }
    111     
    112     public static String getLocalIp() {
    113         return getLocalIp("*>10>172>192>127");
    114     }
    115     
    116     public static String remoteAddress(SocketChannel channel){
    117         SocketAddress addr = channel.socket().getRemoteSocketAddress();
    118         String res = String.format("%s", addr);
    119         return res;
    120     }
    121     
    122     public static String localAddress(SocketChannel channel){
    123         SocketAddress addr = channel.socket().getLocalSocketAddress();
    124         String res = String.format("%s", addr);
    125         return addr==null? res: res.substring(1);
    126     }
    127     
    128     public static String getPid(){
    129         RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    130         String name = runtime.getName();
    131         int index = name.indexOf("@");
    132         if (index != -1) {
    133             return name.substring(0, index);
    134         }
    135         return null;
    136     }
    137     
    138     public static String getLocalHostName() {
    139         try {
    140             return (InetAddress.getLocalHost()).getHostName();
    141         } catch (UnknownHostException uhe) {
    142             String host = uhe.getMessage();
    143             if (host != null) {
    144                 int colon = host.indexOf(':');
    145                 if (colon > 0) {
    146                     return host.substring(0, colon);
    147                 }
    148             }
    149             return "UnknownHost";
    150         }
    151     }
    152 }
    NetUtil.java

    SetMDC.java

     1 package com.tianjh.demo.util;
     2 
     3 import org.jboss.logging.MDC;
     4 import org.springframework.context.EnvironmentAware;
     5 import org.springframework.core.env.Environment;
     6 import org.springframework.stereotype.Component;
     7 
     8 @Component
     9 public class SetMDC implements EnvironmentAware {
    10 
    11     private static Environment environment;
    12     
    13     @Override
    14     public void setEnvironment(Environment environment) {
    15         SetMDC.environment = environment;
    16     }
    17     
    18     public static void putMDC() {
    19         MDC.put("hostName", NetUtil.getLocalHostName());
    20         MDC.put("ip", NetUtil.getLocalIp());
    21         MDC.put("applicationName", environment.getProperty("spring.application.name"));
    22     }
    23 
    24 }
    SetMDC.java

    及关键配置文件

    pom.xml

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     4     <modelVersion>4.0.0</modelVersion>
     5     <parent>
     6         <groupId>org.springframework.boot</groupId>
     7         <artifactId>spring-boot-starter-parent</artifactId>
     8         <version>2.1.5.RELEASE</version>
     9         <relativePath/> <!-- lookup parent from repository -->
    10     </parent>
    11     <groupId>com.tianjh</groupId>
    12     <artifactId>demo</artifactId>
    13     <version>0.0.1-SNAPSHOT</version>
    14     <name>demo</name>
    15     <description>Demo project for Spring Boot</description>
    16 
    17     <properties>
    18         <java.version>1.8</java.version>
    19     </properties>
    20 
    21 
    22     <dependencies>
    23         <dependency>
    24             <groupId>org.springframework.boot</groupId>
    25             <artifactId>spring-boot-starter-web</artifactId>
    26             <!--     排除spring-boot-starter-logging -->
    27             <exclusions>
    28                 <exclusion>
    29                     <groupId>org.springframework.boot</groupId>
    30                     <artifactId>spring-boot-starter-logging</artifactId>
    31                 </exclusion>
    32             </exclusions>
    33         </dependency>
    34 
    35         <dependency>
    36             <groupId>org.springframework.boot</groupId>
    37             <artifactId>spring-boot-starter-test</artifactId>
    38             <scope>test</scope>
    39         </dependency>
    40         <dependency>
    41             <groupId>org.projectlombok</groupId>
    42             <artifactId>lombok</artifactId>
    43         </dependency>
    44         <!-- log4j2 -->
    45         <dependency>
    46             <groupId>org.springframework.boot</groupId>
    47             <artifactId>spring-boot-starter-log4j2</artifactId>
    48         </dependency>
    49         <dependency>
    50             <groupId>com.lmax</groupId>
    51             <artifactId>disruptor</artifactId>
    52             <version>3.3.4</version>
    53         </dependency>
    54 
    55         <dependency>
    56             <groupId>com.alibaba</groupId>
    57             <artifactId>fastjson</artifactId>
    58             <version>1.2.58</version>
    59         </dependency>
    60 
    61     </dependencies>
    62 
    63     <build>
    64         <finalName>demo</finalName>
    65         <!-- 打包时包含properties、xml -->
    66         <resources>
    67             <resource>
    68                 <directory>src/main/java</directory>
    69                 <includes>
    70                     <include>**/*.properties</include>
    71                     <include>**/*.xml</include>
    72                 </includes>
    73                 <!-- 是否替换资源中的属性-->
    74                 <filtering>true</filtering>
    75             </resource>
    76             <resource>
    77                 <directory>src/main/resources</directory>
    78             </resource>
    79         </resources>
    80 
    81         <plugins>
    82             <plugin>
    83                 <groupId>org.springframework.boot</groupId>
    84                 <artifactId>spring-boot-maven-plugin</artifactId>
    85                 <configuration>
    86                     <mainClass>com.tianjh.demo.Application</mainClass>
    87                 </configuration>
    88             </plugin>
    89         </plugins>
    90     </build>
    91 
    92 </project>
    pom.xml

    Spring配置文件

    1 server.servlet.context-path=/
    2 server.port=8001
    3 
    4 spring.application.name=demo
    5 spring.http.encoding.charset=UTF-8
    6 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    7 spring.jackson.time-zone=GMT+8
    8 spring.jackson.default-property-inclusion=NON_NULL
    application.properties

    日志配置文件

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
     3     <Properties>
     4         <!--        日志要输出的文件-->
     5         <Property name="LOG_HOME">logs</Property>
     6         <!--        项目名称-->
     7         <property name="FILE_NAME">demo</property>
     8         <!--        日志输出格式-->
     9         <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger]
    10             [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    11         </property>
    12     </Properties>
    13     <Appenders>
    14         <Console name="CONSOLE" target="SYSTEM_OUT">
    15             <PatternLayout pattern="${patternLayout}"/>
    16         </Console>
    17         <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/all-${FILE_NAME}.log"
    18                                  filePattern="${LOG_HOME}/all-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
    19             <PatternLayout pattern="${patternLayout}"/>
    20             <Policies>
    21                 <TimeBasedTriggeringPolicy interval="1"/>
    22                 <SizeBasedTriggeringPolicy size="500MB"/>
    23             </Policies>
    24             <DefaultRolloverStrategy max="20"/>
    25         </RollingRandomAccessFile>
    26         <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/err-${FILE_NAME}.log"
    27                                  filePattern="${LOG_HOME}/err-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
    28             <PatternLayout pattern="${patternLayout}"/>
    29             <Filters>
    30                 <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
    31             </Filters>
    32             <Policies>
    33                 <TimeBasedTriggeringPolicy interval="1"/>
    34                 <SizeBasedTriggeringPolicy size="500MB"/>
    35             </Policies>
    36             <DefaultRolloverStrategy max="20"/>
    37         </RollingRandomAccessFile>
    38     </Appenders>
    39     <Loggers>
    40         <!-- 业务相关 异步logger -->
    41         <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
    42             <AppenderRef ref="appAppender"/>
    43         </AsyncLogger>
    44         <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true">
    45             <AppenderRef ref="errorAppender"/>
    46         </AsyncLogger>
    47         <Root level="info">
    48             <Appender-Ref ref="CONSOLE"/>
    49             <Appender-Ref ref="appAppender"/>
    50             <AppenderRef ref="errorAppender"/>
    51         </Root>
    52     </Loggers>
    53 </Configuration>
    log4j2.xml

     打包上传到192.168.232.6这台服务器进行运行

    运行之后调用该项目的index方法

    在项目指定的文件夹里生成了咋们所要的日志文件

    参考前面的链接安装好所有环境后,filebeat、kafka、Logstash、es都应该配置好了

    接下来就结合filebeat(生产者-Producer)、kafka(broker)、Logstash(消费者-Consumer)

    实现fielbeat从demo.jar项目输出的日志文件logs下读取all-demo.log 、err-demo.log两个日志文件,然后

    把相应日志数据发送到kafka中,再由Logstash到Kafka中获取数据进行消费。

    在这个过程中需要在kafka中新增两个topic 

     ./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic all-log-demo

    --partitions 1 --replication-factor 1 

     ./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic err-log-demo

    --partitions 1 --replication-factor 1 

    也就是采集到的all-demo.log日志数据放入topic:all-log-demo 中,而采集到的err-demo.log日

    志数据放入topic:err-log-demo 中。

    五、测试

    启动demo.jar、filebeat、kafka、logstash

    启动demo.jar之前先删除掉之前测试的日志文件all-demo.log 、err-demo.log

    启动demo.jar

    启动kafka

     

     启动Logstash

     

    随后通过浏览器访问:

    http://192.168.232.6:8001/err

    http://192.168.232.6:8001/index 两个地址来调用index、err方法打印日志文件。

    demo.jar 项目控制台输出了日志---调用index方法

    demo.jar 项目控制台输出了日志---调用err方法

    在调用上述两个方法之后,filebeat会将日志数据发送到kafka

    通过使用如下的命令进行查看消费情况:

     ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe

    --group all-log-group 

     ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe

    --group err-log-group 

    在kafka上也有相应的日志消费记录情况,( all-log-group、err-log-group )是在Logstash中进

    行配置的。

     

    通过前面的几个流程之后,现在日志数据到达了kafka-broker之上,现在就需要用logstash来进

    行消费数据,logstash上也实时进行数据消费,如下图所示是全量日志过滤:

    all-log-demo.log 日志

     

    err-log-demo.log 日志

    通过上述的一系列操作,简单的实现了日志数据的生成、采集、过滤。这其中最为重要也最核心

    的地方就是kafka,利用kafka的高性能来缓存filebeat生成的海量数据,从而让logstash慢慢的进

    行消费,当然上面的例子并不能体现出kafka处理海量数据的能力。

  • 相关阅读:
    开采镍矿与冶炼加工镍的上市公司一览(转载)
    2010年螺纹期货基本走势分析(个人分析原创文章)
    从松柏那转载的ajax类
    推荐30个可以养站的博客(转载)
    年报掘金:机构增仓路线图曝光(20100306转载)
    一个人过习惯了,两个人不知道怎么过。。。
    各大搜索引擎的网站登录入口(转载)
    志向
    主力做庄骗术
    网上发现襄阳市场,说是正品,初步推断举步维艰
  • 原文地址:https://www.cnblogs.com/jhtian/p/13744827.html
Copyright © 2011-2022 走看看