zoukankan      html  css  js  c++  java
  • Flume推送数据到SparkStreaming案例实战和内幕源码解密

    本期内容:

    1. Flume on HDFS案例回顾

    2. Flume推送数据到Spark Streaming实战

    3. 原理绘图剖析

    1. Flume on HDFS案例回顾

    上节课要求大家自己安装配置Flume,并且测试数据的传输。我昨天是要求传送的HDFS上。

    文件配置:

    ~/.bashrc:

    export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin

    export FLUME_CONF_DIR=$FLUME_HOME/conf

    PATH中增加:${FLUME_HOME}/bin;

    拷贝conf/flume-conf.properties.template,更名为conf/flume-cong.properties,只写以下内容:

    agent1表示代理名称

    agent1.sources=source1

    agent1.sinks=sink1

    agent1.channels=channel1

    #配置source1

    agent1.sources.source1.type=spooldir

    agent1.sources.source1.spoolDir=/usr/local/flume/tmp/TestDir

    agent1.sources.source1.channels=channel1

    agent1.sources.source1.fileHeader = false

    agent1.sources.source1.interceptors = i1

    agent1.sources.source1.interceptors.i1.type = timestamp

    #配置sink1

    agent1.sinks.sink1.type=hdfs

    agent1.sinks.sink1.hdfs.path=hdfs://master:9000/library/flume

    agent1.sinks.sink1.hdfs.fileType=DataStream

    agent1.sinks.sink1.hdfs.writeFormat=TEXT

    agent1.sinks.sink1.hdfs.rollInterval=1

    agent1.sinks.sink1.channel=channel1

    agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d

    #agent1.sinks.sink1.type=avro

    #agent1.sinks.sink1.channel=channel1

    #agent1.sinks.sink1.hostname=Master

    #agent1.sinks.sink1.port=9999

    #配置channel1

    agent1.channels.channel1.type=file

    agent1.channels.channel1.checkpointDir=/usr/local/flume/tmp/checkpointDir

    agent1.channels.channel1.dataDirs=/usr/local/flume/tmp/dataDirs

    flume-env.sh配置:

    # export JAVA_HOME=/usr/lib/jvm/java-6-sun

    export JAVA_HOME=/usr/local/jdk/jdk1.8.0_60

    # Give Flume more memory and pre-allocate, enable remote monitoring via JMX

    # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

    建立文件夹 /usr/local/flume/tmp/TestDir。

    在hdfs上建立/library/flume文件夹。

    flume的bin文件夹下启动Flume:

    ./flume-ng agent -n agent1 -c conf -f /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console

    在/usr/local/flume/tmp/TestDir下,拷入测试用的文件,比如:NOTICE

    flume 控制台会有一些相关信息:

    16/04/22 11:03:49 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/flume/tmp/TestDir/NOTICE to /usr/local/flume/tmp/TestDir/NOTICE.COMPLETED

    16/04/22 11:03:51 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false

    16/04/22 11:03:51 INFO hdfs.BucketWriter: Creating hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp

    16/04/22 11:03:52 INFO hdfs.BucketWriter: Closing hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp

    16/04/22 11:03:52 INFO hdfs.BucketWriter: Renaming hdfs://master:9000/library/flume/2016-04-22.1461294231806.tmp to hdfs://master:9000/library/flume/2016-04-22.1461294231806

    可以发现本地的NOTICE文件更名为NOTICE.COMPLETED。

    用浏览器查询:http://localhost:50070/explorer.html#/library/flume,可看到Flume把NOTICE文件传送到 HDFS的/library/flume下,文件名为 2016-04-22.1461294231806。打开文件看内容可以验证。说明当Flume指定的源文件夹中有新文件时,Flume会自动将此文件,导入到Flume配置时指定的HDFS文件夹中。

    一般正常业务情况下,应该是把Flume的数据放到Kafka中,然后让不同的数据消费者去消费数据。如果要在Flume和Kafka两者间做选择的话,则要看业务中数据是否持续不断的产生。如果是这样,则应该选择Kafka。如果产生的数据时大时小,甚至有些时间没有数据,则没必要用Kafka,可以节省资源。

    2. Flume推送数据到Spark Streaming实战

    现在我们不把Flume的数据导入到HDFS中,而是把数据推送到Spark Streaming中。

    修改conf/flume-cong.properties文件,改导入到HDFS,变为推送到Spark Streaming。

    #配置sink1

    #agent1.sinks.sink1.type=hdfs

    #agent1.sinks.sink1.hdfs.path=hdfs://master:9000/library/flume

    #agent1.sinks.sink1.hdfs.fileType=DataStream

    #agent1.sinks.sink1.hdfs.writeFormat=TEXT

    #agent1.sinks.sink1.hdfs.rollInterval=1

    #agent1.sinks.sink1.channel=channel1

    #agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d

    agent1.sinks.sink1.type=avro

    agent1.sinks.sink1.channel=channel1

    agent1.sinks.sink1.hostname=Master

    agent1.sinks.sink1.port=9999

    编写Spark Streaming应用的Java程序:

    public class FlumePushData2SparkStreaming {

        public static void main(String[] args) {

            SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("FlumePushDate2SparkStreaming");

            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(30));

            JavaReceiverInputDStream lines = FlumeUtils.createStream(jsc,"Master", 9999);

            // 注意此处输入的event类型是SparkFlumeEvent类型。

            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {

                @Override

                public Iterable<String> call(SparkFlumeEvent event) throws Exception {

                    String line = new String(event.event().getBody().array());

                    return Arrays.asList(line.split(" "));

                }

            });

            JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

                @Override

                public Tuple2<String, Integer> call(String word) throws Exception {

                    return new Tuple2<String, Integer>(word, 1);

                }

            });

            JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

                @Override

                public Integer call(Integer v1, Integer v2) throws Exception {

                    return v1 + v2;

                }

            });

            wordsCount.print();

            jsc.start();

            jsc.awaitTermination();

            jsc.close();

        }

    }

    代码中用到了FlumeUtils。我们剖析一下代码中用到的FlumeUtils。

    以上代码中FlumeUtil的方法createStream:

    实际是调用以下createStream方法:

    可以看到流处理默认的存储方式是,既在内存,又在磁盘中,同时做序列化,而且用两台机器。

    继续看调用的createStream方法:

    实际上返回的是FlumeInputDStream对象,而且事件是Flume所定义的事件SparkFlumeEvent。所以要注意,在以上Java代码做flatMap时,FlatMapFunction的输入类型必须是SparkFlumeEvent类型。

    再看看FlumeInputDStream的代码:

    可以看到getReceiver返回的是用于接收数据的FlumeReceiver对象。再看FlumeReceiver:

    可以发现Flume使用了Netty。如果搞分布式编程,要重视使用Netty。

     

    把以上的应用Spark Streaming的Java程序运行起来。确认Flume也在运行。

    我们找若干文件拷入TestDir文件夹,比如:flume下的若干文本文件。那么在Java运行的控制台,可以发现以下信息:

    说明Flume推送数据到了Spark Streaming,Spark Streaming对数据及时进行了处理。

    3. 原理绘图剖析

    总结:

    备注:87课

    更多私密内容,请关注微信公众号:DT_Spark

  • 相关阅读:
    计算机网络学习目录
    手把手教你玩微信小程序跳一跳
    (三)python函数式编程
    跟托福说分手
    (二)python高级特性
    BitCoin工作原理
    反向传播的工作原理(深度学习第三章)
    1.22计划
    梯度下降——神经网络如何学习?(深度学习第二章)
    什么是神经网络 (深度学习第一章)?
  • 原文地址:https://www.cnblogs.com/sparkbigdata/p/5433364.html
Copyright © 2011-2022 走看看