zoukankan      html  css  js  c++  java
  • flume+kafka+spark streaming整合

    1.安装好flume
    2.安装好kafka
    3.安装好spark
    4.流程说明:
      日志文件->flume->kafka->spark streaming
      flume输入:文件
      flume输出:kafka的输入
      kafka输出:spark 输入
    5.整合步骤:
      (1).将插件jar拷贝到flume的lib目录下
        a. flumeng-kafka-plugin.jar
        b. metrics-annotation-2.2.0.jar

      (2).将配置文件producer.properties拷贝到flume的conf目录下
        配置文件内容如下:
          #agentsection
          producer.sources=s
          producer.channels=c
          producer.sinks=r

          #sourcesection
          producer.sources.s.type=exec
          producer.sources.s.command=tail -f -n+1 /opt/apache-flume-1.6.0/data/testFlumeKafka.txt
          producer.sources.s.channels=c

          # Eachsink's type must be defined
          producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
          producer.sinks.r.metadata.broker.list=namenode:19092,datanode1:19092,datanode2:19092
          producer.sinks.r.partition.key=0
          producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
          producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
          producer.sinks.r.request.required.acks=0
          producer.sinks.r.max.message.size=1000000
          producer.sinks.r.producer.type=sync
          producer.sinks.r.custom.encoding=UTF-8
          producer.sinks.r.custom.topic.name=test //需建好对应topic

          #Specifythe channel the sink should use
          producer.sinks.r.channel=c

          # Eachchannel's type is defined.
          producer.channels.c.type=memory
          producer.channels.c.capacity=1000
          producer.channels.c.transactionCapacity=100

        (3).启动flume-ng
          命令如下:flume-ng agent -c . -f /opt/apache-flume-1.6.0/conf/producer.conf -n producer

        (4).启动kafka-server
          命令如下:bin/kafka-server-start.sh config/server.properties

        (5).启动kafka-consumer(默认已经创建了test topic)
          命令如下:bin/kafka-console-consumer.sh --zookeeper namenode:12181,datanode1:12181,datanode2:12181 --topic test --from-beginning

        (6).启动spark
          命令如下:sbin/start-all.sh

        (7).运行spark streaming Demo
          命令如下:run-example org.apache.spark.examples.streaming.JavaKafkaWordCount namenode:12181 test-consumer-group test 3 >> test.log

        (8).在对应的日志文件中输入内容,则可以在test.log文件看到单词计数的结果

  • 相关阅读:
    Python自定义:粒子群优化算法
    deap实战_2017中国数学建模大赛_B题_第二题
    deap实战_2017中国数学建模大赛_B题_第二题
    webpack学习笔记(一) 核心概念
    webpack学习笔记
    CSS学习笔记(九) 居中方案
    CSS学习笔记(八) 弹性布局
    CSS学习笔记(七) 粘性布局
    CSS学习笔记(六) 定位
    CSS学习笔记
  • 原文地址:https://www.cnblogs.com/ciade/p/6221361.html
Copyright © 2011-2022 走看看