zoukankan      html  css  js  c++  java
  • logstash1

    0.拓扑图

    参考:https://www.cnblogs.com/JetpropelledSnake/p/10057545.html

    官网: http://kafka.apache.org/documentation.html#introduction  kafka原理

    https://www.cnblogs.com/jixp/articles/9435699.html    Kafka全解析  https://blog.csdn.net/vinfly_li/article/details/79397201

    1.logstash的配置

    [root@VM_0_4_centos config]# cat wxqyh.yml|egrep -v '^$|^#'
    input {
         file {
              type => "4personal20001"
              path => "/mnt/data/logs/personal_20001/log4j.log"
              start_position => "beginning"
              sincedb_path => "/dev/null"
              codec => multiline {
              pattern => "^%{TIMESTAMP_ISO8601}"
              negate => true
              what => "previous"
        }
    }
         file {
              type => "4personal20002"
              path => "/mnt/data/logs/personal_20002/log4j.log"
              start_position => "beginning"
              sincedb_path => "/dev/null"
              codec => multiline {
              pattern => "^%{TIMESTAMP_ISO8601}"
              negate => true
              what => "previous"
        }
    }
    }
    filter { 
     
    grok   { 
         match => { 
                   "message" => "^%{TIMESTAMP_ISO8601}[%{WORD:level} %{GREEDYDATA:ajpcon}| %{GREEDYDATA:data}" 
                  } 
                  match => { 
                   "message" => "^%{TIMESTAMP_ISO8601}[ %{WORD:level} %{GREEDYDATA:ajpcon}| %{GREEDYDATA:data}" 
                  } 
                  remove_field  => "message" 
           } 
           }
    output {    
        if [type] == "4personal20001" {
        kafka {
    max_request_size => 10485761 bootstrap_servers
    => "10.0.0.134:9092" topic_id => "topic4personal1" compression_type => "snappy" } } if [type] == "4personal20002" { kafka {
    max_request_size => 10485761 bootstrap_servers
    => "10.0.0.134:9092" topic_id => "topic4personal2" compression_type => "snappy" } } }

    2.kafka的配置

    [root@VM_0_134_centos config]# cat server.properties|egrep -v '^$|^#'
    broker.id=0
    listeners=PLAINTEXT://10.0.0.134:9092
    advertised.listeners=PLAINTEXT://10.0.0.134:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    message.max.bytes=20000000
    replica.fetch.max.bytes=20485760
    log.dirs=/mnt/data/monitor01/kafka_2.11-1.1.0/data
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    3.logstash2的配置

    [root@VM_0_134_centos config]# cat haode.yml|egrep -v '^$|^#'
    input {
      kafka {
         type => "topic12wxqyh8"
         codec => "plain"
         topics => ["topic12wxqyh8"]
         client_id => "es1"
         group_id => "es1"
         bootstrap_servers => "10.0.0.134:9092"
      }
     
      kafka {
         type => "topic12wxqyh9"
         codec => "plain"
         topics => ["topic12wxqyh9"]
         client_id => "es2"
         group_id => "es2"
         bootstrap_servers => "10.0.0.134:9092"
      }
      kafka {
         type => "topic24wxqyh6"
         codec => "plain"
         topics => ["topic24wxqyh6"]
         client_id => "es3"
         group_id => "es3"
         bootstrap_servers => "10.0.0.134:9092"
      }
     
      kafka {
         type => "topic24wxqyh7"
         codec => "plain"
         topics => ["topic24wxqyh7"]
         client_id => "es4"
         group_id => "es4"
         bootstrap_servers => "10.0.0.134:9092"
      }
     
      kafka {
         type => "topic4personal1"
         codec => "plain"
         topics => ["topic4personal1"]
         client_id => "es5"
         group_id => "es5"
         bootstrap_servers => "10.0.0.134:9092"
      }
      kafka {
         type => "topic4personal2"
         codec => "plain"
         topics => ["topic4personal2"]
         client_id => "es6"
         group_id => "es6"
         bootstrap_servers => "10.0.0.134:9092"
      }
     
    }
     
    output {
      if [type] == "topic12wxqyh8" {
         elasticsearch {
           index => "topic12wxqyh8"
           hosts => ["10.0.0.7:9200"]
         }
      }
       
      if [type] == "topic12wxqyh9" {
         elasticsearch {
            index => "topic12wxqyh9"
            hosts => ["10.0.0.7:9200"]
         }
      }
      if [type] == "topic24wxqyh6" {
         elasticsearch {
           index => "topic24wxqyh6"
           hosts => ["10.0.0.7:9200"]
         }
      }
      if [type] == "topic24wxqyh7" {
         elasticsearch {
           index => "topic24wxqyh7"
           hosts => ["10.0.0.7:9200"]
         }
      }
      if [type] == "topic4personal1" {
         elasticsearch {
           index => "topic4personal1"
           hosts => ["10.0.0.7:9200"]
         }
      }
      if [type] == "topic4personal2" {
         elasticsearch {
           index => "topic4personal2"
           hosts => ["10.0.0.7:9200"]
         }
      }
    }

    参考:https://www.cnblogs.com/swordfall/p/8860941.html#auto_id_4

    4.kafka的问题:

    4.1zookeeper--选主,提供访问入口

    4.2 Kafka的原理

    生产者将数据发送到Broker代理,Broker代理有多个话题topic,消费者从Broker获取数据。

    参考: https://zhuanlan.zhihu.com/p/97030680

    5.partion--相当于es的shard

  • 相关阅读:
    Spring基础——小的知识点
    Spring总结—— IOC 和 Bean 的总结
    UML 类图
    Spring基础—— 泛型依赖注入
    Spring重点—— IOC 容器中 Bean 的生命周期
    Spring基础—— SpEL
    《大话设计模式》学习笔记13:适配器模式
    《大话设计模式》学习笔记12:状态模式
    《大话设计模式》学习笔记11:抽象工厂模式
    《大话设计模式》学习笔记10:观察者模式
  • 原文地址:https://www.cnblogs.com/hixiaowei/p/11762352.html
Copyright © 2011-2022 走看看