zoukankan      html  css  js  c++  java
  • Spark Streaming笔记整理(三):DS的transformation与output操作

    DStream的各种transformation

    Transformation  Meaning
    map(func)           对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
    flatMap(func)         与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
    filter(func)          过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
    repartition(numPartitions)  增加或减少DStream中的分区数,从而改变DStream的并行度
    union(otherStream)      将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
    count()             通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
    reduce(func)          对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
    countByValue()        对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
    reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
    join(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
    cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
    transform(func)         通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
    updateStateByKey(func)    根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的Dstream
    Window 函数: 

    可以看到很多都是在RDD中已经有的transformation算子操作,所以这里只关注transform、updateStateByKey和window函数

    transformation之transform操作

    DStream transform

    1、transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。

    2、DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

    案例

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    
    /**
      * 使用Transformation之transform来完成在线黑名单过滤
      * 需求:
      *     将日志数据中来自于ip["27.19.74.143", "110.52.250.126"]实时过滤掉
      * 数据格式
      *     27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127
      */
    object _06SparkStreamingTransformOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            val hostname = args(0).trim
            val port = args(1).trim.toInt
    
            //黑名单数据
            val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))
    //        val blacklist = List("27.19.74.143", "110.52.250.126")
            val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)
    
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
    
            // 如果用到一个DStream和rdd进行操作,无法使用dstream直接操作,只能使用transform来进行操作
            val filteredDStream:DStream[String] = linesDStream.transform(rdd => {
                val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {
                    (line.split("##")(0), line)
                }}
                /** A(M) B(N)两张表:
                  * across join
                  *     交叉连接,没有on条件的连接,会产生笛卡尔积(M*N条记录) 不能用
                  * inner join
                  *     等值连接,取A表和B表的交集,也就是获取在A和B中都有的数据,没有的剔除掉 不能用
                  * left outer join
                  *     外链接:最常用就是左外连接(将左表中所有的数据保留,右表中能够对应上的数据正常显示,在右表中对应不上,显示为null)
                  *         可以通过非空判断是左外连接达到inner join的结果
                  */
                val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)
    
                joinedInfoRDD.filter{case (ip, (line, joined)) => {
                    joined == None
                }}//执行过滤操作
                    .map{case (ip, (line, joined)) => line}
            })
    
            filteredDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    }

    nc中产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    27.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582
    110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##603
    8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

    输出结果如下:

    -------------------------------------------
    Time: 1526006084000 ms
    -------------------------------------------
    8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

    transformation之updateStateByKey操作

    概述

    1、Spark Streaming的updateStateByKey可以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。

    2、 updateStateByKey 解释

    以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步:

    1) 定义状态:可以是任意数据类型

    2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大

    3、要思考的是如果数据量很大的时候,或者对性能的要求极为苛刻的情况下,可以考虑将数据放在Redis或者tachyon或者ignite上

    4、注意,updateStateByKey操作,要求必须开启Checkpoint机制。

    案例

    Scala版

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * 状态函数updateStateByKey
      *     更新key的状态(就是key对应的value)
      *
      * 通常的作用,计算某个key截止到当前位置的状态
      *     统计截止到目前为止的word对应count
      * 要想完成截止到目前为止的操作,必须将历史的数据和当前最新的数据累计起来,所以需要一个地方来存放历史数据
      * 这个地方就是checkpoint目录
      *
      */
    object _07SparkStreamingUpdateStateByKeyOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            val hostname = args(0).trim
            val port = args(1).trim.toInt
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")
    
            // 接收到的当前批次的数据
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
            // 这是记录下来的当前批次的数据
            val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    
            val usbDStream:DStream[(String, Int)]  = rbkDStream.updateStateByKey(updateFunc)
    
            usbDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    
        /**
          * @param seq 当前批次的key对应的数据
          * @param history 历史key对应的数据,可能有可能没有
          * @return
          */
        def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {
            var sum = seq.sum
            if(history.isDefined) {
                sum += history.get
            }
            Option[Int](sum)
        }
    }

    nc产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    hello hello
    hello you hello he hello me

    输出结果如下:

    -------------------------------------------
    Time: 1526009358000 ms
    -------------------------------------------
    (hello,2)
    
    18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: 
    -------------------------------------------
    Time: 1526009360000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,1)
    (he,1)
    
    18/05/11 11:29:20 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: 
    -------------------------------------------
    Time: 1526009362000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,1)
    (he,1)

    Java版

    用法略有不同,主要是 状态更新函数的写法上有区别,如下:

    package cn.xpleaf.bigdata.spark.java.streaming.p1;
    
    import com.google.common.base.Optional;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class _02SparkStreamingUpdateStateByKeyOps {
        public static void main(String[] args) {
            if(args == null || args.length < 2) {
                System.err.println("Parameter Errors! Usage: <hostname> <port>");
                System.exit(-1);
            }
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
            SparkConf conf = new SparkConf()
                    .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())
                    .setMaster("local[2]");
    
            JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
            jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");
    
            String hostname = args[0].trim();
            int port = Integer.valueOf(args[1].trim());
            JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);//默认的持久化级别:MEMORY_AND_DISK_SER_2
    
            JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });
    
            JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
                return new Tuple2<String, Integer>(word, 1);
            });
    
            JavaPairDStream<String, Integer> rbkDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    
            // 做历史的累计操作
            JavaPairDStream<String, Integer> usbDStream = rbkDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> current, Optional<Integer> history) throws Exception {
    
                    int sum = 0;
                    for (int i : current) {
                        sum += i;
                    }
    
                    if (history.isPresent()) {
                        sum += history.get();
                    }
                    return Optional.of(sum);
                }
            });
    
            usbDStream.print();
    
            jsc.start();//启动流式计算
            jsc.awaitTermination();//等待执行结束
            jsc.close();
        }
    }

    transformation之window操作

    DStream window 滑动窗口

    Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

    Spark Streaming笔记整理(三):DS的transformation与output操作

    1.红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

    2.这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

    所以基于窗口的操作,需要指定2个参数:

    window length - The duration of the window (3 in the figure)
    slide interval - The interval at which the window-based operation is performed (2 in the figure). 
    
    1.窗口大小,个人感觉是一段时间内数据的容器。
    2.滑动间隔,就是我们可以理解的cron表达式吧。
    举个例子吧:
    还是以最著名的wordcount举例,每隔10秒,统计一下过去30秒过来的数据。
    // Reduce last 30 seconds of data, every 10 seconds  
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

    DSstream window滑动容器功能

    window 对每个滑动窗口的数据执行自定义的计算
    countByWindow 对每个滑动窗口的数据执行count操作
    reduceByWindow 对每个滑动窗口的数据执行reduce操作
    reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
    countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作

    案例

    测试代码如下:

    package cn.xpleaf.bigdata.spark.scala.streaming.p1
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      *窗口函数window
      *   每隔多长时间(滑动频率slideDuration)统计过去多长时间(窗口长度windowDuration)中的数据
      * 需要注意的就是窗口长度和滑动频率
      * windowDuration = M*batchInterval,
        slideDuration = N*batchInterval
      */
    object _08SparkStreamingWindowOps {
        def main(args: Array[String]): Unit = {
            if (args == null || args.length < 2) {
                System.err.println(
                    """Parameter Errors! Usage: <hostname> <port>
                      |hostname: 监听的网络socket的主机名或ip地址
                      |port:    监听的网络socket的端口
                    """.stripMargin)
                System.exit(-1)
            }
            val hostname = args(0).trim
            val port = args(1).trim.toInt
            Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    
            val conf = new SparkConf()
                .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)
                .setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(2))
    
            // 接收到的当前批次的数据
            val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
            val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))
    
            // 每隔4s,统计过去6s中产生的数据
            val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))
    
            retDStream.print()
    
            ssc.start()
            ssc.awaitTermination()
            ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身
        }
    }

    nc产生数据:

    [uplooking@uplooking01 ~]$ nc -lk 4893
    hello you
    hello he
    hello me
    hello you
    hello he

    输出结果如下:

    -------------------------------------------
    Time: 1526016316000 ms
    -------------------------------------------
    (hello,4)
    (me,1)
    (you,2)
    (he,1)
    
    -------------------------------------------
    Time: 1526016320000 ms
    -------------------------------------------
    (hello,5)
    (me,1)
    (you,2)
    (he,2)
    
    -------------------------------------------
    Time: 1526016324000 ms
    -------------------------------------------

    DStream的output操作以及foreachRDD

    DStream output操作

    1、print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。

    2、saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]

    3、saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。

    4、saveAsHadoopFile 同上,将数据保存到Hadoop文件中

    5、foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

    DStream foreachRDD详解

    相关内容其实在Spark开发调优中已经有相关的说明。

    通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。

    误区一:在RDD的foreach操作外部,创建Connection

    这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection() 
      rdd.foreach { record => connection.send(record)
      }
    }

    误区二:在RDD的foreach操作内部,创建Connection

    这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }

    DStream foreachRDD合理使用

    合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }

    合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  
      }
    }

    foreachRDD 与foreachPartition实现实战

    需要注意的是:

    (1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。

    (2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。

    (3)、在插入MySQL的时候最好使用批量插入。

    (4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。

    (5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

    这部分内容其实可以参考开发调优部分的案例,只是那里并没有foreachRDD,因为其并没有使用DStream,但是原理是一样的,因为最终都是针对RDD来进行操作的。

    原文链接:http://blog.51cto.com/xpleaf/2115343

  • 相关阅读:
    Access Update 不支持子查询 用查询解决
    vs2005中文乱码
    清理sql日志
    VS2005快捷键使用
    如何用C#改文件名
    C#中使用DirectSound录音
    VS2005打包 到没有.NETFramework2.0的目标机器上安装
    Access中iif,isnull的用法
    水晶报表切换字段视图不能用的问题。
    VS2005中TextBox的ReadOnly属性
  • 原文地址:https://www.cnblogs.com/zzmmyy/p/9390946.html
Copyright © 2011-2022 走看看