一、ELK搭建
1.ES搭建
2.logstash搭建
1)安装java环境
2)安装logstash
3)配置环境变量
4)logstash的插件
INPUT:插件使Logstash能够读取特定的事件源。
OUTPUT:插件将事件数据发送到特定的目的地,OUTPUT是事件流水线中的最后阶段。
INPUT支持事件源 |
OUTPUT支持输出源 |
CODEC编解码器支持编码 |
|
|
|
azure_event_hubs(微软云事件中心) |
elasticsearch(搜索引擎数据库) |
avro(数据序列化) |
beats(filebeat日志收集工具) |
email(邮件) |
CEF(嵌入式框架) |
elasticsearch(搜索引擎数据库) |
file(文件) |
es_bulk(ES中的bulk api) |
file(文件) |
http(超文本传输协议) |
Json(数据序列化、格式化) |
generator(生成器) |
kafka(基于java的消息队列) |
Json_lines(便于存储结构化) |
heartbeat(高可用软件) |
rabbitmq(消息队列 OpenStack) |
line(行) |
http_poller(http api) |
redis(缓存、消息队列、NoSQL) |
multiline(多行匹配) |
jdbc(java连接数据库的驱动) |
s3*(存储) |
plain(纯文本,事件间无间隔) |
kafka(基于java的消息队列) |
stdout(标准输出) |
rubydebug(ruby语法格式) |
rabbitmq(消息队列 OpenStack) |
tcp(传输控制协议) |
|
redis(缓存、消息队列、NoSQL) |
udp(用户数据报协议) |
|
s3*(存储) |
|
|
stdin(标准输入) |
|
|
syslog(系统日志) |
|
|
tcp(传输控制协议) |
|
|
udp(用户数据报协议) |
|
|
5)logstash练习
1.logstash收集标准输入到标准输出
[root@web01 ~]# logstash -e 'input { stdin {} } output { stdout {} }'
2.logstash收集标准输入指定格式到标准输出
[root@web01 ~]# logstash -e 'input { stdin {} } output { stdout { codec => "rubydebug" } }'
3.logstash收集标准输入到文件
[root@web01 ~]# logstash -e 'input { stdin {} } output { file { path => "/tmp/1.txt" } }'
4.logstash收集标准输入到ES
[root@web01 ~]# logstash -e 'input { stdin {} } output { elasticsearch { hosts => ["10.0.0.71:9200"] index => "test_%{+YYYY-MM-dd}" } }'
3.kibana搭建
1)安装java环境
2)安装kibana
3)配置kibana
4)启动
5)kibana区域定义
1.时间区域:
1)快速查询
2)时间范围查询
3)精确时间范围查询
4)使用过的时间区域
2.搜索区域:
可以根据关键字,状态码等搜索需要的数据
3.数据展示区域:
将收集的对应索引的日志展示在页面上
4.日志列表区域:
可以选择需要的字段进行数据的展示,不需要的可以移除
二、logstash使用学习
不难理解,我们的日志通常都是在日志文件中存储的,所以,当我们在使用INPUT插件时,收集日志,需要使用file模块,从文件中读取日志的内容,那么接下来讲解的是,将日志内容输出到另一个文件中,如此一来,我们可以将日志文件同意目录,方便查找。
注意:Logstash与其他服务不同,收集日志的配置文件需要我们根据实际情况自己去写。
前提:需要Logstash对被收集的日志文件有读的,并且对要写入的文件,有写入的权限。
1.logstash配置文件
[root@web01 ~]# vim /etc/logstash/logstash.yml
path.config: /etc/logstash/conf.d
2.logstash收集单个日志到文件
1)配置
[root@web01 ~]# cd /etc/logstash/conf.d/
[root@web01 /etc/logstash/conf.d]# vim message_file.conf
input {
file {
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
file {
path => "/tmp/messages_%{+YYYY-MM-dd}"
}
}
[root@web01 /etc/logstash/conf.d]# vim message_file.conf
#输入插件
input {
#文件模块
file {
#日志类型
type => "message-log"
#日志路径
path => "/var/log/messages"
#第一次收集日志从头开始
start_position => "beginning"
}
}
#输出插件
output {
#文件模块
file {
#输出路径
path => "/tmp/message_%{+yyyy.MM.dd}.log"
}
}
2)启动
#检测语法
[root@web01 ~]# logstash -f /etc/logstash/conf.d/message_file.conf -t
#启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/message_file.conf &
3)测试日志收集
#实时监控收集到的日志
[root@web01 ~]# tail -f /tmp/messages_2020-12-04
#手动添加一台日志
[root@web01 ~]# echo 111 >> /var/log/messages
3.logstash收集单个日志到ES
1)配置
[root@web01 ~]# vim /etc/logstash/conf.d/message_es.conf
input {
file {
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "message_%{+YYYY-MM-dd}"
}
}
2)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/message_es.conf &
[2] 82713
4.logstash启动多实例
logstash收集日志时使用多实例方式启动,不是使用system管理启动,但是启动多实例会报错,怎么处理?
1)配置收集/var/log/secure日志
[root@web01 ~]# vim /etc/logstash/conf.d/secure_es.conf
input {
file {
path => "/var/log/secure"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "secure_%{+YYYY-MM-dd}"
}
}
2)启动多实例
logstash只启动一个不需要数据目录,如果想要启动多个进程,需要每个进程指定不同的数据目录,需要加 --path.data参数,然后可以启动多实例
1.创建数据目录
[root@web01 ~]# mkdir /data/logstash/messages_es -p
[root@web01 ~]# mkdir /data/logstash/secure_es -p
[root@web01 ~]# chown -R logstash.logstash /data/logstash/
2.分别指定数据目录再启动两个进程
[root@web01 ~]# logstash -f /etc/logstash/conf.d/message_es.conf --path.data=/data/logstash/messages_es &
[root@web01 ~]# logstash -f /etc/logstash/conf.d/secure_es.conf --path.data=/data/logstash/secure_es &
5.logstash收集多个日志到文件
1)配置
[root@web01 ~]# vim /etc/logstash/conf.d/morefile_file.conf
#输入的插件
input {
#文件模块
file {
#收集文件的路径
path => "/var/log/messages"
#第一次收集从头收集
start_position => "beginning"
#收集日志间隔时间3秒
stat_interval => "3"
}
#第二个文件模块
file {
#第二个收集日志的路径
path => "/var/log/secure"
}
}
#输出插件
output {
#输出时的文件模块
file {
#输出的文件路径
path => "/tmp/morefile.txt"
}
}
2)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/morefile_file.conf &
3)验证
[root@web01 ~]# tail -f /tmp/morefile.txt
#手动添加文件
[root@web01 ~]# echo 111 >> /var/log/messages
[root@web01 ~]# echo 2222 >> /var/log/secure
6.logstash收集多个日志到ES
1)配置
[root@web01 ~]# vim /etc/logstash/conf.d/morefile_es.conf
input {
file {
path => "/var/log/messages"
}
file {
path => "/var/log/secure"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "/tmp/secure_%{+YYYY-MM-dd}"
}
}
2)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/morefile_es.conf &
3)验证
7.收集多个日志到多个索引
1)方法一:
#配置
[root@web01 ~]# cat /etc/logstash/conf.d/morefile_es.conf
input {
file {
type => "messages_log"
path => "/var/log/messages"
}
file {
type => "secure_log"
path => "/var/log/secure"
}
}
output {
if [type] == "messages_log" {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "messages_log_%{+YYYY-MM-dd}"
}
}
if [type] == "secure_log" {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "secure_log_%{+YYYY-MM-dd}"
}
}
}
#启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/morefile_es.conf
2)方法二:
#配置
[root@web01 ~]# cat /etc/logstash/conf.d/second_morefile_es.conf
input {
file {
type => "messages_log"
path => "/var/log/messages"
}
file {
type => "secure_log"
path => "/var/log/secure"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "%{type}_%{+YYYY-MM-dd}"
}
}
#启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/second_morefile_es.conf
三、使用logstash收集nginx日志
1.修改nginx日志格式为json格式
[root@web01 ~]# cat /etc/nginx/nginx.conf
... ...
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"url":"$uri",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';
#access_log /var/log/nginx/access.log main;
access_log /var/log/nginx/access.log json;
sendfile on;
client_max_body_size 100M;
keepalive_timeout 65;
include /etc/nginx/conf.d/*.conf;
}
2.重启nginx访问查看日志
[root@web01 ~]# systemctl restart nginx
[root@web01 ~]# tail -f /var/log/nginx/access.log
{"@timestamp":"2020-12-04T17:39:22+08:00","host":"10.0.0.7","clientip":"10.0.0.1","size":0,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"10.0.0.7","url":"/index.html","referer":"-","agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36","status":"304"}
3.配置logstash收集nginx日志
[root@web01 ~]# vim /etc/logstash/conf.d/nginx_log_es.conf
input {
file {
path => "/var/log/nginx/access.log"
start_position => "end"
type => "access_log"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "nginx_access_log_%{+YYYY-MM-dd}"
}
}
4.启动并测试
[root@web01 ~]# logstash -f /etc/logstash/conf.d/nginx_log_es.conf
四、logstash收集tomcat日志
在企业中,我们看到tomcat日志遇到异常(exception)一条日志可能是几行或者十几行甚至几十行,组成的,那么,我们需要将多行日志变成一行日志,来收集
1.tomcat日志收集方式
这里我们有几种方式可以实现:
1.将日志改成Json格式
在企业中,想要将java日志改成json格式,并没有那么容易。
因为将日志改成Json格式,查看起来会很难受,有些开发人员不希望将日志格式改成Json的,所以,在改日志格式之前需要跟开发人员进行沟通,那么将tomcat日志格式改成Json格式也有两种方式。
1)开发自己更改,通过程序代码,或者log4j
2)运维修改tomcat的server配置文件
2.通过logstash的mutiline模块实现多行匹配
2.安装tomcat
1)安装java环境
2)安装tomcat
1.上传代码包
[root@web01 ~]# rz
[root@web01 ~]# ll
-rw-r--r-- 1 root root 11026056 2020-12-04 18:04 apache-tomcat-9.0.30.tar.gz
2.解压tomcat包
[root@web01 ~]# tar xf apache-tomcat-9.0.30.tar.gz
3.将安装包移动并改名
[root@web01 ~]# mv apache-tomcat-9.0.30 /usr/local/tomcat-9.0.30
4.做软连接
[root@web01 ~]# ln -s /usr/local/tomcat-9.0.30 /usr/local/tomcat
3)配置站点
1.写一个测试页面到站点目录下的index.html文件中
[root@web01 ~]# echo 'TEST elk' > /usr/local/tomcat/webapps/ROOT/index.html
2.启动tomcat
[root@web01 ~]# /usr/local/tomcat/bin/startup.sh
3.检测tomcat端口是否启动
[root@web01 ~]# netstat -lntup|grep 8080
tcp 0 0 :::8080 :::* LISTEN 12569/java
4)访问测试
http://10.0.0.7:8080/
3.配置logstash收集tomcat日志
1)配置
[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_log_es.conf
input {
file {
path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
start_position => "end"
type => "tomcat_log"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "tomcat_log_%{+YYYY-MM-dd}"
}
}
2)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/tomcat_log_es.conf
4.收集tomcat的json格式日志方式一:
1)修改tomcat日志格式
[root@web01 ~]# vim /usr/local/tomcat/conf/server.xml
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="tomcat_access_json" suffix=".log"
pattern="{"clientip":"%h","ClientUser":"%l","authenticated":"%u","AccessTime":"%t","method":"%r","status":"%s","SendBytes":"%b","Query?string":"%q","partner":"%{Referer}i","AgentVersion":"%{User-Agent}i"}"/>
<!-- Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="%h %l %u %t "%r" %s %b" / -->
2)重启Tomcat
[root@web01 ~]# /usr/local/tomcat/bin/shutdown.sh
[root@web01 ~]# /usr/local/tomcat/bin/startup.sh
3)查看新的日志
[root@web01 ~]# tail -f /usr/local/tomcat/logs/tomcat_access_json.2020-12-07.log
{"clientip":"10.0.0.1","ClientUser":"-","authenticated":"-","AccessTime":"[07/Dec/2020:22:51:25 +0800]","method":"GET / HTTP/1.1","status":"200","SendBytes":"9","Query?string":"","partner":"-","AgentVersion":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
4)配置logstash收集新的日志
[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_log_es.conf
input {
file {
path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
start_position => "end"
type => "tomcat_log"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "tomcat_json_log_%{+YYYY-MM-dd}"
}
}
5)启动服务
[root@web01 ~]# logstash -f /etc/logstash/conf.d/tomcat_log_es.conf
5.方式二:使用multiline插件收集java日志
使用codec的multiline插件实现多行匹配,这是一个可以将多行进行合并的插件,而且可以使用what指定将匹配到的行与前面的行合并还是和后面的行合并
帮助文档:https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html
因为目前tomcat日志中没有exception,所以,我们把Logstash部署在ES上,收集一下ES的java日志。
1)测试多行匹配
[root@web01 ~]# vim /etc/logstash/conf.d/stdin_stdout.conf
input {
stdin {
codec => multiline {
pattern => "^["
negate => "true"
what => "previous"
}
}
}
output {
stdout {}
}
2)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/stdin_stdout.conf
3)测试
#测试输入一堆内容,并没有打印,只有当输入一个以 [ 开头的时候才会结束并输出
jhvc
jkhv
jhv
jc
[
{
"message" => "jhvc
jkhv
jhv
jc",
"@timestamp" => 2020-12-07T15:15:49.182Z,
"@version" => "1",
"tags" => [
[0] "multiline"
],
"host" => "web01"
}
4)收集java日志写入ES
[root@web01 ~]# cat /etc/logstash/conf.d/java_es.conf
input {
file {
path => "/usr/local/tomcat/logs/tomcat_access_json.2020-12-07.log"
start_position => "end"
codec => multiline {
pattern => "^["
negate => "true"
what => "previous"
}
}
}
output {
elasticsearch {
hosts => ['10.0.0.71:9200']
index => "tomcat_json_log_%{+YYYY-MM-DD}"
}
}
5)启动
[root@web01 ~]# logstash -f /etc/logstash/conf.d/java_es.conf &
6)测试收集日志
[root@web01 ~]# cat tomcat.log >> /usr/local/tomcat/logs/tomcat_access_json.2020-12-07.log
7)去页面查看
五、kibana页面作图(简单)
查看日志,就算改成json格式,kibana收集到之后展示时还是一坨,我们想要用数据作图,必须将其修改为json格式,让索引支持使用日志中的key
1.修改logstash获取Tomcat日志作图
[root@web01 ~]# cat /etc/logstash/conf.d/java_es.conf
input {
file {
path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
start_position => "end"
}
}
#将收集的日志处理
filter {
#转换json格式
json {
#转换的字段
source => "message"
#转换后移除
remove_field => [ "message" ]
}
}
output {
elasticsearch {
hosts => ['10.0.0.71:9200']
index => "tomcat_json_log_%{+YYYY-MM-dd}"
}
}
2.方法二:
[root@web01 ~]# vim /etc/logstash/conf.d/java_es.conf
input {
file {
path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
start_position => "end"
codec => "json"
}
}
output {
elasticsearch {
hosts => ['10.0.0.71:9200']
index => "tomcat_json_log_%{+YYYY-MM-dd}"
}
}
3.尝试作图
1.都能收集的数据字段有感叹号时,刷新索引即可,删除重建也行
2.转换为json格式的日志后,将转换的字段删除
六、Logstash与Redis那点事
在企业中,日志规模的量级远远超出我们的想象,这就是为什么会有一家公司 日志易 专门做日志收集,给大型金融公司收集日志,比如银行,因为你有可能看到,1秒钟好几千万的日志量,往服务器写入,那么企业中的集群,架构都不是单台的,而是多台的,一台如果是1千万,那么5台的量级,10台的量级,我们要对他们进行收集,进行分析,难免会在网络传输过程中,丢数据。
日志是什么?
日志对于企业来说,有什么作用?
用户使用我们的产品,体验如何?
用户的客诉,我们能拿出什么样的数据来说话?
...
一系列的问题,都和日志相关,如果至关重要的那个数据丢失了,那么公司的损失可不仅仅是一条日志那么简单。如果我们不知道,用户对我们产品最感兴趣的地方在哪,那么产品的寿命也就越来越短。如果被攻击了,恶意攻击的IP源我们都找不到,那么或许就不是产品的寿命越来越短,而是这个企业存在的寿命,越来越短。
好吧,一顿排比句,说的那么浮夸,说白了,我就是想要告诉你们,一个大规模日志量级的企业想要做到数据的安全性,数据的一致性,我们需要消息队列:Redis , Kafka,在ELK5版本中,建议使用Redis来做消息队列,Kafka能不能用?也能,只不过会有一些不必要的坑,需要我们去爬。在ELK6版本中,开始使用Kafka来做消息队列。
话不多说,我们接下来就开始将Logstash收集到的日志,输出到Redis中。
0.环境准备
主机 |
IP |
服务 |
web01 |
10.0.0.7 |
nginx,tomcat,logstash |
redis01 |
10.0.0.81 |
redis |
redis02 |
10.0.0.82 |
logstash |
es01 |
10.0.0.91 |
ES,kibana |
1.收集日志到redis
1)安装redis
[root@redis01 ~]# yum install -y redis
2)配置redis
[root@redis01 ~]# vim /etc/redis.conf
bind 172.16.1.81 127.0.0.1
3)启动redis
[root@redis01 ~]# systemctl start redis
4)配置logstash收集日志写入redis
[root@web01 ~]# vim /etc/logstash/conf.d/file_redis.conf
input {
file {
path => "/var/log/nginx/access.log"
start_position => "end"
codec => "json"
}
}
output {
redis {
host => "172.16.1.81"
port => "6379"
key => "nginx_log"
data_type => "list"
}
}
2.将redis数据取出写入ES
1)安装Java环境
[root@redis02 ~]# yum localinstall -y jdk-8u181-linux-x64.rpm
2)安装logstash
[root@redis02 ~]# yum localinstall -y logstash-6.6.0.rpm
3)配置logstash取出redis数据写入ES
[root@redis02 ~]# vim /etc/logstash/conf.d/redis_es.conf
input {
redis {
host => "172.16.1.81"
port => "6379"
data_type => "list"
key => "nginx_log"
}
}
output {
elasticsearch {
hosts => ["10.0.0.71:9200"]
index => "nginx_redis_es_%{+YYYY-MM-dd}"
}
}
4)启动
[root@redis02 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis_es.conf