ELK+Kafka+Beats实现海量日志收集平台(二)
三、环境搭建
通过上一小节应用场景和实现原理的介绍,接下来实现所需环境搭建及说明
架构图如下所示:
环境说明:
192.168.232.6 : 部署了demo项目(用于产生数据日志)
filebeat-6.6.0
192.168.232.3 : Kafka (单体)(Zookeeper:192.168.232.3~5)
192.168.232.4 : Kibana
192.168.232.7 : Logstash (单体)
ES集群:192.168.232.8~10
1、filebeat安装配置参考https://www.cnblogs.com/jhtian/p/13731230.html
2、Kafka安装配置参考https://www.cnblogs.com/jhtian/p/13708679.html
3、Logstash安装配置参考https://www.cnblogs.com/jhtian/p/13744753.html
4、ES集群搭建可参考https://www.cnblogs.com/jhtian/p/12703651.html
5、Kibana安装可参考https://www.cnblogs.com/jhtian/p/13785029.html
四、部署demo工程项目:
项目结构图如下,分别调用项目的 /index、/error两个方法分别打印正常、错误日志
(warn及以上级别日志)到logs文件夹中,作为filebeat读取数据的来源。
web访问类文件:indexAction.java
1 package com.tianjh.demo.web; 2 3 import com.tianjh.demo.util.SetMDC; 4 import lombok.extern.slf4j.Slf4j; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 @Slf4j 9 @RestController 10 public class indexAction { 11 12 @RequestMapping(value = "/index") 13 public String index() { 14 SetMDC.putMDC(); 15 log.info("这是一条模拟error日志打印"); 16 log.info("这是一条模拟warn日志打印"); 17 log.info("这是一条模拟info日志打印"); 18 return "hello word"; 19 } 20 21 @RequestMapping(value = "/err") 22 public String error() { 23 SetMDC.putMDC(); 24 try { 25 int a = 5/0; 26 } catch (Exception e) { 27 log.error("算术异常", e); 28 } 29 return "error"; 30 } 31 }
工具类Utils
FastJsonConvertUtil.java
1 package com.tianjh.demo.util; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 6 import com.alibaba.fastjson.JSON; 7 import com.alibaba.fastjson.JSONObject; 8 import com.alibaba.fastjson.serializer.SerializerFeature; 9 10 import lombok.extern.slf4j.Slf4j; 11 12 /** 13 * $FastJsonConvertUtil 14 * @author hezhuo.bai 15 * @since 2019年1月15日 下午4:53:28 16 */ 17 @Slf4j 18 public class FastJsonConvertUtil { 19 20 private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse, 21 SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty }; 22 23 /** 24 * <B>方法名称:</B>将JSON字符串转换为实体对象<BR> 25 * <B>概要说明:</B>将JSON字符串转换为实体对象<BR> 26 * @author hezhuo.bai 27 * @since 2019年1月15日 下午4:53:49 28 * @param data JSON字符串 29 * @param clzss 转换对象 30 * @return T 31 */ 32 public static <T> T convertJSONToObject(String data, Class<T> clzss) { 33 try { 34 T t = JSON.parseObject(data, clzss); 35 return t; 36 } catch (Exception e) { 37 log.error("convertJSONToObject Exception", e); 38 return null; 39 } 40 } 41 42 /** 43 * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR> 44 * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR> 45 * @author hezhuo.bai 46 * @since 2019年1月15日 下午4:54:32 47 * @param data JSONObject对象 48 * @param clzss 转换对象 49 * @return T 50 */ 51 public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) { 52 try { 53 T t = JSONObject.toJavaObject(data, clzss); 54 return t; 55 } catch (Exception e) { 56 log.error("convertJSONToObject Exception", e); 57 return null; 58 } 59 } 60 61 /** 62 * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR> 63 * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR> 64 * @author hezhuo.bai 65 * @since 2019年1月15日 下午4:54:50 66 * @param data JSON字符串数组 67 * @param clzss 转换对象 68 * @return List<T>集合对象 69 */ 70 public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) { 71 try { 72 List<T> t = JSON.parseArray(data, clzss); 73 return t; 74 } catch (Exception e) { 75 log.error("convertJSONToArray Exception", e); 76 return null; 77 } 78 } 79 80 /** 81 * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR> 82 * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR> 83 * @author hezhuo.bai 84 * @since 2019年1月15日 下午4:55:11 85 * @param data List<JSONObject> 86 * @param clzss 转换对象 87 * @return List<T>集合对象 88 */ 89 public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) { 90 try { 91 List<T> t = new ArrayList<T>(); 92 for (JSONObject jsonObject : data) { 93 t.add(convertJSONToObject(jsonObject, clzss)); 94 } 95 return t; 96 } catch (Exception e) { 97 log.error("convertJSONToArray Exception", e); 98 return null; 99 } 100 } 101 102 /** 103 * <B>方法名称:</B>将对象转为JSON字符串<BR> 104 * <B>概要说明:</B>将对象转为JSON字符串<BR> 105 * @author hezhuo.bai 106 * @since 2019年1月15日 下午4:55:41 107 * @param obj 任意对象 108 * @return JSON字符串 109 */ 110 public static String convertObjectToJSON(Object obj) { 111 try { 112 String text = JSON.toJSONString(obj); 113 return text; 114 } catch (Exception e) { 115 log.error("convertObjectToJSON Exception", e); 116 return null; 117 } 118 } 119 120 /** 121 * <B>方法名称:</B>将对象转为JSONObject对象<BR> 122 * <B>概要说明:</B>将对象转为JSONObject对象<BR> 123 * @author hezhuo.bai 124 * @since 2019年1月15日 下午4:55:55 125 * @param obj 任意对象 126 * @return JSONObject对象 127 */ 128 public static JSONObject convertObjectToJSONObject(Object obj){ 129 try { 130 JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj); 131 return jsonObject; 132 } catch (Exception e) { 133 log.error("convertObjectToJSONObject Exception", e); 134 return null; 135 } 136 } 137 138 public static String convertObjectToJSONWithNullValue(Object obj) { 139 try { 140 String text = JSON.toJSONString(obj, featuresWithNullValue); 141 return text; 142 } catch (Exception e) { 143 log.error("convertObjectToJSONWithNullValue Exception", e); 144 return null; 145 } 146 } 147 }
NetUtil.java
1 package com.tianjh.demo.util; 2 3 import java.lang.management.ManagementFactory; 4 import java.lang.management.RuntimeMXBean; 5 import java.net.InetAddress; 6 import java.net.NetworkInterface; 7 import java.net.SocketAddress; 8 import java.net.UnknownHostException; 9 import java.nio.channels.SocketChannel; 10 import java.util.Enumeration; 11 import java.util.regex.Matcher; 12 import java.util.regex.Pattern; 13 14 /** 15 * $NetUtil 16 * 获取本机地址 端口号的工具类 17 * * @author hezhuo.bai 18 * * @since 2019年1月15日 下午4:53:28 19 */ 20 public class NetUtil { 21 22 public static String normalizeAddress(String address){ 23 String[] blocks = address.split("[:]"); 24 if(blocks.length > 2){ 25 throw new IllegalArgumentException(address + " is invalid"); 26 } 27 String host = blocks[0]; 28 int port = 80; 29 if(blocks.length > 1){ 30 port = Integer.valueOf(blocks[1]); 31 } else { 32 address += ":"+port; //use default 80 33 } 34 String serverAddr = String.format("%s:%d", host, port); 35 return serverAddr; 36 } 37 38 public static String getLocalAddress(String address){ 39 String[] blocks = address.split("[:]"); 40 if(blocks.length != 2){ 41 throw new IllegalArgumentException(address + " is invalid address"); 42 } 43 String host = blocks[0]; 44 int port = Integer.valueOf(blocks[1]); 45 46 if("0.0.0.0".equals(host)){ 47 return String.format("%s:%d",NetUtil.getLocalIp(), port); 48 } 49 return address; 50 } 51 52 private static int matchedIndex(String ip, String[] prefix){ 53 for(int i=0; i<prefix.length; i++){ 54 String p = prefix[i]; 55 if("*".equals(p)){ //*, assumed to be IP 56 if(ip.startsWith("127.") || 57 ip.startsWith("10.") || 58 ip.startsWith("172.") || 59 ip.startsWith("192.")){ 60 continue; 61 } 62 return i; 63 } else { 64 if(ip.startsWith(p)){ 65 return i; 66 } 67 } 68 } 69 70 return -1; 71 } 72 73 public static String getLocalIp(String ipPreference) { 74 if(ipPreference == null){ 75 ipPreference = "*>10>172>192>127"; 76 } 77 String[] prefix = ipPreference.split("[> ]+"); 78 try { 79 Pattern pattern = Pattern.compile("[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+"); 80 Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); 81 String matchedIp = null; 82 int matchedIdx = -1; 83 while (interfaces.hasMoreElements()) { 84 NetworkInterface ni = interfaces.nextElement(); 85 Enumeration<InetAddress> en = ni.getInetAddresses(); 86 while (en.hasMoreElements()) { 87 InetAddress addr = en.nextElement(); 88 String ip = addr.getHostAddress(); 89 Matcher matcher = pattern.matcher(ip); 90 if (matcher.matches()) { 91 int idx = matchedIndex(ip, prefix); 92 if(idx == -1) continue; 93 if(matchedIdx == -1){ 94 matchedIdx = idx; 95 matchedIp = ip; 96 } else { 97 if(matchedIdx>idx){ 98 matchedIdx = idx; 99 matchedIp = ip; 100 } 101 } 102 } 103 } 104 } 105 if(matchedIp != null) return matchedIp; 106 return "127.0.0.1"; 107 } catch (Exception e) { 108 return "127.0.0.1"; 109 } 110 } 111 112 public static String getLocalIp() { 113 return getLocalIp("*>10>172>192>127"); 114 } 115 116 public static String remoteAddress(SocketChannel channel){ 117 SocketAddress addr = channel.socket().getRemoteSocketAddress(); 118 String res = String.format("%s", addr); 119 return res; 120 } 121 122 public static String localAddress(SocketChannel channel){ 123 SocketAddress addr = channel.socket().getLocalSocketAddress(); 124 String res = String.format("%s", addr); 125 return addr==null? res: res.substring(1); 126 } 127 128 public static String getPid(){ 129 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 130 String name = runtime.getName(); 131 int index = name.indexOf("@"); 132 if (index != -1) { 133 return name.substring(0, index); 134 } 135 return null; 136 } 137 138 public static String getLocalHostName() { 139 try { 140 return (InetAddress.getLocalHost()).getHostName(); 141 } catch (UnknownHostException uhe) { 142 String host = uhe.getMessage(); 143 if (host != null) { 144 int colon = host.indexOf(':'); 145 if (colon > 0) { 146 return host.substring(0, colon); 147 } 148 } 149 return "UnknownHost"; 150 } 151 } 152 }
SetMDC.java
1 package com.tianjh.demo.util; 2 3 import org.jboss.logging.MDC; 4 import org.springframework.context.EnvironmentAware; 5 import org.springframework.core.env.Environment; 6 import org.springframework.stereotype.Component; 7 8 @Component 9 public class SetMDC implements EnvironmentAware { 10 11 private static Environment environment; 12 13 @Override 14 public void setEnvironment(Environment environment) { 15 SetMDC.environment = environment; 16 } 17 18 public static void putMDC() { 19 MDC.put("hostName", NetUtil.getLocalHostName()); 20 MDC.put("ip", NetUtil.getLocalIp()); 21 MDC.put("applicationName", environment.getProperty("spring.application.name")); 22 } 23 24 }
及关键配置文件
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>demo</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>demo</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <java.version>1.8</java.version> 19 </properties> 20 21 22 <dependencies> 23 <dependency> 24 <groupId>org.springframework.boot</groupId> 25 <artifactId>spring-boot-starter-web</artifactId> 26 <!-- 排除spring-boot-starter-logging --> 27 <exclusions> 28 <exclusion> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-logging</artifactId> 31 </exclusion> 32 </exclusions> 33 </dependency> 34 35 <dependency> 36 <groupId>org.springframework.boot</groupId> 37 <artifactId>spring-boot-starter-test</artifactId> 38 <scope>test</scope> 39 </dependency> 40 <dependency> 41 <groupId>org.projectlombok</groupId> 42 <artifactId>lombok</artifactId> 43 </dependency> 44 <!-- log4j2 --> 45 <dependency> 46 <groupId>org.springframework.boot</groupId> 47 <artifactId>spring-boot-starter-log4j2</artifactId> 48 </dependency> 49 <dependency> 50 <groupId>com.lmax</groupId> 51 <artifactId>disruptor</artifactId> 52 <version>3.3.4</version> 53 </dependency> 54 55 <dependency> 56 <groupId>com.alibaba</groupId> 57 <artifactId>fastjson</artifactId> 58 <version>1.2.58</version> 59 </dependency> 60 61 </dependencies> 62 63 <build> 64 <finalName>demo</finalName> 65 <!-- 打包时包含properties、xml --> 66 <resources> 67 <resource> 68 <directory>src/main/java</directory> 69 <includes> 70 <include>**/*.properties</include> 71 <include>**/*.xml</include> 72 </includes> 73 <!-- 是否替换资源中的属性--> 74 <filtering>true</filtering> 75 </resource> 76 <resource> 77 <directory>src/main/resources</directory> 78 </resource> 79 </resources> 80 81 <plugins> 82 <plugin> 83 <groupId>org.springframework.boot</groupId> 84 <artifactId>spring-boot-maven-plugin</artifactId> 85 <configuration> 86 <mainClass>com.tianjh.demo.Application</mainClass> 87 </configuration> 88 </plugin> 89 </plugins> 90 </build> 91 92 </project>
Spring配置文件
1 server.servlet.context-path=/ 2 server.port=8001 3 4 spring.application.name=demo 5 spring.http.encoding.charset=UTF-8 6 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss 7 spring.jackson.time-zone=GMT+8 8 spring.jackson.default-property-inclusion=NON_NULL
日志配置文件
1 <?xml version="1.0" encoding="UTF-8"?> 2 <Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600"> 3 <Properties> 4 <!-- 日志要输出的文件--> 5 <Property name="LOG_HOME">logs</Property> 6 <!-- 项目名称--> 7 <property name="FILE_NAME">demo</property> 8 <!-- 日志输出格式--> 9 <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] 10 [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n 11 </property> 12 </Properties> 13 <Appenders> 14 <Console name="CONSOLE" target="SYSTEM_OUT"> 15 <PatternLayout pattern="${patternLayout}"/> 16 </Console> 17 <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/all-${FILE_NAME}.log" 18 filePattern="${LOG_HOME}/all-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"> 19 <PatternLayout pattern="${patternLayout}"/> 20 <Policies> 21 <TimeBasedTriggeringPolicy interval="1"/> 22 <SizeBasedTriggeringPolicy size="500MB"/> 23 </Policies> 24 <DefaultRolloverStrategy max="20"/> 25 </RollingRandomAccessFile> 26 <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/err-${FILE_NAME}.log" 27 filePattern="${LOG_HOME}/err-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"> 28 <PatternLayout pattern="${patternLayout}"/> 29 <Filters> 30 <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> 31 </Filters> 32 <Policies> 33 <TimeBasedTriggeringPolicy interval="1"/> 34 <SizeBasedTriggeringPolicy size="500MB"/> 35 </Policies> 36 <DefaultRolloverStrategy max="20"/> 37 </RollingRandomAccessFile> 38 </Appenders> 39 <Loggers> 40 <!-- 业务相关 异步logger --> 41 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true"> 42 <AppenderRef ref="appAppender"/> 43 </AsyncLogger> 44 <AsyncLogger name="com.tianjh.*" level="info" includeLocation="true"> 45 <AppenderRef ref="errorAppender"/> 46 </AsyncLogger> 47 <Root level="info"> 48 <Appender-Ref ref="CONSOLE"/> 49 <Appender-Ref ref="appAppender"/> 50 <AppenderRef ref="errorAppender"/> 51 </Root> 52 </Loggers> 53 </Configuration>
打包上传到192.168.232.6这台服务器进行运行
运行之后调用该项目的index方法
在项目指定的文件夹里生成了咋们所要的日志文件
参考前面的链接安装好所有环境后,filebeat、kafka、Logstash、es都应该配置好了
接下来就结合filebeat(生产者-Producer)、kafka(broker)、Logstash(消费者-Consumer)
实现fielbeat从demo.jar项目输出的日志文件logs下读取all-demo.log 、err-demo.log两个日志文件,然后
把相应日志数据发送到kafka中,再由Logstash到Kafka中获取数据进行消费。
在这个过程中需要在kafka中新增两个topic
./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic all-log-demo
--partitions 1 --replication-factor 1
./kafka-topics.sh --zookeeper 192.168.232.3:2181 --create --topic err-log-demo
--partitions 1 --replication-factor 1
也就是采集到的all-demo.log日志数据放入topic:all-log-demo 中,而采集到的err-demo.log日
志数据放入topic:err-log-demo 中。
五、测试
启动demo.jar、filebeat、kafka、logstash
启动demo.jar之前先删除掉之前测试的日志文件all-demo.log 、err-demo.log
启动demo.jar
启动kafka
启动Logstash
随后通过浏览器访问:
http://192.168.232.6:8001/err
http://192.168.232.6:8001/index 两个地址来调用index、err方法打印日志文件。
demo.jar 项目控制台输出了日志---调用index方法
demo.jar 项目控制台输出了日志---调用err方法
在调用上述两个方法之后,filebeat会将日志数据发送到kafka
通过使用如下的命令进行查看消费情况:
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe
--group all-log-group
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe
--group err-log-group
在kafka上也有相应的日志消费记录情况,( all-log-group、err-log-group )是在Logstash中进
行配置的。
通过前面的几个流程之后,现在日志数据到达了kafka-broker之上,现在就需要用logstash来进
行消费数据,logstash上也实时进行数据消费,如下图所示是全量日志过滤:
all-log-demo.log 日志
err-log-demo.log 日志
通过上述的一系列操作,简单的实现了日志数据的生成、采集、过滤。这其中最为重要也最核心
的地方就是kafka,利用kafka的高性能来缓存filebeat生成的海量数据,从而让logstash慢慢的进
行消费,当然上面的例子并不能体现出kafka处理海量数据的能力。