  • Data collection with Flume and Kafka

    Requirement: collect eight categories of log data and surface them in ES (Elasticsearch) for display.

    Plan: our standard collection stack is Flume + Kafka, so we reuse the existing pattern: each service category goes to its own topic, the data is then cleansed with Logstash, and the result lands in ES for display.
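
    The Logstash cleansing step boils down to parsing each raw line into fields before it is indexed. A minimal Python sketch of that idea, assuming a hypothetical timestamp/level/logger log layout (the real grok pattern must match whatever the eight services actually emit):

```python
import re
from typing import Optional

# Hypothetical log layout, assumed for illustration only:
#   2020-06-27 10:15:30 INFO com.ws.base.Service - request handled in 12ms
LOG_PATTERN = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<logger>\S+)\s+-\s+"
    r"(?P<message>.*)$"
)

def clean(line: str) -> Optional[dict]:
    """Parse one raw log line into the fields to be indexed in ES.

    Returns None for lines that do not match, mirroring how a grok
    filter drops (or tags) unparseable events.
    """
    m = LOG_PATTERN.match(line.strip())
    return m.groupdict() if m else None
```

    In the real pipeline this parsing lives in the Logstash config between the Kafka input and the ES output; the sketch only shows the field extraction.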

    Flume tails the logs with TAILDIR sources (each source should be given its own positionFile so offset tracking does not collide), buffers events in memory channels, and delivers them with Kafka sinks:

    a1.sources = s1 s2 s3 s4 s5 s6 s7 s8 
    a1.channels = c1 c2 c3 c4 c5 c6 c7 c8 
    a1.sinks = k1 k2 k3 k4 k5 k6 k7 k8 
    
    a1.sources.s1.type = TAILDIR
    a1.sources.s1.filegroups = f1
    a1.sources.s1.filegroups.f1 = /home/es/.*\.log
    a1.sources.s1.channels = c1
    
    a1.sources.s2.type = TAILDIR
    a1.sources.s2.filegroups = f1
    a1.sources.s2.filegroups.f1 = /home/adm/.*\.log
    a1.sources.s2.channels = c2
    
    a1.sources.s3.type = TAILDIR
    a1.sources.s3.filegroups = f1
    a1.sources.s3.filegroups.f1 = /home/bas/.*\.log
    a1.sources.s3.channels = c3
    
    a1.sources.s4.type = TAILDIR
    a1.sources.s4.filegroups = f1
    a1.sources.s4.filegroups.f1 = /home/cha/.*\.log
    a1.sources.s4.channels = c4
    
    a1.sources.s5.type = TAILDIR
    a1.sources.s5.filegroups = f1
    a1.sources.s5.filegroups.f1 = /home/anog/.*\.log
    a1.sources.s5.channels = c5
    
    a1.sources.s6.type = TAILDIR
    a1.sources.s6.filegroups = f1
    a1.sources.s6.filegroups.f1 = /home/dip/es_okeano/3.27.20.38/config_log/.*\.log
    a1.sources.s6.channels = c6
    
    a1.sources.s7.type = TAILDIR
    a1.sources.s7.filegroups = f1
    a1.sources.s7.filegroups.f1 = /home/oau/.*\.log
    a1.sources.s7.channels = c7
    
    a1.sources.s8.type = TAILDIR
    a1.sources.s8.filegroups = f1
    a1.sources.s8.filegroups.f1 = /home/z/.*\.log
    a1.sources.s8.channels = c8
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 10000
    a1.channels.c2.transactionCapacity = 1000
    
    a1.channels.c3.type = memory
    a1.channels.c3.capacity = 10000
    a1.channels.c3.transactionCapacity = 1000
    
    a1.channels.c4.type = memory
    a1.channels.c4.capacity = 10000
    a1.channels.c4.transactionCapacity = 1000
    
    a1.channels.c5.type = memory
    a1.channels.c5.capacity = 10000
    a1.channels.c5.transactionCapacity = 1000
    
    a1.channels.c6.type = memory
    a1.channels.c6.capacity = 10000
    a1.channels.c6.transactionCapacity = 1000
    
    a1.channels.c7.type = memory
    a1.channels.c7.capacity = 10000
    a1.channels.c7.transactionCapacity = 1000
    
    a1.channels.c8.type = memory
    a1.channels.c8.capacity = 10000
    a1.channels.c8.transactionCapacity = 1000
    
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = ws_activity
    a1.sinks.k1.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k1.kafka.flumeBatchSize = 5
    a1.sinks.k1.kafka.producer.acks = 1
    
    a1.sinks.k2.channel = c2
    a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k2.kafka.topic = ws_admin
    a1.sinks.k2.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k2.kafka.flumeBatchSize = 5
    a1.sinks.k2.kafka.producer.acks = 1
    
    a1.sinks.k3.channel = c3
    a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k3.kafka.topic = ws_authorization_oauth
    a1.sinks.k3.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k3.kafka.flumeBatchSize = 5
    a1.sinks.k3.kafka.producer.acks = 1
    
    a1.sinks.k4.channel = c4
    a1.sinks.k4.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k4.kafka.topic = ws_base
    a1.sinks.k4.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k4.kafka.flumeBatchSize = 5
    a1.sinks.k4.kafka.producer.acks = 1
    
    a1.sinks.k5.channel = c5
    a1.sinks.k5.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k5.kafka.topic = ws_channel
    a1.sinks.k5.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k5.kafka.flumeBatchSize = 5
    a1.sinks.k5.kafka.producer.acks = 1
    
    a1.sinks.k6.channel = c6
    a1.sinks.k6.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k6.kafka.topic = ws_config
    a1.sinks.k6.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k6.kafka.flumeBatchSize = 5
    a1.sinks.k6.kafka.producer.acks = 1
    
    a1.sinks.k7.channel = c7
    a1.sinks.k7.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k7.kafka.topic = ws_material
    a1.sinks.k7.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k7.kafka.flumeBatchSize = 5
    a1.sinks.k7.kafka.producer.acks = 1
    
    a1.sinks.k8.channel = c8
    a1.sinks.k8.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k8.kafka.topic = ws_zuul
    a1.sinks.k8.kafka.bootstrap.servers = 172.56.10.23:9092
    a1.sinks.k8.kafka.flumeBatchSize = 5
    a1.sinks.k8.kafka.producer.acks = 1
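
    With eight parallel source → channel → sink lanes, a wiring typo (a source pointing at the wrong channel) is easy to miss by eye. A small sketch that parses the agent properties and flags channels no source feeds, or channels fed by more than one source in this one-lane-per-channel layout (`parse_props` and `check_wiring` are illustrative helper names, not Flume tooling):

```python
from collections import defaultdict

def parse_props(text: str) -> dict:
    """Parse java.util.Properties-style key = value lines into a dict."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def check_wiring(props: dict, agent: str = "a1") -> list:
    """Report sinks reading unfed channels and channels fed by several sources."""
    problems = []
    sources = props.get(f"{agent}.sources", "").split()
    sinks = props.get(f"{agent}.sinks", "").split()
    fed_by = defaultdict(list)
    for s in sources:
        fed_by[props.get(f"{agent}.sources.{s}.channels")].append(s)
    for k in sinks:
        ch = props.get(f"{agent}.sinks.{k}.channel")
        if ch not in fed_by:
            problems.append(f"sink {k} reads channel {ch}, but no source feeds it")
    for ch, srcs in fed_by.items():
        if len(srcs) > 1:
            problems.append(f"channel {ch} is fed by multiple sources: {srcs}")
    return problems
```

    Running `check_wiring(parse_props(open("agent.properties").read()))` against the original version of the config above would have flagged the s4 → c2 typo: c4 had no feeding source while c2 was fed twice.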
  • Original source: https://www.cnblogs.com/students/p/13196380.html