zoukankan      html  css  js  c++  java
  • ELK简单安装测试

    1 介绍组件

    Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)。

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

    ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch 中的 Index 是一组具有相似特征的文档集合,类似于关系数据库模型中的数据库实例,Index 中可以指定 Type 区分不同的文档,类似于数据库实例中的关系表,Document 是存储的基本单位,都是 JSON 格式,类似于关系表中行级对象。我们处理后的 JSON 文档格式的日志都要在 Elasticsearch 中做索引,相应的 Logstash 有 Elasticsearch output 插件,对于用户是透明的。

    Hadoop 生态圈为大规模数据集的处理提供多种分析功能,但实时搜索一直是 Hadoop 的软肋。如今,Elasticsearch for Apache Hadoop(ES-Hadoop)弥补了这一缺陷,为用户整合了 Hadoop 的大数据分析能力以及 Elasticsearch 的实时搜索能力。

    Logstash 是一种功能强大的信息采集工具,类似于 Hadoop 生态圈里的 Flume。通常在其配置文件规定 Logstash 如何处理各种类型的事件流,一般包含 input、filter、output 三个部分。Logstash 为各个部分提供相应的插件,因而有 input、filter、output 三类插件完成各种处理和转换;另外 codec 类的插件可以放在 input 和 output 部分通过简单编码来简化处理过程。

    Kibana是ElasticSearch的用户界面。

    在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了。

    2 下载安装包

    下载 elasticsearch、logstash、kibana、filebeat 的压缩包,并将四个压缩包上传到 /data/elk 目录下


    官方文档:
    Filebeat:
    https://www.elastic.co/cn/products/beats/filebeat
    Logstash:
    https://www.elastic.co/cn/products/logstash
    Kibana:
    https://www.elastic.co/cn/products/kibana
    Elasticsearch:
    https://www.elastic.co/cn/products/elasticsearch
    elasticsearch中文社区:
    https://elasticsearch.cn/

    3 安装ELK

    3.1 安装jdk1.8

    3.2 安装ElasticSearch

    3.2.1 安装配置es

    说明:ElasticSearch的运行不能用root执行,自己用useradd命令新建一个用户如下所示:
    添加普通用户elk并设置密码
    useradd elk
    passwd elk 然后根据提示输入密码即可

    修改文件所有者
    chown -R elk:elk /data/elk

    解压
    tar -zxvf elasticsearch-6.7.0.tar.gz

    修改 es 的配置文件
    cd /data/elk/elasticsearch-6.7.0/config
    vim elasticsearch.yml
    修改方法参考如下:

    cluster.name: my-application
    node.name: node-1
    node.attr.rack: r1
    path.data: /data/elk/elasticsearch-6.7.0/data
    path.logs: /data/elk/elasticsearch-6.7.0/logs
    network.host: localhost
    http.port: 9200

    3.2.2 启动

    su elk
    ./bin/elasticsearch -d

    启动时间有点慢,耐心等待10-20s或者更长,查看9200,9300端口是否开启

    netstat -tnlp|grep 9[23]00

    tcp6       0      0 localhost:9200       :::*                    LISTEN      30155/java          
    tcp6       0      0 localhost:9300       :::*                    LISTEN      30155/java

    访问地址:http://localhost:9200


    停止服务:

    ps -ef |grep elasticsearch 
    kill PID

    3.3 安装filebeat

    3.3.1 安装配置

    filebeat:部署在具体的业务机器上,通过定时监控的方式获取增量的日志,并转发到logstash、elasticsearch、kafka等等。

    解压
    tar –zxvf filebeat-6.7.0-linux-x86_64.tar.gz
    cd filebeat-6.7.0-linux-x86_64
    修改kibana 的配置文件
    vim filebeat.yml

    ===================Filebeat prospectors ===========

    filebeat.prospectors:

    #
     Each - is a prospector. Most options can be set at the prospector level, so
    # you can use different prospectors for various configurations.
    # Below are the prospector specific configurations.

    - input_type: log

      #
     Paths that should be crawled and fetched. Glob based paths.
      paths:
        - /data/elk/logfile
    #- c:programdataelasticsearchlogs*

    #
    输出到logstash
    #-------------------- Logstash output --------------------------------
    output.logstash:
      # The Logstash hosts
      hosts: ["localhost:5044"]

    3.3.2 启动

    ./filebeat -e -c filebeat.yml -d "publish"

    3.4 安装logstash

    3.4.1 安装配置

    解压
    tar -zxvf logstash-6.7.0.tar.gz
    cd logstash-6.7.0
    配置
    在logstash文件夹的下bin目录创建配置文件logstash.conf ,内容如下:
    input:对应输入的配置,其中path是监控的文**件路劲, codec编码个格式
    output:elasticsearch : { hosts => "localhost:9200" }
    对应输出到elasticsearch,hosts是elasticsearch安装地址

    input {
        beats {
            host => "localhost"
            port => 5044 
            codec => json
        }
    }

    output {

        stdout {
            codec => rubydebug
        }

        elasticsearch {
            hosts => "localhost:9200"
            index="test_index"
        }
    }

    3.4.2 启动

    测试你的配置文件 是否正确( 解析配置文件并报告任何错误。)
    ./logstash -f logstash.conf --config.test_and_exit

    Sending Logstash logs to /data/elk/logstash-6.7.0/logs which is now configured via log4j2.properties
    [2019-03-28T17:55:34,114][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/data/elk/logstash-6.7.0/data/queue"}
    [2019-03-28T17:55:34,169][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/data/elk/logstash-6.7.0/data/dead_letter_queue"}
    [2019-03-28T17:55:34,856][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    Configuration OK
    [2019-03-28T17:55:43,327][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

    启动命令(启用自动配置重新加载,这样就不必每次修改配置文件时都停止并重新启动Logstash )
    ./logstash -f logstash.conf --config.reload.automatic

    停止服务
    ps aux | grep logstash
    kill -9 pid

    3.5 安装kibana

    3.5.1 安装配置

    解压
    tar –zxvf kibana-6.7.0-linux-x86_64.tar.gz
    cd kibana-6.7.0-linux-x86_64/config
    配置
    vim kibana.yml

    server.port: 5601
    server.host: "localhost"

    #
    elasticsearch.username: "elastic"
    #elasticsearch.password: "changeme"

    elasticsearch.hosts: ["http://localhost:9200"]
    kibana.index: ".kibana"

    3.5.2 启动

    ./bin/kibana

    访问地址:
    http://localhost:5601

    netstat -anltp|grep 5601查找对应端口
    kill -9 pid

    4 测试

    4.1 监测指定文件,Kibana展示

    4.1.1 Logstash监控文件

    我们通过echo往这文件/data/elk/test.log追加内容,来测试整个日志收集系统是否可行
    vim test2.conf

    input {
      file {
        type =>"syslog"
         path => ["/data/elk/test.log" ]
      }
      syslog {
        type =>"syslog"
        port =>"5544"
      }
    }
    output {
      stdout { codec=> rubydebug }
      elasticsearch {hosts => "http:// localhost:9200"}
    }

    启动
    ./logstash -f test2.conf --config.reload.automatic

    向监控文件中写入数据

    echo "{ "firstName": "1", "lastName":"McLaughlin", "email": "aaaa" }" >> test.log 

    Logstash打印接收到的数据:

    {
        "message" => "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
        "@timestamp" => 2019-03-29T02:39:35.229Z,
          "@version" => "1",
              "host" => "node223",
              "path" => "/data/elk/test.log"
    }

    4.1.2 es 查看接收数据

    查看es所有index
    curl -X GET 'http://localhost:9200/_cat/indices?v'

    health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    green  open   .kibana_task_manager ZYN7zZl3QLuB1d-EE_ysHA   1   0          2            0     12.5kb         12.5kb
    yellow open   logstash-2019.03.29  rfkkxW7rQDKnl40_Ekaf7g   5   1          2            0     10.8kb         10.8kb
    yellow open   test-index           hFBQF-jZSkqajPRYzegfwg   5   1          0            0      1.2kb          1.2kb
    green  open   .kibana_1            sEUUnVMxQ0amHbQGcOedOQ   1   0          4            1     19.8kb         19.8kb

    查看单个index数据
    curl 'localhost:9200/logstash-2019.03.29/_search?pretty=true'

    {
      "took" : 4,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 2,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "logstash-2019.03.29",
            "_type" : "doc",
            "_id" : "W85Rx2kBAxlZ_OvPIlZf",
            "_score" : 1.0,
            "_source" : {
              "message" : "{ firstName: 1, lastName:McLaughlin, email: aaaa }",
              "@timestamp" : "2019-03-29T02:39:35.229Z",
              "@version" : "1",
              "host" : "node223",
              "path" : "/data/elk/test.log"
            }
          }
        ]
      }
    }

    4.1.3 用ES连接到Kibana

    在你开始用Kibana之前,你需要告诉Kibana你想探索哪个Elasticsearch索引。第一次访问Kibana是,系统会提示你定义一个索引模式以匹配一个或多个索引的名字。

    1、访问Kibana UI。例如,localhost:56011 或者 http://YOURDOMAIN.com:5601
    2、指定一个索引模式来匹配一个或多个你的Elasticsearch索引。当你指定了你的索引模式以后,任何匹配到的索引都将被展示出来。
      (画外音:*匹配0个或多个字符; 指定索引默认是为了匹配索引,确切的说是匹配索引名字)
    3、点击“Next Step”以选择你想要用来执行基于时间比较的包含timestamp字段的索引。如果你的索引没有基于时间的数据,那么选择“I don’t want to use the Time Filter”选项。
    4、点击“Create index pattern”按钮来添加索引模式。第一个索引模式自动配置为默认的索引默认,以后当你有多个索引模式的时候,你就可以选择将哪一个设为默认。(提示:Management > Index Patterns)


    现在,Kibana已经连接到你的Elasticsearch数据。Kibana展示了一个只读的字段列表,这些字段是匹配到的这个索引配置的字段。

    4.1.4 Kibana展示

    http://localhost:5601

    4.2 监控Spring Boot服务日志

    要求日志格式为json格式

    4.2.1 json日志格式配置

    在Spring Boot工程resources目录下配置logback.xml文件,旨在将log4j日志按json格式输出到指定目录,logback.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="false">
       <!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径 
       <property name="LOG_HOME" value="E:/demo/home" />
       -->

       <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
       <!--<property name="LOG_PATTERN" value="[  %d{yyyy-MM-dd HH:mm:ss.SSS}] [UUID:%X{requestId}] [%level]|[${HOSTNAME}] [%thread]|[%logger{36}] | %msg|%n " />-->
       <property name="TIME_PATTERN" value="yyyy-MM-dd'T'HH:mm:ss.SSS+08:00" />
       <property name="LOG_PATTERN" value='{"event":"log","timestamp":"%d{${TIME_PATTERN}}","level":"%level","serviceName":"${springAppName:-}","pid": "${PID:-}","host":"${HOSTNAME}","class": "%logger{40}","thread":"%thread","message":"%msg"}%n ' />

       <!-- Console 输出设置 -->
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
             <pattern>${LOG_PATTERN}</pattern>
             <charset>utf8</charset>
          </encoder>
       </appender>

       <!-- 按照每天生成日志文件 -->
       <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
          <file>logs/newsinfo-dataservice.log</file>
          <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
             <FileNamePattern>logs/newsinfo-dataservice.%d{yyyy-MM-dd}.log</FileNamePattern>
          </rollingPolicy>
          <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
             <pattern>${LOG_PATTERN}</pattern>
          </encoder>
       </appender>

       <!--log4jdbc -->
       <logger name="com.sohu" level="debug" additivity="false">
            <appender-ref ref="STDOUT" />
            <appender-ref ref="FILE" />    
        </logger>

       <!-- 日志输出级别 -->
       <root level="info">
          <appender-ref ref="STDOUT" />
          <appender-ref ref="FILE" />
       </root>
    </configuration>

    输出日志示例如下:

    {"event":"log","timestamp":"2019-04-09T19:47:07.908Z","level":"INFO","serviceName":"springAppName_IS_UNDEFINED","pid": "7196","host":"node1","class": "o.s.b.web.servlet.FilterRegistrationBean","thread":"localhost-startStop-1","message":"Mapping filter: 'MyFilter' to urls: [/getUser, /hello]"}

    4.2.2 DomeOS启动Spring Boot服务

    启动服务的同时,配置日志收集到kafka,然后调用ELK读取kafka日志存入ES,进行日志的全文检索和实时告警分析。


    创建新版本时,如上配置下日志收集模块,日志就自动通过Flume接入Kafka了。

    路径:/code/logs/newsinfo-dataservice.log
    自动收集 :自动将日志收集到 Kafka
    Kafka Topic:newsinfo_appdata_service_log
  • 相关阅读:
    关于数据库中浮点运算、保留小数和时间计算
    几个常用的操作
    数字转换为字符串
    Dll控件出错的处理办法
    小巧的服务程序源码(转)
    DELPHI中MDI子窗口的关闭和打开
    用Delphi创建服务程序
    Delphi如何获取QQ2010聊天窗口句柄?
    拖动Form上的图片,Form一起动
    仿药易通输入单位信息后如果没有则自动加入功能
  • 原文地址:https://www.cnblogs.com/xiaodf/p/10679336.html
Copyright © 2011-2022 走看看