zoukankan      html  css  js  c++  java
  • ELK+kafka日志处理

    此次使用kafka代替redis,elk集群搭建过程请参考:https://www.cnblogs.com/dmjx/p/9120474.html

    kafka名词解释:

    • 1、话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名;
    • 2、生产者(Producer):是能够发布消息到话题的任何对象;
    • 3、服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群;
    • 4、消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息;

    系统环境信息:

    CentOS Linux release 7.3.1611 (Core) 

    基础环境准备:

    关闭防火墙:systemctl stop firewalld

    SeLinux设为disabled: setenforce 0

    jdk版本:jdk_1.8

    kafka版本:kafka_2.12-2.1.0

    本次搭建使用了三个节点,kafka下载地址:http://mirror.rise.ph/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz

    环境搭建

    zookeeoer配置部署:

    1. 修改配置文件:zookeeper.properties

    dataDir=/apps/product/zookeeper/data
    clientPort=2181
    maxClientCnxns=10
    tickTime=2000
    initLimit=10
    syncLimit=5
    
    server.1=10.20.88.199:2888:3888
    server.2=10.20.88.200:2888:3888
    server.3=10.20.88.201:2888:3888

    2. 创建zookeeper所需要的目录

    mkdir /apps/product/zookeeper/data

    3. 创建myid文件用于标识主机

    echo 1 > /apps/product/zookeeper/data/myid

    以上就时zookeeper的配置,将上边的配置复制到其他两个节点就可以了,注意修改myid文件。

    4. 启动zookeeper并查看状态

    ./bin/zookeeper-server-start.sh config/zookeeper.properties >> /apps/product/kafka_2.12-2.1.0/logs/zookeeper.log &
    
    netstat -nlpt | grep -E "2181|2888|3888"
    tcp        0      0 10.20.88.200:3888       0.0.0.0:*               LISTEN      17711/java          
    tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      17711/java          
    tcp        0      0 10.100.67.62:2888       0.0.0.0:*               LISTEN      17711/java     

    kafka的配置部署:

    1. 修改配置文件:server.properties(以下为本次的配置)

    broker.id=1
    prot=9092
    host.name=10.20.88.199
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/apps/product/kafka_2.12-2.1.0/kafka_data
    num.partitions=3
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.cleanup.policy=delete
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=10.20.88.199:2181,10.20.88.200:2181,10.20.88.201:2181
    zookeeper.connection.timeout.ms=6000
    log.cleaner.backoff.ms=15000
    group.initial.rebalance.delay.ms=0

    2. 创建kafka所需要的数据目录

    /apps/product/kafka_2.12-2.1.0/kafka_data

    3. kafka启动

    ./bin/kafka-server-start.sh config/server.properties >> /dev/null &

    kafka的操作:

    1. 创建一个topic

    kafka-topics.sh --create --zookeeper 10.20.88.200:2181 --replication-factor 3 --partitions 4 --topic test_wx
    --partitions 4          # 4个分区
    --replication-factor 3            # 3个副本
    # 将分区被复制到三个broker上

    2. 查看有哪些topic

    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

    3. 查看主题的详情

    ./bin/kafka-topics.sh --describe --zookeeper 10.20.88.199:2181 --topic test_wx
    Topic:test_wx   PartitionCount:4        ReplicationFactor:3     Configs:
            Topic: test_wx  Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
            Topic: test_wx  Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
            Topic: test_wx  Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
            Topic: test_wx  Partition: 3    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3

    4. 输入输出测试

    # 输入
    ./bin/kafka-console-producer.sh --topic test_wx --broker-list 10.20.88.199:9092
    # 输出
    ./bin/kafka-console-consumer.sh --bootstrap-server 10.20.88.199:9092 --topic test_wx --from-beginning

    5.  修改操作

    # 增加分区数
    ./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions 10
    # 增加配置
    ./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --config flush.messages=1
    # 删除配置
    ./bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --delete-config flush.messages

    6. 删除topic

    # 删除topic
    目前删除操作默认情况下只是打上一个删除的标记,再重新启动kafka后才删除。如果需要立即删除,则需要在server.properties中配置:delete.topic.enable=true
    
    ./bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name

    filebeat中输出至kafka配置

    filebeat:
      prospectors:
        -
          paths:
            - /var/log/named/namequery.log
    
    output.kafka:
        hosts: ["10.20.88.199:9092","10.20.88.200:9092","10.20.88.201:9092"]
        topic: 'dns_ct'
        partition.round_robin:
            reachable_only: false
        required_acks: 1
        compression: gzip
        max_message_bytes: 1000000

    logstash取kafka中的数据

    input {
      kafka {
        bootstrap_servers => ["10.20.88.199:9092,10.20.88.200:9092,10.20.88.201:9092"]
        topics => ["dns_ct"]
        codec => "plain"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        type => "dns-ct"
      }
    }
    
    output {
      file { path => "/apps/logs/output-dns" }
      stdout { codec => rubydebug }
    }
  • 相关阅读:
    5月18日InterlliJ IDea快捷键
    5月17日-集合构架Collection学习
    十一java作业1
    十一java作业2
    第一周,java模拟ATMdos界面程序源代码及感想
    8.27-9.2第八周
    8.20-8.26第七周
    8.13-8.19第六周
    8.6-8.12第五周
    7.30-8.5第四周
  • 原文地址:https://www.cnblogs.com/dmjx/p/10248313.html
Copyright © 2011-2022 走看看