logstash配置
官方文档:https://www.elastic.co/guide/en/logstash/current/index.html
https://www.cnblogs.com/wudequn/p/12983887.html
在config/logstash.yml中
http.host: "192.168.31.102"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: "logstash_system"
xpack.monitoring.elasticsearch.password: "mPpO8ceIo14Z7k0ZQIHJ"
xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
在bin文件下面新建logstash.conf
logstash pipeline 包含两个必须的元素:input和output,和一个可选元素:filter。从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他)。
input{
输入插件
}
filter{
过滤器插件
}
outer{
输出插件
}
1、input
可以接收来自任何地方的源数据:
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
事件源可以是从stdin屏幕输入读取,可以从file指定的文件,也可以从es,filebeat,kafka,redis等读取
file{ path => ['/var/log/nginx/access.log'] #要输入的文件路径 type => 'nginx_access_log' start_position => "beginning" } # path 可以用/var/log/*.log,/var/log/**/*.log,如果是/var/log则是/var/log/*.log # type 通用选项. 用于激活过滤器 # start_position 选择logstash开始读取文件的位置,begining或者end。 还有一些常用的例如:discover_interval,exclude,sincedb_path,sincedb_write_interval等可以参考官网
syslog{ port =>"514" type => "syslog" } # port 指定监听端口(同时建立TCP/UDP的514端口的监听) #从syslogs读取需要实现配置rsyslog: # cat /etc/rsyslog.conf 加入一行 *.* @172.17.128.200:514 #指定日志输入到这个端口,然后logstash监听这个端口,如果有新日志输入则读取 # service rsyslog restart #重启日志服务
beats { port => 5044 #要监听的端口 } # 还有host等选项 # 从beat读取需要先配置beat端,从beat输出到logstash。 # vim /etc/filebeat/filebeat.yml .......... output.logstash: hosts: ["localhost:5044"]
kafka{ bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092" topics => ["access_log"] group_id => "logstash-file" codec => "json" } kafka{ bootstrap_servers=> "kafka01:9092,kafka02:9092,kafka03:9092" topics => ["weixin_log","user_log"] codec => "json" } # bootstrap_servers 用于建立群集初始连接的Kafka实例的URL列表。 # topics 要订阅的主题列表,kafka topics # group_id 消费者所属组的标识符,默认为logstash。kafka中一个主题的消息将通过相同的方式分发到Logstash的group_id # codec 通用选项,用于输入数据的编解码器。
2、filter
Filter数据中转层,主要进行格式处理,数据类型转换,数据过滤,字段添加,修改等,常用的过滤器:
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
3、output
Output是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用:
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
rabbitmq同步
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-rabbitmq.html
input { rabbitmq { host => "127.0.0.1" port=>5672 vhost=>"CRM" subscription_retry_interval_seconds => "5" codec => "plain" exchange => "log.msg.direct" queue => "CRM_LOG" durable => true key => "rk.log.direct" user => "admin" password => "admin" } } output { elasticsearch { hosts => ["localhost:9200"] #flush_size=> 10 #workers => 5 #document_id => "%{jobid}" #document_type => "BaseSysLog" index => "position-%{+YYYYMMdd}" user => "elastic" password => "caKBawShu0Pm7BsjSFbd" } stdout { codec => rubydebug } }
input说明:
host IP地址
port rabbitmq端口
queue 数据源队列名称
key 交换机key
exchange 交换机
durable 持续性
subscription_retry_interval_seconds 出现错误时5秒后重试
start_position 从队列开始处读写
vhost 虚拟机
use rabbitma用户名
password rabbitma用户密码
https://pro.leanote.com/p/5a98b1bc28389bfc454a6d10
logstash安装
官网:https://www.elastic.co/cn/downloads/logstash
bin目录
创建一个run.bat 内容如下
logstash -f logstash.conf 或者 logstash -f configlogstash.conf
md 运行 nssm install logstash
在弹出的界面设置 Path为run.bat,Details选项卡设置显示名,Dependencies选项卡设置依赖服务 elasticsearch-service-x64
最后点击install service 安装成功