  • Flume + Kafka + Spark Streaming Integration

    1. Install Flume
    2. Install Kafka
    3. Install Spark
    4. Pipeline overview:
      Log file -> Flume -> Kafka -> Spark Streaming
      Flume input: the log file
      Flume output: Kafka's input
      Kafka output: Spark Streaming's input
    5. Integration steps:
      (1). Copy the plugin jars into Flume's lib directory:
        a. flumeng-kafka-plugin.jar
        b. metrics-annotation-2.2.0.jar

      (2). Copy the configuration file producer.properties into Flume's conf directory.
        The file contents are as follows (a rough Java sketch of the producer settings this sink uses appears after the config):
          # agent section
          producer.sources=s
          producer.channels=c
          producer.sinks=r

          # source section
          producer.sources.s.type=exec
          producer.sources.s.command=tail -f -n+1 /opt/apache-flume-1.6.0/data/testFlumeKafka.txt
          producer.sources.s.channels=c

          # Each sink's type must be defined
          producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
          producer.sinks.r.metadata.broker.list=namenode:19092,datanode1:19092,datanode2:19092
          producer.sinks.r.partition.key=0
          producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
          producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
          producer.sinks.r.request.required.acks=0
          producer.sinks.r.max.message.size=1000000
          producer.sinks.r.producer.type=sync
          producer.sinks.r.custom.encoding=UTF-8
          # the target topic must already exist in Kafka
          producer.sinks.r.custom.topic.name=test

          # Specify the channel the sink should use
          producer.sinks.r.channel=c

          # Each channel's type is defined.
          producer.channels.c.type=memory
          producer.channels.c.capacity=1000
          producer.channels.c.transactionCapacity=100
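
        The sink settings above correspond to the old Kafka 0.8 producer properties
        (metadata.broker.list, serializer.class, request.required.acks, producer.type).
        As a rough standalone illustration only, and not the plugin's actual code, a producer
        built with the same settings could look like this (the class name is made up; the broker
        list and topic are taken from the config above):

          import java.util.Properties;
          import kafka.javaapi.producer.Producer;
          import kafka.producer.KeyedMessage;
          import kafka.producer.ProducerConfig;

          public class StandaloneProducerSketch {
              public static void main(String[] args) {
                  // Same settings as the Flume sink above (Kafka 0.8-era producer properties)
                  Properties props = new Properties();
                  props.put("metadata.broker.list", "namenode:19092,datanode1:19092,datanode2:19092");
                  props.put("serializer.class", "kafka.serializer.StringEncoder");
                  props.put("request.required.acks", "0");
                  props.put("producer.type", "sync");

                  Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
                  // Each line Flume tails from the log file ends up as one message on the "test" topic
                  producer.send(new KeyedMessage<String, String>("test", "hello flume kafka"));
                  producer.close();
              }
          }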

        (3). Start the Flume agent
          Command: flume-ng agent -c . -f /opt/apache-flume-1.6.0/conf/producer.properties -n producer

        (4). Start the Kafka broker
          Command: bin/kafka-server-start.sh config/server.properties

        (5). Start a Kafka console consumer to verify the pipeline (this assumes the "test" topic has already been created; an illustrative Java equivalent follows the command)
          Command: bin/kafka-console-consumer.sh --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --topic test --from-beginning
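
          The console consumer connects through ZooKeeper because the old Kafka 0.8 high-level
          consumer tracks its offsets there. As a rough illustration only (the class name is made
          up; the ZooKeeper quorum, group id, and topic are taken from the commands in this
          section), a programmatic equivalent could look like this:

            import java.util.Collections;
            import java.util.List;
            import java.util.Map;
            import java.util.Properties;
            import kafka.consumer.Consumer;
            import kafka.consumer.ConsumerConfig;
            import kafka.consumer.ConsumerIterator;
            import kafka.consumer.KafkaStream;
            import kafka.javaapi.consumer.ConsumerConnector;

            public class ConsoleConsumerSketch {
                public static void main(String[] args) {
                    // High-level consumer settings; offsets are stored in ZooKeeper (Kafka 0.8 style)
                    Properties props = new Properties();
                    props.put("zookeeper.connect", "namenode:12181,datanode1:12181,datanode2:12181");
                    props.put("group.id", "test-consumer-group");
                    props.put("auto.offset.reset", "smallest"); // roughly what --from-beginning does

                    ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                            connector.createMessageStreams(Collections.singletonMap("test", 1));

                    // Print every message Flume pushed onto the "test" topic
                    ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();
                    while (it.hasNext()) {
                        System.out.println(new String(it.next().message()));
                    }
                }
            }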

        (6). Start Spark
          Command: sbin/start-all.sh

        (7). Run the Spark Streaming demo (a stripped-down Java sketch of the example follows the command)
          Command: run-example org.apache.spark.examples.streaming.JavaKafkaWordCount namenode:12181 test-consumer-group test 3 >> test.log
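
          JavaKafkaWordCount ships with Spark, and its arguments above are the ZooKeeper quorum,
          consumer group, topic list, and per-topic thread count. The sketch below is a
          stripped-down approximation of the same word count using the Spark 1.x receiver-based
          KafkaUtils.createStream API, not the exact example source; the class name is made up and
          the arguments are hard-coded from the command above:

            import java.util.Arrays;
            import java.util.HashMap;
            import java.util.Map;
            import scala.Tuple2;
            import org.apache.spark.SparkConf;
            import org.apache.spark.streaming.Durations;
            import org.apache.spark.streaming.api.java.JavaDStream;
            import org.apache.spark.streaming.api.java.JavaPairDStream;
            import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
            import org.apache.spark.streaming.api.java.JavaStreamingContext;
            import org.apache.spark.streaming.kafka.KafkaUtils;

            public class KafkaWordCountSketch {
                public static void main(String[] args) throws InterruptedException {
                    SparkConf conf = new SparkConf().setAppName("KafkaWordCountSketch");
                    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

                    // Topic "test" read with 3 receiver threads, as in the run-example arguments above
                    Map<String, Integer> topics = new HashMap<>();
                    topics.put("test", 3);
                    JavaPairReceiverInputDStream<String, String> messages =
                            KafkaUtils.createStream(jssc, "namenode:12181", "test-consumer-group", topics);

                    // Split each log line into words and count them per batch
                    // (Spark 1.x FlatMapFunction returns an Iterable; Spark 2.x would need an Iterator)
                    JavaDStream<String> words = messages
                            .map(tuple -> tuple._2())
                            .flatMap(line -> Arrays.asList(line.split(" ")));
                    JavaPairDStream<String, Integer> counts = words
                            .mapToPair(word -> new Tuple2<>(word, 1))
                            .reduceByKey((a, b) -> a + b);

                    counts.print();
                    jssc.start();
                    jssc.awaitTermination();
                }
            }

          With the pipeline running, each line appended to the tailed file shows up as per-batch
          word counts on stdout, which step (7) redirects into test.log.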

        (8). Append lines to the tailed log file (/opt/apache-flume-1.6.0/data/testFlumeKafka.txt); the word-count results then appear in test.log

  • Original post: https://www.cnblogs.com/ciade/p/6221361.html