  • Hadoop: installing Kafka and integrating Flume -> Kafka

    62 - Kafka installation: integrating Flume with Kafka

    I. Installing Kafka

    1. Download

    http://kafka.apache.org/downloads.html

    2. Extract the archive

    tar -zxvf kafka_2.10-0.8.1.1.tgz
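
    The archive name carries two version numbers. By Kafka's release naming convention, the part right after kafka_ is the Scala version the build targets, and the part after the hyphen is the Kafka version itself. A minimal sketch that splits the name with shell parameter expansion (parse_kafka_archive is a hypothetical helper, not part of Kafka):

```shell
#!/bin/sh
# Hypothetical helper: split a Kafka archive name of the form
# kafka_<scala-version>-<kafka-version>.tgz into its two versions.
parse_kafka_archive() {
    name=${1##*/}               # drop any leading directory
    name=${name%.tgz}           # drop the .tgz suffix
    name=${name#kafka_}         # drop the kafka_ prefix
    scala_version=${name%%-*}   # text before the first hyphen
    kafka_version=${name#*-}    # text after the first hyphen
    echo "scala=$scala_version kafka=$kafka_version"
}

parse_kafka_archive kafka_2.10-0.8.1.1.tgz
```

    For the archive used here this reports Scala 2.10 and Kafka 0.8.1.1, which is worth knowing because the command-line options shown in this post are the ones from the 0.8 series.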

    3. Start the services

    3.1 Start the ZooKeeper service first (it runs in the foreground, so keep this terminal open)

    bin/zookeeper-server-start.sh config/zookeeper.properties

    3.2 Start Kafka

    bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
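
    Because the broker is started in the background with its output discarded, nothing on the terminal shows when it is ready. A minimal sketch of a retry loop that can be used to wait for it (wait_until is a hypothetical helper; the port 9092 and the -z option of nc are assumptions about the local setup):

```shell
#!/bin/sh
# Hypothetical helper: run a command up to <attempts> times, sleeping
# <delay> seconds between tries; succeeds as soon as the command succeeds.
wait_until() {
    attempts=$1
    delay=$2
    shift 2
    i=0
    while [ "$i" -lt "$attempts" ]; do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    return 1
}

# Example (assumes the broker listens on localhost:9092 and that the
# installed nc supports -z for a connect-only probe):
# wait_until 30 1 nc -z localhost 9092 && echo "broker is up"
```

    The helper only retries a command; what counts as "ready" is decided by the command passed to it.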

    3.3 Create a topic

    Create a topic named "test" with one partition and one replica:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    List the topics:
    bin/kafka-topics.sh --list --zookeeper localhost:2181
    Show the details of a topic:
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Delete a topic:
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
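
    Running --create again for a topic that already exists is reported as an error, so a setup script may want to check the --list output first. A minimal sketch, assuming the list prints one topic name per line (topic_exists is a hypothetical helper):

```shell
#!/bin/sh
# Hypothetical helper: read a topic list (one name per line) on stdin
# and succeed only if the given topic matches a whole line exactly.
topic_exists() {
    grep -Fxq -- "$1"
}

# Example against a running cluster, using the commands shown above:
# if bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists test; then
#     echo "topic test already exists"
# else
#     bin/kafka-topics.sh --create --zookeeper localhost:2181 \
#         --replication-factor 1 --partitions 1 --topic test
# fi
```

    The -x option makes the match exact, so a topic called test2 does not count as test.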

    II. Integrating Flume with Kafka

    1. Start Flume (with the configuration file)

    flume-ng agent --conf conf -f /bigdata/flume-1.6/conf/kafka.conf --name producer -Dflume.root.logger=DEBUG,console

    2. Start Kafka

    bin/zookeeper-server-start.sh config/zookeeper.properties

    bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

    3. Send a message

    echo 'wo l g q .' |nc -u hadoop1 8285
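
    The source in kafka.conf is of type syslogudp, so it is designed for syslog-formatted datagrams, while the string above has no syslog header. How Flume treats such a message is not covered in this post; if a well-formed message is wanted, the following is a minimal sketch that builds a BSD-syslog-style line (RFC 3164), where the priority is facility * 8 + severity. syslog_pri and syslog_line are hypothetical helpers, and the tag "demo" is made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: compute the syslog priority value,
# PRI = facility * 8 + severity (RFC 3164).
syslog_pri() {
    echo $(( $1 * 8 + $2 ))
}

# Hypothetical helper: build "<PRI>TIMESTAMP HOST TAG: MESSAGE".
# Arguments: facility severity timestamp host tag message
syslog_line() {
    printf '<%s>%s %s %s: %s\n' "$(syslog_pri "$1" "$2")" "$3" "$4" "$5" "$6"
}

# Facility 1 (user-level) with severity 6 (informational) gives PRI 14.
syslog_line 1 6 "$(LC_ALL=C date '+%b %e %H:%M:%S')" "$(uname -n)" demo 'wo l g q .'

# Example send, using the same target as above:
# syslog_line 1 6 "$(LC_ALL=C date '+%b %e %H:%M:%S')" "$(uname -n)" demo 'wo l g q .' | nc -u hadoop1 8285
```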

    4. Start a consumer to check whether the message was received

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    III. Flume-Kafka errors

    The agent fails with:

    java.lang.ClassNotFoundException: org.apache.flume.plugins.KafkaSink

    To check whether the jar actually contains a KafkaSink class, run:
    jar -tf flume-ng-kafka-sink-1.6.0.jar | fgrep KafkaSink
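
    The fgrep above matches any entry that contains the string KafkaSink, whatever its package. To tell org.apache.flume.plugins.KafkaSink apart from org.apache.flume.sink.kafka.KafkaSink, the class name can be converted to the exact entry path that jar -tf lists. A minimal sketch (class_to_entry is a hypothetical helper):

```shell
#!/bin/sh
# Hypothetical helper: turn a fully qualified class name into the
# entry path that "jar -tf" would list for it.
class_to_entry() {
    printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

class_to_entry org.apache.flume.sink.kafka.KafkaSink

# Example (assumes the jar is in the current directory):
# jar -tf flume-ng-kafka-sink-1.6.0.jar | grep -Fx "$(class_to_entry org.apache.flume.sink.kafka.KafkaSink)"
```

    With grep -Fx, a hit means the jar holds that exact class, not just some class with a similar name.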
    
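    The class name in the error uses the wrong package. Set the sink type to:
    producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink

    (Be sure to follow the official Flume documentation when configuring the agent.)

    Flume official website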
      

    IV. kafka.conf

      

    Producer agent configuration

    # Define a memory channel called channel1 on the producer agent.
    producer.channels.channel1.type = memory
    
    # Define a syslog UDP source called source1 on the producer agent, bind it
    # to 127.0.0.1:8285, and connect it to channel1.
    producer.sources.source1.channels = channel1
    producer.sources.source1.type = syslogudp
    producer.sources.source1.bind = 127.0.0.1
    producer.sources.source1.port = 8285
    
    # Define a Kafka sink called sink1 that publishes the events it receives
    # to the "test" topic, and connect it to the other end of the same channel.
    producer.sinks.sink1.channel = channel1
    
    producer.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    producer.sinks.sink1.brokerList=127.0.0.1:9092
    producer.sinks.sink1.topic=test
    producer.sinks.sink1.batchSize=20
     
    
    
    # Finally, now that we've defined all of our components, tell
    # the producer agent which ones we want to activate.
    producer.channels = channel1
    producer.sources = source1
    producer.sinks = sink1
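
    Every key in this file starts with the agent name (producer), and the name passed to flume-ng on the command line has to be that same name for these components to be loaded. A minimal sketch that lists the agent names found in a properties file, so they can be compared with the command line (agent_names is a hypothetical helper):

```shell
#!/bin/sh
# Hypothetical helper: print the distinct agent names (the first
# dot-separated component of each property key) read from stdin.
agent_names() {
    awk -F. '
        /^[ \t]*#/ { next }               # skip comment lines
        /=/ {
            sub(/^[ \t]+/, "", $1)        # trim leading whitespace
            print $1                      # first component of the key
        }
    ' | sort -u
}

# Example, using the path from the flume-ng command in this post:
# agent_names < /bigdata/flume-1.6/conf/kafka.conf
```

    For the configuration above the only name printed should be producer.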
    
    
  • Original post: https://www.cnblogs.com/chaoren399/p/5479563.html