  • ELK Study Notes: Building an ELK Stack on Kafka (Confluent)

    0x00 Overview

    This is a test build of an ELK environment that uses Kafka as the message queue. The data collection and transformation pipeline is structured as follows:

    F5 HSL –> Logstash (stream processing) –> Kafka –> Elasticsearch

    The ELK version used in the test is 6.3; the Confluent version is 4.1.1.

    The desired behavior is that logs sent by HSL pass through Logstash for stream processing and are emitted as JSON, that JSON content is stored in Kafka exactly as-is, and Kafka does no further format processing of its own.

    0x01 Testing

    192.168.214.138: Logstash and the Confluent environment are installed here

    192.168.214.137: the ELK suite is installed here (Logstash is disabled; only ES and Kibana are started)

    Notes on installing and debugging Confluent:

        1. Install a Java environment first, just as when setting up the ELK environment.
        2. First, with Kafka left out of the picture, get F5 HSL –> Logstash –> ES running normally and verify a simple, correct display in Kibana. When switching to Kafka later, only the output here needs to be changed to the kafka plugin configuration.
          The relevant Logstash configuration at this point:
    input {
      udp {
        port => 8514
        type => 'f5-dns'
      }
    }
    
    filter {
     if [type] == 'f5-dns' {
        grok {
          match => { "message" => "%{HOSTNAME:F5hostname} %{IP:clientip} %{POSINT:clientport} %{IP:svrip} %{NUMBER:qid} %{HOSTNAME:qname} %{GREEDYDATA:qtype} %{GREEDYDATA:status} %{GREEDYDATA:origin}" }
        }
        geoip {
             source => "clientip"
             target => "geoip"
        }
        }
    }
    
    
    
    output {
       #stdout{ codec => rubydebug }
       #elasticsearch {
       # hosts => ["192.168.214.137:9200"]
       # index => "f5-dns-%{+YYYY.MM.dd}"
       #template_name => "f5-dns"
      #}
      kafka {
        codec => json
        bootstrap_servers => "localhost:9092"
        topic_id => "f5-dns-kafka"
      }
    }
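
    Before starting Logstash, the pipeline definition can be syntax-checked first. A minimal sketch, assuming the config above is saved as /etc/logstash/conf.d/f5-dns.conf under a package install (both paths are assumptions; adjust to the actual layout):

    # Validate the pipeline configuration and exit without starting it
    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/f5-dns.conf --config.test_and_exit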

    Send some test traffic, confirm that ES receives the data correctly, and check the status shown in cerebro. (The screenshot was taken after debugging was finished.)

    # cd /usr/share/cerebro/cerebro-0.8.1/
    # ./bin/cerebro -Dhttp.port=9110 -Dhttp.address=0.0.0.0
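
    If cerebro is not available, the same check can be made directly against the ES REST API; a quick sketch using the ES node above:

    # Cluster health plus a list of the F5 DNS indices
    curl -s 'http://192.168.214.137:9200/_cluster/health?pretty'
    curl -s 'http://192.168.214.137:9200/_cat/indices/f5-dns-*?v'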

    Install Confluent. Since this is a test environment, the tarball was downloaded directly from the Confluent website and used after extraction; it lives under /root/confluent-4.1.1/.

    Since this is a test environment, the confluent CLI is used to start all related services, and Kafka turns out to fail on startup:

    [root@kafka-logstash bin]# ./confluent start
    Using CONFLUENT_CURRENT: /tmp/confluent.dA0KYIWj
    Starting zookeeper
    zookeeper is [UP]
    Starting kafka
    /Kafka failed to start
    kafka is [DOWN]
    Cannot start Schema Registry, Kafka Server is not running. Check your deployment

    Investigation shows the VM was given too little memory, so Java could not allocate enough memory for Kafka:

    [root@kafka-logstash bin]# ./kafka-server-start ../etc/kafka/server.properties
    OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)

    Increase the VM's memory, and reduce the heap size set in Logstash's JVM configuration.
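
    A sketch of the heap adjustment in Logstash's jvm.options (the path assumes a package install; 256m is only an example sized for this small test VM):

    # /etc/logstash/jvm.options -- shrink the Logstash heap so Kafka has room
    -Xms256m
    -Xmx256m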

    Kafka server configuration file:

    [root@kafka-logstash kafka]# pwd
    /root/confluent-4.1.1/etc/kafka
    [root@kafka-logstash kafka]# egrep -v "^#|^$" server.properties
    broker.id=0
    listeners=PLAINTEXT://localhost:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    confluent.support.metrics.enable=true
    confluent.support.customer.id=anonymous
    group.initial.rebalance.delay.ms=0
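
    Once Kafka starts cleanly, it is worth confirming that the Logstash kafka output is really writing events to the topic. A quick check with the console consumer bundled with Confluent (run from /root/confluent-4.1.1; the topic name matches the Logstash output above):

    # Read a few of the JSON events Logstash produced into the topic
    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic f5-dns-kafka --from-beginning --max-messages 5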

    Connect configuration file. Here the original Avro converters are replaced with JSON ones, and schema recognition is disabled for both key and value, because the input is plain JSON content with no associated schema; the goal is simply for Kafka Connect to pass the JSON emitted by Logstash through to ES unchanged.

    [root@kafka-logstash kafka]# pwd
    /root/confluent-4.1.1/etc/kafka
    [root@kafka-logstash kafka]# egrep -v "^#|^$" connect-standalone.properties
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    plugin.path=share/java
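
    For reference, the same converter settings can also be exercised by running Connect manually in standalone mode instead of through the confluent CLI; a sketch, run from /root/confluent-4.1.1, pairing this worker file with the ES sink connector config shown further below:

    # Start a standalone Connect worker with the JSON converters and the ES sink connector
    ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties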

    Without the changes above, Connect keeps failing to deserialize when sinking the logs to ES, reporting "Unknown magic byte" and similar errors; the Avro converter expects every message to begin with a magic byte plus a schema-registry schema ID, which the plain JSON emitted by Logstash does not have. Checking with the confluent status command then shows connect dropping from [UP] to [DOWN]:

    [root@kafka-logstash confluent-4.1.1]# ./bin/confluent status
    ksql-server is [DOWN]
    connect is [DOWN]
    kafka-rest is [UP]
    schema-registry is [UP]
    kafka is [UP]
    zookeeper is [UP]

    schema-registry related configuration:

    [root@kafka-logstash schema-registry]# pwd
    /root/confluent-4.1.1/etc/schema-registry
    [root@kafka-logstash schema-registry]# egrep -v "^#|^$"
    connect-avro-distributed.properties  connect-avro-standalone.properties   log4j.properties                     schema-registry.properties
    [root@kafka-logstash schema-registry]# egrep -v "^#|^$" connect-avro-standalone.properties
    bootstrap.servers=localhost:9092
    key.converter.schema.registry.url=http://localhost:8081
    value.converter.schema.registry.url=http://localhost:8081
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets
    plugin.path=share/java
    [root@kafka-logstash schema-registry]# egrep -v "^#|^$" schema-registry.properties
    listeners=http://0.0.0.0:8081
    kafkastore.connection.url=localhost:2181
    kafkastore.topic=_schemas
    debug=false
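
    Although the Avro converters are not used in this flow, schema-registry still runs as part of confluent start; a quick liveness check against the listener configured above:

    # An empty list is expected, since no Avro subjects are registered in this setup
    curl -s http://localhost:8081/subjects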

    Configuration file for the ES connector:

    [root@kafka-logstash kafka-connect-elasticsearch]# pwd
    /root/confluent-4.1.1/etc/kafka-connect-elasticsearch
    [root@kafka-logstash kafka-connect-elasticsearch]# egrep -v "^#|^$" quickstart-elasticsearch.properties
    name=f5-dns
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=f5-dns-kafka
    key.ignore=true
    value.ignore=true
    schema.ignore=true
    connection.url=http://192.168.214.137:9200
    type.name=doc
    transforms=MyRouter
    transforms.MyRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
    transforms.MyRouter.topic.format=${topic}-${timestamp}
    transforms.MyRouter.timestamp.format=yyyyMMdd
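
    A sketch of how this sink can be registered through the confluent CLI and then checked over the Connect REST interface (the REST output later in this post shows it registered under the name elasticsearch-sink):

    # Load the ES sink connector from the properties file, then query its status
    ./bin/confluent load elasticsearch-sink -d etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
    curl -s http://localhost:8083/connectors/elasticsearch-sink/status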

    In quickstart-elasticsearch.properties above, topics names the topic whose data should be shipped to ES; the TimestampRouter transform dynamically maps that topic to a per-day ES index, so ES gets one new index each day. Note that schema.ignore=true must be set, otherwise Kafka Connect cannot send the received data to ES, and Connect's connect.stdout log shows:

    [root@kafka-logstash connect]# pwd
    /tmp/confluent.dA0KYIWj/connect
    
    Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    	at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
    	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
    	at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
    	at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
    	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)

    After these configuration fixes, send data to Logstash again: the logs now reach ES normally, and the document format is identical to the setup without Kafka.
    Without Kafka:

    {
    	"_index": "f5-dns-2018.06.26",
    	"_type": "doc",
    	"_id": "KrddO2QBXB-i0ay0g5G9",
    	"_version": 1,
    	"_score": 1,
    	"_source": {
    		"message": "localhost.lan 202.202.102.100 53777 172.16.199.136 42487 www.test.com A NOERROR GTM_REWRITE ",
    		"F5hostname": "localhost.lan",
    		"qid": "42487",
    		"clientip": "202.202.102.100",
    		"geoip": {
    			"region_name": "Chongqing",
    			"location": {
    				"lon": 106.5528,
    				"lat": 29.5628
    			},
    			"country_code2": "CN",
    			"timezone": "Asia/Shanghai",
    			"country_name": "China",
    			"region_code": "50",
    			"continent_code": "AS",
    			"city_name": "Chongqing",
    			"country_code3": "CN",
    			"ip": "202.202.102.100",
    			"latitude": 29.5628,
    			"longitude": 106.5528
    		},
    		"status": "NOERROR",
    		"qname": "www.test.com",
    		"clientport": "53777",
    		"@version": "1",
    		"@timestamp": "2018-06-26T09:12:21.585Z",
    		"host": "192.168.214.1",
    		"type": "f5-dns",
    		"qtype": "A",
    		"origin": "GTM_REWRITE ",
    		"svrip": "172.16.199.136"
    	}
    }

    With Kafka:

    {
    	"_index": "f5-dns-kafka-20180628",
    	"_type": "doc",
    	"_id": "f5-dns-kafka-20180628+0+23",
    	"_version": 1,
    	"_score": 1,
    	"_source": {
    		"F5hostname": "localhost.lan",
    		"geoip": {
    			"city_name": "Chongqing",
    			"timezone": "Asia/Shanghai",
    			"ip": "202.202.100.100",
    			"latitude": 29.5628,
    			"country_name": "China",
    			"country_code2": "CN",
    			"continent_code": "AS",
    			"country_code3": "CN",
    			"region_name": "Chongqing",
    			"location": {
    				"lon": 106.5528,
    				"lat": 29.5628
    			},
    			"region_code": "50",
    			"longitude": 106.5528
    		},
    		"qtype": "A",
    		"origin": "DNSX ",
    		"type": "f5-dns",
    		"message": "localhost.lan 202.202.100.100 53777 172.16.199.136 42487 www.myf5.net A NOERROR DNSX ",
    		"qid": "42487",
    		"clientport": "53777",
    		"@timestamp": "2018-06-28T09:05:20.594Z",
    		"clientip": "202.202.100.100",
    		"qname": "www.myf5.net",
    		"host": "192.168.214.1",
    		"@version": "1",
    		"svrip": "172.16.199.136",
    		"status": "NOERROR"
    	}
    }
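
    The daily index rotation produced by the TimestampRouter can also be confirmed directly on ES; a sketch against the ES node used above:

    # One index per day should appear for the routed topic
    curl -s 'http://192.168.214.137:9200/_cat/indices/f5-dns-kafka-*?v'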

    Relevant REST API output:

    http://192.168.214.138:8083/connectors/elasticsearch-sink/tasks
    
    [
      {
        "id": {
          "connector": "elasticsearch-sink",
          "task": 0
        },
        "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "type.name": "doc",
          "value.ignore": "true",
          "tasks.max": "1",
          "topics": "f5-dns-kafka",
          "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
          "transforms": "MyRouter",
          "key.ignore": "true",
          "schema.ignore": "true",
          "transforms.MyRouter.timestamp.format": "yyyyMMdd",
          "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask",
          "name": "elasticsearch-sink",
          "connection.url": "http://192.168.214.137:9200",
          "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
        }
      }
    ]
    
    http://192.168.214.138:8083/connectors/elasticsearch-sink/
    {
      "name": "elasticsearch-sink",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "doc",
        "value.ignore": "true",
        "tasks.max": "1",
        "topics": "f5-dns-kafka",
        "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
        "transforms": "MyRouter",
        "key.ignore": "true",
        "schema.ignore": "true",
        "transforms.MyRouter.timestamp.format": "yyyyMMdd",
        "name": "elasticsearch-sink",
        "connection.url": "http://192.168.214.137:9200",
        "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
      },
      "tasks": [
        {
          "connector": "elasticsearch-sink",
          "task": 0
        }
      ],
      "type": "sink"
    }
    
    http://192.168.214.138:8083/connectors/elasticsearch-sink/status
    {
      "name": "elasticsearch-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "172.16.150.179:8083"
      },
      "tasks": [
        {
          "state": "RUNNING",
          "id": 0,
          "worker_id": "172.16.150.179:8083"
        }
      ],
      "type": "sink"
    }
    http://192.168.214.138:8082/brokers
    {
      "brokers": [
        0
      ]
    }
    
    http://192.168.214.138:8082/topics
    [
      "__confluent.support.metrics",
      "_confluent-ksql-default__command_topic",
      "_schemas",
      "connect-configs",
      "connect-offsets",
      "connect-statuses",
      "f5-dns-2018.06",
      "f5-dns-2018.06.27",
      "f5-dns-kafka",
      "test-elasticsearch-sink"
    ]
    
    
    
    http://192.168.214.138:8082/topics/f5-dns-kafka
    {
      "name": "f5-dns-kafka",
      "configs": {
        "file.delete.delay.ms": "60000",
        "segment.ms": "604800000",
        "min.compaction.lag.ms": "0",
        "retention.bytes": "-1",
        "segment.index.bytes": "10485760",
        "cleanup.policy": "delete",
        "follower.replication.throttled.replicas": "",
        "message.timestamp.difference.max.ms": "9223372036854775807",
        "segment.jitter.ms": "0",
        "preallocate": "false",
        "segment.bytes": "1073741824",
        "message.timestamp.type": "CreateTime",
        "message.format.version": "1.1-IV0",
        "max.message.bytes": "1000012",
        "unclean.leader.election.enable": "false",
        "retention.ms": "604800000",
        "flush.ms": "9223372036854775807",
        "delete.retention.ms": "86400000",
        "leader.replication.throttled.replicas": "",
        "min.insync.replicas": "1",
        "flush.messages": "9223372036854775807",
        "compression.type": "producer",
        "min.cleanable.dirty.ratio": "0.5",
        "index.interval.bytes": "4096"
      },
      "partitions": [
        {
          "partition": 0,
          "leader": 0,
          "replicas": [
            {
              "broker": 0,
              "leader": true,
              "in_sync": true
            }
          ]
        }
      ]
    }

    In this test Kafka was essentially left at its default configuration; no memory tuning, sizing of the disk Kafka uses, or similar considerations were addressed.

    Test references:

    https://docs.confluent.io/current/installation/installing_cp.html

    https://docs.confluent.io/current/connect/connect-elasticsearch/docs/elasticsearch_connector.html

    https://docs.confluent.io/current/connect/connect-elasticsearch/docs/configuration_options.html

    Storage mechanism reference: https://blog.csdn.net/opensure/article/details/46048589

    Kafka configuration parameter reference: https://blog.csdn.net/lizhitao/article/details/25667831

    More on Kafka internals: https://blog.csdn.net/ychenfeng/article/details/74980531

    confluent CLI:

    confluent: A command line interface to manage Confluent services
    
    Usage: confluent <command> [<subcommand>] [<parameters>]
    
    These are the available commands:
    
        acl         Specify acl for a service.
        config      Configure a connector.
        current     Get the path of the data and logs of the services managed by the current confluent run.
        destroy     Delete the data and logs of the current confluent run.
        list        List available services.
        load        Load a connector.
        log         Read or tail the log of a service.
        start       Start all services or a specific service along with its dependencies
        status      Get the status of all services or the status of a specific service along with its dependencies.
        stop        Stop all services or a specific service along with the services depending on it.
        top         Track resource usage of a service.
        unload      Unload a connector.
    
    'confluent help' lists available commands. See 'confluent help <command>' to read about a
    specific command.
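
    The connect.stdout file referenced earlier lives under the directory reported by confluent current, and the CLI can also pull up a service's log directly; a sketch:

    # Show where the current confluent run keeps its data and logs
    ./bin/confluent current
    # Open the Connect worker log (where the "Cannot infer mapping" error above appeared)
    ./bin/confluent log connect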

    Confluent Platform service ports (as used in this test, collected from the configuration and REST calls above):

    ZooKeeper           2181
    Kafka broker        9092
    Schema Registry     8081
    Kafka REST Proxy    8082
    Kafka Connect REST  8083

