zoukankan      html  css  js  c++  java
  • Spark Streaming笔记整理(三):DS的transformation与output操作

    DStream的各种transformation

    Transformation  Meaning
    map(func)           对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
    flatMap(func)         与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
    filter(func)          过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
    repartition(numPartitions)  增加或减少DStream中的分区数,从而改变DStream的并行度
    union(otherStream)      将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
    count()             通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
    reduce(func)          对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
    countByValue()        对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
    reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
    join(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
    cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
    transform(func)         通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
    updateStateByKey(func)    根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的Dstream
    Window 函数: 

    可以看到很多都是在RDD中已经有的transformation算子操作,所以这里只关注transform、updateStateByKey和window函数

    transformation之transform操作

    DStream transform

    1、transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。

    2、DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

    案例

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    
    /**
      * 使用Transformation之transform来完成在线黑名单过滤
      * 需求:
      *     将日志数据中来自于ip["27.19.74.143", "110.52.250.126"]实时过滤掉
      * 数据格式
      *     27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127
      */
    object _06SparkStreamingTransformOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            val hostname = args(0).trim
            val port = args(1).trim.toInt
    
            //黑名单数据
            val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))
    //        val blacklist = List("27.19.74.143", "110.52.250.126")
            val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)
    
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
    
            // 如果用到一个DStream和rdd进行操作,无法使用dstream直接操作,只能使用transform来进行操作
            val filteredDStream:DStream[String] = linesDStream.transform(rdd => {
                val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {
                    (line.split("##")(0), line)
                }}
                /** A(M) B(N)两张表:
                  * across join
                  *     交叉连接,没有on条件的连接,会产生笛卡尔积(M*N条记录) 不能用
                  * inner join
                  *     等值连接,取A表和B表的交集,也就是获取在A和B中都有的数据,没有的剔除掉 不能用
                  * left outer join
                  *     外链接:最常用就是左外连接(将左表中所有的数据保留,右表中能够对应上的数据正常显示,在右表中对应不上,显示为null)
                  *         可以通过非空判断是左外连接达到inner join的结果
                  */
                val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)
    
                joinedInfoRDD.filter{case (ip, (line, joined)) => {
                    joined == None
                }}//执行过滤操作
                    .map{case (ip, (line, joined)) => line}
            })
    
            filteredDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    }

    nc中产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
    110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
    8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

    输出结果如下:

    -------------------------------------------
    Time: 1526006084000 ms
    -------------------------------------------
    8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

    transformation之updateStateByKey操作

    概述

    1、Spark Streaming的updateStateByKey可以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。

    2、 updateStateByKey 解释

    以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步:

    1) 定义状态:可以是任意数据类型

    2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大

    3、要思考的是如果数据量很大的时候,或者对性能的要求极为苛刻的情况下,可以考虑将数据放在Redis或者tachyon或者ignite上

    4、注意,updateStateByKey操作,要求必须开启Checkpoint机制。

    案例

    Scala版

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * 状态函数updateStateByKey
      *     更新key的状态(就是key对应的value)
      *
      * 通常的作用,计算某个key截止到当前位置的状态
      *     统计截止到目前为止的word对应count
      * 要想完成截止到目前为止的操作,必须将历史的数据和当前最新的数据累计起来,所以需要一个地方来存放历史数据
      * 这个地方就是checkpoint目录
      *
      */
    object _07SparkStreamingUpdateStateByKeyOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            val hostname = args(0).trim
            val port = args(1).trim.toInt
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")
    
            // 接收到的当前批次的数据
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
            // 这是记录下来的当前批次的数据
            val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    
            val usbDStream:DStream[(String, Int)]  = rbkDStream.updateStateByKey(updateFunc)
    
            usbDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    
        /**
          * @param seq 当前批次的key对应的数据
          * @param history 历史key对应的数据,可能有可能没有
          * @return
          */
        def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {
            var sum = seq.sum
            if(history.isDefined) {
                sum += history.get
            }
            Option[Int](sum)
        }
    }

    nc产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    hello hello
    hello you hello he hello me

    输出结果如下:

    -------------------------------------------
    Time: 1526009358000 ms
    -------------------------------------------
    (hello,2)
    
    18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: 
    -------------------------------------------
    Time: 1526009360000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,1)
    (he,1)
    
    18/05/11 11:29:20 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: 
    -------------------------------------------
    Time: 1526009362000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,1)
    (he,1)

    Java版

    用法略有不同,主要是 状态更新函数的写法上有区别,如下:

    package cn.xpleaf.bigdata.spark.java.streaming.p1;
    
    import com.google.common.base.Optional;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class _02SparkStreamingUpdateStateByKeyOps {
        public static void main(String[] args) {
            if(args == null || args.length < 2) {
                System.err.println("Parameter Errors! Usage: <hostname> <port>");
                System.exit(-1);
            }
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
            SparkConf conf = new SparkConf()
                    .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())
                    .setMaster("local[2]");
    
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
            jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");
    
            String hostname = args[0].trim();
            int port = Integer.valueOf(args[1].trim());
            JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默认的持久化级别:MEMORY_AND_DISK_SER_2
    
            JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });
    
            JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
                return new Tuple2<String, Integer>(word, 1);
            });
    
            JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            // 做历史的累计操作
            JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception {
    
                    int sum = 0;
                    for (int i : current) {
                        sum += i;
                    }
    
                    if (history.isPresent()) {
                        sum += history.get();
                    }
                    return Optional.of(sum);
                }
            });
    
            usbDStream.print();
    
            jsc.start();//启动流式计算
            jsc.awaitTermination();//等待执行结束
            jsc.close();
        }
    }

    transformation之window操作

    DStream window 滑动窗口

    Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

    Spark Streaming笔记整理(三):DS的transformation与output操作

    1.红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

    2.这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

    所以基于窗口的操作,需要指定2个参数:

    window length - The duration of the window (3 in the figure)
    slide interval - The interval at which the window-based operation is performed (2 in the figure). 
    
    1.窗口大小,个人感觉是一段时间内数据的容器。
    2.滑动间隔,就是我们可以理解的cron表达式吧。
    举个例子吧:
    还是以最著名的wordcount举例,每隔10秒,统计一下过去30秒过来的数据。
    // Reduce last 30 seconds of data, every 10 seconds  
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

    DSstream window滑动容器功能

    window 对每个滑动窗口的数据执行自定义的计算
    countByWindow 对每个滑动窗口的数据执行count操作
    reduceByWindow 对每个滑动窗口的数据执行reduce操作
    reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
    countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作

    案例

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      *窗口函数window
      *   每隔多长时间(滑动频率slideDuration)统计过去多长时间(窗口长度windowDuration)中的数据
      * 需要注意的就是窗口长度和滑动频率
      * windowDuration = M*batchInterval,
        slideDuration = N*batchInterval
      */
    object _08SparkStreamingWindowOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            val hostname = args(0).trim
            val port = args(1).trim.toInt
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            // 接收到的当前批次的数据
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
            val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))
    
            // 每隔4s,统计过去6s中产生的数据
            val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))
    
            retDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    }

    nc产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    hello you
    hello he
    hello me
    hello you
    hello he

    输出结果如下:

    -------------------------------------------
    Time: 1526016316000 ms
    -------------------------------------------
    (hello,4)
    (me,1)
    (you,2)
    (he,1)
    
    -------------------------------------------
    Time: 1526016320000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,2)
    (he,2)
    
    -------------------------------------------
    Time: 1526016324000 ms
    -------------------------------------------

    DStream的output操作以及foreachRDD

    DStream output操作

    1、print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。

    2、saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]

    3、saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。

    4、saveAsHadoopFile 同上,将数据保存到Hadoop文件中

    5、foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

    DStream foreachRDD详解

    相关内容其实在Spark开发调优中已经有相关的说明。

    通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。

    误区一:在RDD的foreach操作外部,创建Connection

    这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection() 
      rdd.foreach { record => connection.send(record)
      }
    }

    误区二:在RDD的foreach操作内部,创建Connection

    这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }

    DStream foreachRDD合理使用

    合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }

    合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  
      }
    }

    foreachRDD 与foreachPartition实现实战

    需要注意的是:

    (1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。

    (2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。

    (3)、在插入MySQL的时候最好使用批量插入。

    (4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。

    (5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

    这部分内容其实可以参考开发调优部分的案例,只是那里并没有foreachRDD,因为其并没有使用DStream,但是原理是一样的,因为最终都是针对RDD来进行操作的。

    原文链接:http://blog.51cto.com/xpleaf/2115343

  • 相关阅读:
    Codeforces 992C(数学)
    Codeforces 990C (思维)
    Codeforces 989C (构造)
    POJ 1511 Invitation Cards(链式前向星,dij,反向建边)
    Codeforces 1335E2 Three Blocks Palindrome (hard version)(暴力)
    POJ 3273 Monthly Expense(二分)
    POJ 2566 Bound Found(尺取前缀和)
    POJ 1321 棋盘问题(dfs)
    HDU 1506 Largest Rectangle in a Histogram(单调栈)
    POJ 2823 Sliding Window(单调队列)
  • 原文地址:https://www.cnblogs.com/zzmmyy/p/9390946.html
Copyright © 2011-2022 走看看