zoukankan      html  css  js  c++  java
  • Filebeat6.3.2整合Kafka集群消息队列(三)

    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz

    [root@es-node1 ~]# tar zxvf filebeat-6.3.2-linux-x86_64.tar.gz -C /usr/local/

    # mv /usr/local/filebeat-6.3.2-linux-x86_64/ /usr/local/filebeat

    # egrep -v "#|^$" filebeat.yml

    filebeat.prospectors:        #用于定义数据原型,检测日志或是发现日志
    - input_type: log            #指定数据的输入类型,默认为log,另外还可以指定stdin
      paths:                      #指定需要监控的日志文件路径;可以是完整的日志路径也可以是模糊的匹配格式
       - /var/log/messages        #指定系统日志位置
      fields:                      #定义日志来源,可以添加自定义字段,其实就是定义Kafka消息队列的topic主题名称,如果kafka消息队列中没有该名称,会自动生成
        log_topic: test
      paths:                    #与上述一样定义需要监控的日志文件路径,不过这次是定义apache-web服务的日志
       - /etc/httpd/logs/*
      fields:                    #定义日志来源,生成kafka消息队列topic主题
        log_topic: webapache
    processors:            #这个地方需要注意,此配置是将日志输出格式过滤掉,一般情况下,一些无用的日志字段我们可以删除,只看关键性的信息
    - drop_fields:
       fields: ["beat", "input_type", "source", "offset",]
    name: ""         #设置filebeat收集日志中对应的主机名称,如果设置为空,则使用该机器的主机名称,这里设置为本地IP,便于区分多台主机的日志信息
    output.kafka:                  #多种输出类型,可支持向kafka,logstash,elasticsearch输出日志信息,在这里是将日志信息输出到Kafka中
      enabled: true                 #启动该模块
      hosts: ["", "", ""]        #指定输出数据到kafka集群上,地址与端口号相对应
      version: "0.10"          
      topic: '%{[fields][log_topic]}'    #指定要发送数据到kafka集群的哪个topic,与上述的"fields: log_topic:"相对应,这是6.x的配置
      partition.round_robin:         #开启kafka的partition分区
        reachable_only: true   
      worker: 2
      required_acks: 1
      compression: gzip      #压缩格式
      max_message_bytes: 10000000    #单条消息允许的最大字节数,超过该大小的消息会被丢弃
    logging.level: debug        #日志级别为debug

    [root@es-node1 bin]# nohup ./filebeat -e -c filebeat.yml &

    [root@es-node1 bin]# ./kafka-topics.sh --zookeeper,, --list

    【Kafka节点 】启动消费,本次消费是

    [root@es-node3 bin]# ./kafka-console-consumer.sh --bootstrap-server,, --topic test --from-beginning

        "@timestamp": "2018-08-16T04:24:19.871Z",
        "@metadata": {
            "beat": "filebeat",
            "type": "doc",
            "version": "6.3.2",
            "topic": "test"
        "message": "Aug 16 12:24:13 es-node1 dbus[623]: [system] Successfully activated service 'org.freedesktop.nm_dispatcher'",
        "fields": {
            "log_topic": "test"
        "beat": {
            "name": "",
            "hostname": "es-node1",
            "version": "6.3.2"
        "host": {
            "name": ""
        "source": "/var/log/messages",
        "offset": 290635


     1 {
     2     "@timestamp": "2018-08-16T04:19:34.153Z",
     3     "@metadata": {
     4         "beat": "filebeat",
     5         "type": "doc",
     6         "version": "6.3.2",
     7         "topic": "webapache"
     8     },
     9     "beat": {
    10         "name": "",
    11         "hostname": "es-node1",
    12         "version": "6.3.2"
    13     },
    14     "host": {
    15         "name": ""
    16     },
    17     "source": "/etc/httpd/logs/access_log",
    18     "offset": 17968,
    19     "message": " - - [16/Aug/2018:12:19:33 +0800] "GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1" 404 238 "" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"",
    20     "fields": {
    21         "log_topic": "webapache"
    22     }
    23 }


    - drop_fields:
       fields: ["beat", "input_type", "source", "offset",]

    过滤掉之后的apache日志输出如下所示

     1 {
     2     "@timestamp": "2018-08-16T05:10:02.261Z",
     3     "@metadata": {
     4         "beat": "filebeat",
     5         "type": "doc",
     6         "version": "6.3.2",
     7         "topic": "webapache"
     8     },
     9     "message": " - - [16/Aug/2018:13:09:53 +0800] "GET /noindex/css/fonts/Bold/OpenSans-Bold.ttf HTTP/1.1" 404 238 "" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"",
    10     "fields": {
    11         "log_topic": "webapache"
    12     },
    13     "host": {
    14         "name": ""
    15     }
    16 }




     error initializing publisher: missing required field accessing 'output.kafka.hosts'

    提示缺少访问“output.kafka.hosts”的必需字段,就这个,我纠结了一天,好在问题解决了,心累~

    [root@es-node1 filebeat]# egrep -v "#|^$" filebeat.yml
    - input_type: log
       - /var/log/messages
        log_topic: test
       - /etc/httpd/logs/*
        log_topic: webapache
    - drop_fields:
       fields: ["beat", "input_type", "source", "offset",]
    name: ""
      enabled: true
      hosts: ["", "", ""]
      version: "0.10"
      topic: '%{[fields][log_topic]}'
        reachable_only: true
      worker: 2
      required_acks: 1
      compression: gzip
      max_message_bytes: 10000000
    logging.level: debug
    ###################### Filebeat Configuration Example #########################
    # This file is an example configuration file highlighting only the most common
    # options. The filebeat.full.yml file from the same directory contains all the
    # supported options with more comments. You can use it as a reference.
    # You can find the full configuration reference here:
    # https://www.elastic.co/guide/en/beats/filebeat/index.html
    #=========================== Filebeat prospectors =============================
    # Each - is a prospector. Most options can be set at the prospector level, so
    # you can use different prospectors for various configurations.
    # Below are the prospector specific configurations.
    - input_type: log
      # Paths that should be crawled and fetched. Glob based paths.
       - /var/log/messages
        log_topic: test
       - /etc/httpd/logs/*
        log_topic: webapache
        #- c:\programdata\elasticsearch\logs\*
      # Exclude lines. A list of regular expressions to match. It drops the lines that are
      # matching any regular expression from the list.
      #exclude_lines: ["^DBG"]
      # Include lines. A list of regular expressions to match. It exports the lines that are
      # matching any regular expression from the list.
      #include_lines: ["^ERR", "^WARN"]
      # Exclude files. A list of regular expressions to match. Filebeat drops the files that
      # are matching any regular expression from the list. By default, no files are dropped.
      #exclude_files: [".gz$"]
      # Optional additional fields. These field can be freely picked
      # to add additional information to the crawled log files for filtering
      #  level: debug
      #  review: 1
      ### Multiline options
      # Multiline can be used for log messages spanning multiple lines. This is common
      # for Java Stack Traces or C-Line Continuation
      # The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
      #multiline.pattern: ^\[
      # Defines if the pattern set under pattern should be negated or not. Default is false.
      #multiline.negate: false
      # Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
      # that was (not) matched before or after or as long as a pattern is not matched based on negate.
      # Note: After is the equivalent to previous and before is the equivalent to next in Logstash
      #multiline.match: after
    - drop_fields:
       fields: ["beat", "input_type", "source", "offset",]
    #================================ General =====================================
    # The name of the shipper that publishes the network data. It can be used to group
    # all the transactions sent by a single shipper in the web interface.
    name: ""
    # The tags of the shipper are included in their own field with each
    # transaction published.
    #tags: ["service-X", "web-tier"]
    # Optional fields that you can specify to add additional information to the
    # output.
    #  env: staging
    #================================ Outputs =====================================
    # Configure what outputs to use when sending the data collected by the beat.
    # Multiple outputs may be used.
      enabled: true
      hosts: ["", "", ""]
      version: "0.10"
      topic: '%{[fields][log_topic]}'
        reachable_only: true
      worker: 2
      required_acks: 1
      compression: gzip
      max_message_bytes: 10000000
    #-------------------------- Elasticsearch output ------------------------------
      # Array of hosts to connect to.
      #hosts: []
      # Optional protocol and basic auth credentials.
      #protocol: "https"
      #username: "elastic"
      #password: "changeme"
    #----------------------------- Logstash output --------------------------------
      # The Logstash hosts
      #hosts: [""]
      # Optional SSL. By default is off.
      # List of root certificates for HTTPS server verifications
      #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
      # Certificate for SSL client authentication
      #ssl.certificate: "/etc/pki/client/cert.pem"
      # Client Certificate Key
      #ssl.key: "/etc/pki/client/cert.key"
    #================================ Logging =====================================
    # Sets log level. The default log level is info.
    # Available log levels are: critical, error, warning, info, debug
    logging.level: debug
    # At debug level, you can selectively enable logging only for some components.
    # To enable all selectors use ["*"]. Examples of other selectors are "beat",
    # "publish", "service".
    #logging.selectors: ["*"]
  • 相关阅读:
    HDU 5171
    HDU 3709
    HDU 3652
    HDU 3555
  • 原文地址:https://www.cnblogs.com/bixiaoyu/p/9487539.html
Copyright © 2011-2022 走看看