zoukankan      html  css  js  c++  java
  • 大数据新手之路四:联合使用Flume和Kafka

    Ubuntu16.04+Kafka1.0.0+Flume1.8.0

     

    1.目标

    ①使用Flume作为Kafka的Producer;

    ②使用Kafka作为Flume的Sink;

    其实以上两点是同一个事情在Flume和Kafka两个立场上的不同描述而已,其实就是同一个事情。

     

    2.启动zookeeper(这里使用kafka自带的zookeeper,也可以独立部署zookeeper使用)

    使用默认的zookeeper.properties配置文件

    zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties

     

    3.启动kafka

    使用默认的server.properties配置文件

    kafka-server-start.sh /usr/local/kafka/config/server.properties

     

    4.增加一个名字为flume的topic

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume

     

    5.创建一个consumer接受flume的消息(后面在这个进程中将接收到消息)

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume

     

    6.在/usr/local/flume/conf中增加一个kafka_sink.conf文件

    #example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = flume
    a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

     

    7.启动flume

    flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/kafka_sink.conf --name a1 -Dflume.root.logger=INFO,console

     

    8.使用telnet连接到flume并发送数据

    $ telnet localhost 44444
    Trying 127.0.0.1...
    Connected to localhost.localdomain (127.0.0.1).
    Escape character is '^]'.
    Test flume kafka! <ENTER>
    OK

    9.在consumer的进程中可以看到如下信息

    Test flume kafka!

     

    以上。

     

  • 相关阅读:
    OGNL与值栈
    Struts2的数据封装
    Struts2页面配置和访问servlet API
    Struts2入门介绍(二)
    Struts2 入门介绍(一)
    Hibernate批量抓取
    Problem G: STL——整理唱片(list的使用)
    STL详细介绍(更新中~~~)
    Problem E: 数量的类模板
    CF: Long Number
  • 原文地址:https://www.cnblogs.com/chevin/p/8510996.html
Copyright © 2011-2022 走看看