  • Kafka configuration, and configuring Kafka with Flume

    Reference files:  https://files.cnblogs.com/files/han-guang-xue/kafka.zip

    The detailed steps to achieve the result shown in the figure are as follows:

    #han01.conf

    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /logs
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=han01
    a1.sinks.k1.port=22222

    a1.channels.c1.type=file
    a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    #han02-1.conf
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    a1.sources.r1.type=exec
    a1.sources.r1.command = tail  -F  /logs/a.log
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=han01
    a1.sinks.k1.port=22222
    
    a1.channels.c1.type=file
    a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
    
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    #han02-2.conf

    b1.sources=r1
    b1.channels=c1
    b1.sinks=k1

    b1.sources.r1.type=spooldir
    b1.sources.r1.spoolDir = /logs
    b1.sinks.k1.type=avro
    b1.sinks.k1.hostname=han01
    b1.sinks.k1.port=22222

    b1.channels.c1.type=file
    b1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
    b1.channels.c1.dataDirs = /home/uplooking/data/flume/data

    b1.sources.r1.channels=c1
    b1.sinks.k1.channel=c1
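    The spooldir source used in han01.conf and han02-2.conf expects each file in /logs to be complete and unchanged once it appears there, and it does not accept a file name that was used before. A minimal sketch of one way to feed it, assuming the file is first written in a staging directory on the same filesystem so that mv is a plain rename; the helper name spool_drop is hypothetical and not part of Flume:

```shell
#!/bin/sh
# Hypothetical helper (not from the original post): hand a finished log file
# to a Flume spooling directory by renaming it into place, instead of
# writing into the directory directly.
# usage: spool_drop <finished-file> <spool-dir>
spool_drop() {
    src=$1
    spool=$2
    [ -f "$src" ] || { echo "no such file: $src" >&2; return 1; }
    [ -d "$spool" ] || { echo "no such directory: $spool" >&2; return 1; }
    # A timestamp plus PID suffix keeps the name unique across calls.
    dest="$spool/$(basename "$src").$(date +%s).$$"
    # Print the final path on success so the caller can log it.
    mv "$src" "$dest" && echo "$dest"
}
```

    For example, spool_drop /tmp/stage/a.log /logs would move the staged file into the directory watched by the source.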
    # han03.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Source: listen for Avro events (avro is the Flume source type)
    a1.sources.r1.type = avro
    a1.sources.r1.bind = han01
    a1.sources.r1.port = 22222

    # Sink: write events to Kafka
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # Kafka topic
    a1.sinks.k1.kafka.topic = haha1
    # Kafka broker addresses and ports
    a1.sinks.k1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
    # Number of events sent per batch
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy

    # Channel: buffer events in files on disk, which is safer than keeping them in memory
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

    # Connect source r1 and sink k1 through channel c1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
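    In a Flume agent every source needs a .channels property and every sink needs a .channel property, otherwise events have no path through the agent. A rough sketch of a pre-flight check for a missing binding, assuming one property per line; the function name check_bindings is hypothetical and is not a Flume tool:

```shell
#!/bin/sh
# Hypothetical check (not part of Flume): list declared sources and sinks
# that have no channel binding in a Flume properties file.
# usage: check_bindings <conf-file> <agent-name>
check_bindings() {
    awk -v agent="$2" '
        /^[ \t]*#/ { next }                      # skip comment lines
        {
            eq = index($0, "=")
            if (eq == 0) next                    # skip lines without an assignment
            key = substr($0, 1, eq - 1)
            val = substr($0, eq + 1)
            gsub(/[ \t\r]/, "", key)             # tolerate "a = b" as well as "a=b"
            gsub(/\r/, "", val)
            if (key == agent ".sources")      ns = split(val, src, " ")
            else if (key == agent ".sinks")   nk = split(val, snk, " ")
            else                              seen[key] = 1
        }
        END {
            for (i = 1; i <= ns; i++) {
                k = agent ".sources." src[i] ".channels"
                if (!(k in seen)) { print "source " src[i] " has no channels binding"; bad = 1 }
            }
            for (i = 1; i <= nk; i++) {
                k = agent ".sinks." snk[i] ".channel"
                if (!(k in seen)) { print "sink " snk[i] " has no channel binding"; bad = 1 }
            }
            exit bad + 0                         # non-zero when something is unbound
        }
    ' "$1"
}
```

    Running check_bindings conf/han03.conf a1 before starting the agent prints one line per unbound component and returns a non-zero status if any are found; the path conf/han03.conf is an assumption about where the file is stored.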

    Start Flume on the han03 machine first, then start the others.

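    Command to start Flume (pass each machine's own config file to --conf-file; --name must match the agent name used in that file, which is b1 for han02-2.conf):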

    bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf  --name a1  -Dflume.root.logger=INFO,console
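    The value given to --name has to equal the prefix used in the config file (a1 in most files above, b1 in han02-2.conf), or the agent starts with no components. A small sketch that reads the prefix from the file and prints the matching command line; agent_name and flume_cmd are hypothetical helper names, and the sketch assumes the file declares a single agent with a line of the form <agent>.sources=...:

```shell
#!/bin/sh
# Hypothetical helpers (not part of Flume).

# Print the agent name, taken from the first "<agent>.sources = ..." line.
# usage: agent_name <conf-file>
agent_name() {
    sed -n 's/^[[:space:]]*\([A-Za-z0-9_-]*\)\.sources[[:space:]]*=.*/\1/p' "$1" | head -n 1
}

# Print (not run) the flume-ng command line for one config file, using the
# same options as the command in the text.
# usage: flume_cmd <conf-file>
flume_cmd() {
    printf 'bin/flume-ng agent --conf conf --conf-file %s --name %s -Dflume.root.logger=INFO,console\n' \
        "$1" "$(agent_name "$1")"
}
```

    Printing the command instead of running it keeps the sketch free of side effects; the output can be reviewed and then pasted into a shell in the Flume installation directory on each machine.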

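    Then start a consumer. The brokers and topic must match the ones the Kafka sink writes to (han01 to han03 on port 9092 and topic haha1 in han03.conf); the example here uses the hosts zhiyou01 to zhiyou03 and the topic test3: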

    ./bin/kafka-console-consumer.sh --bootstrap-server zhiyou01:9092,zhiyou02:9092,zhiyou03:9092 --from-beginning --topic test3

    Command to create the topic:

    ./bin/kafka-topics.sh --create --zookeeper zhiyou01:2181,zhiyou02:2181,zhiyou03:2181 --replication-factor 2 --partitions 3 --topic test3
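    Both --bootstrap-server and --zookeeper take the whole host list as a single argument, so the list must be comma-separated with no spaces; a space after a comma makes the shell pass the remaining hosts as separate arguments. A minimal sketch that builds such a list, with the hypothetical helper name broker_list:

```shell
#!/bin/sh
# Hypothetical helper: join hosts into one "host:port,host:port" argument.
# usage: broker_list <port> <host>...
broker_list() {
    port=$1
    shift
    list=
    for h in "$@"; do
        # Prepend a comma only when the list already has an entry.
        list="${list:+$list,}$h:$port"
    done
    printf '%s\n' "$list"
}
```

    For example, --bootstrap-server "$(broker_list 9092 han01 han02 han03)" expands to the same broker list that han03.conf uses for the Kafka sink.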

    
    
     
       
  • Original article: https://www.cnblogs.com/han-guang-xue/p/9960481.html