zoukankan      html  css  js  c++  java
  • Spark Streaming官方文档学习--上

    官方文档地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html

    Spark Streaming是spark api的扩展
    能实现可扩展,高吞吐,可容错,的流式处理
    从外接数据源接受数据流,处理数据流使用的是复杂的高度抽象的算法函数map reduce join window等
    输出的数据可以存储到文件系统和数据库甚至是直接展示在命令行
    也可以应用ml 和graph processing在这些数据流上

    spark streaming本质还是spark只是实现了所谓的微批量
     spark streaming中连续数据流用DStream表示,DStream可以从输入数据创建,也可以从其他的DStream转化来
    本质上DStream是一组RDD组成的序列

    一个迅速上手的例子:
    1. # coding: utf-8
    2. # In[ ]:
    3. from pyspark import SparkContext
    4. from pyspark.streaming import StreamingContext
    5. # In[ ]:
    6. #创建两个工作线程,将这两个线程喂给StreamingContext,时间间隔是1秒
    7. #这里有个错误Cannot run multiple SparkContexts at once
    8. #参考:http://stackoverflow.com/questions/28259756/how-to-create-multiple-sparkcontexts-in-a-console
    9. #先要尝试关闭sc才能创建多个SparkContext
    10. try:
    11. sc.stop()
    12. except:
    13. pass
    14. sc = SparkContext("local[2]", "NetworkWordCount")
    15. ssc = StreamingContext(sc, 1)
    16. #sc.stop()
    17. # In[ ]:
    18. #创建一个DStream,从本机的这个端口取数据
    19. lines = ssc.socketTextStream("localhost", 9999)
    20. # In[ ]:
    21. #lines中的数据记录是一行行的文本,下面将文本切割成字
    22. words = lines.flatMap(lambda line: line.split(" "))
    23. #这里的flatMap是一对多DStream操作,生成一个新的DStream,就是变成了字流了
    24. # In[ ]:
    25. #下面数一数每一批次的字的个数
    26. # Count each word in each batch
    27. pairs = words.map(lambda word: (word, 1))
    28. wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    29. # In[ ]:
    30. # 打印DStream中的每个RDD的前十个元素
    31. wordCounts.pprint()
    32. # In[ ]:
    33. ssc.start() # 开始计算
    34. ssc.awaitTermination() #等待计算停止
    35. # In[ ]:
    36. #将这个文件的py脚本提交计算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
    37. #在命令行输入nc -lk 9999 然后模拟输入字符串文本,那么在pyspark命令行会打印出每秒钟输入的数据的统计结果


    基本概念

        要想写自己的streaming程序,首先要添加maven或者sbt的依赖
        
    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-streaming_2.11</artifactId>
    4. <version>2.0.0</version>
    5. </dependency>

    对于外部输入流的依赖现在不在核心api中了,需要单独添加依赖。

    初始化StreamingContext
    可以从SparkContext对象中创建
    1. from pyspark import SparkContext
    2. from pyspark.streaming import StreamingContext
    3. sc = SparkContext(master, appName)
    4. ssc = StreamingContext(sc, 1)
    appName是程序名字可以在UI中显示
    master是Spark,Mesos,YARN cluster URL    或者是 声明的Local[*]字符串使得运行在本地模式
    当运行在集群上的时候,不要写死在代码里面,而是要从spark-submit启动,传递进去。
    对于本地测试或者单元测试,可以传递local[*]

    在context被定义之后,要做下面的事情:
    1. 通过创建DStream去定义输入资源
    2. 通过对DStream的转换和输出操作定义流的计算
    3. 使用streamingContext.start()开始接收数据并处理数据
    4. 使用streamingContext.awaitTermination()等待处理过程停止(手动或者因为错误)
    5. 可以使用streamingContext.stop()手动停止处理过程

    要点:
    1. 一旦一个context被启动,不能再添加任何新的流进去了
    2. 一旦context被停止,就不能重启了
    3. 在JVM中只能有一个StreamingContext被激活
    4. 在streamingContext上使用stop()也会停止SparkContext(),要想单独停止前者,设置stop()的可选的参数 stopSparkContext()参数为false
    5. 一个sparkContext可以被重复使用,去创建多个StreamingContext,只要前一个StreamingContext被单独停止,下一个就可以接着创建。

    Discretized Streams(DStreams)

    是Spark Streaming的基本的抽象,代表了一个连续的数据流
    可以是从数据源接受的输入数据流,也可以是转换输入数据流得到的数据流
    一个DStream代表一串RDD,RDD是不可分割的基本数据单元抽象。
    每一个在DStream中的RDD包含特定时间间隔的数据
     
    如果时间是1秒的话,从0-1秒的很多RDD,与从1-2的RDD等,组成了DStream

    任何对DStream的操作,都会被翻译成对底层的RDD的操作,例如,将Lines转换成words的操作
     
    对DStream的操作,隐藏了很多细节,给开发者提供高度抽象的API

    输入DStreams和Receivers
    每一个输出DStream(除了file strem)都关联一个Receiver对象,这个对象从数据源接受数据存储在spark的内存中等待处理。
    Spark Streaming支持两种类型的内建数据源
        1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections
        2. Advances Sources:从外部工具类中调过来的例如Kafka,Flume,Kinesis等,需要链接一些外部依赖。

    关键点:
        1. 注意在本地运行SparkStreaming的时候,不要使用local或者local[1]作为主机的URL,因为这些都是意味着开一个线程,因为如果只是输入一个数据源,那么这个单一的线程会用来运行receiver,那么没有线程去处理接收到的数据了。所以本地运行的时候,参数local[n]中的n最好大于运行中的receiver。
        2. 相应的在集群上运行的时候,分配的核心数要比接收者的数目多,否则的话系统能接收数据,但是不能处理。

    Basic Sources 基础数据源

    在基础例子中已经看到过ssc.socketTextStream(),下面看file streams
    1. streamingContext.textFileStream(dataDirectory)
    可以创建一个DStream
    spark会监控这个路径,处理在那个路径中的任何文件
    注意:
        1. 路径中的文件要有相同的数据格式
        2. 文件必须通过自动专业或者重命名进入到这个路径的
        3. 一旦进入,这些文件不能被改变,所以如果文件被连续附加,那么新的数据不能被读取的
    针对简单的文本文件,有个简单的方法streamingContext.textFileStream(dataDirectory)
    因为file stream不需要运行receiver所以不需要分配核心或者线程去处理
    Python API不支持fileStream只是支持textFileStream

    可以使用RDD的queue去创建DStream,使用streamingContext.queueStream(queueofRDDs)

    Advanced Souces 高级数据源

    As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.
    因为这些高级的数据源的支持比较复杂,需要依赖单独的包,现在被转移出了核心的API,所以不能再shell中使用,也就不能在shell中测试这些数据源。如果非要的话,需要下载对应的maven jar包,和对应的依赖,然后添加到classpath

    Custom Sources自定义数据源

    现在python还不支持,但是要想从自定义的数据源创建DStream,就要自己实现用户定义的receiver,这可以接受自定义的数据,并且发送到spark中

    Receiver Reliability 接收者的可靠性

    按照可靠性可以把数据分为两种,有的数据源例如Kafka和Flume运行传送被回复的数据、
    如果系统正确接受到这些要被确认的数据,可以保证不会因为某种失败而导致数据丢失。这导致两种类型的接收者。
        1. 可靠的接收者:当数据被接受并存储到spark之后,必须回复确认消息给可靠数据源。
        2. 不可靠的接收者:不用回复确认,针对没有确认机制的数据源,或者有确认机制但是不需要执行复杂确认机制的数据源。


    Transformations on DStreams DStream的转换
    map(fun)这个函数将输入的DStream的每一个元素传递给func得到一个新的DStream
    flatMap(func)同上,只是每个输入可以map到多个输出项
    filter(func)选择func返回结果为true的DStream中的记录组成新的DStream
    reparitition(numPartitions)通过改变划分去改变DStream的并行水平
    union(otherStream)合并
    count()返回一个新的DStream,是原始的DStream中的每个RDD的元素的数目
    reduce(func)使用函数func聚合原始数据汇总的每个RDD得到一个新的单一元素RDD组成的DStream
    countByValue()调用类型K的DStream时候返回一个新的DStream有(K,long)对,其中long是k在每个RDD中出现的频率
    reduceByKey(func,[numTasks])将(k,v)中的v按照k使用func进行聚合
    join(otherStream,[numTasks])(k,v)(k,w)得到(k,(v,w))
    cogroup(otherStream,[numTasks])(k,v)(k,w)得到(k,Seq[V],Seq[W])
    tansform(func)作用在每个RDD上得到新的RDD组成的DStream
    updateStateByKey(func)每个键都是通过将给定的函数作用在其值上得到的新的DStream

    下面是对某些转换的详细的讨论
    UpdateStateByKey Operation
    允许当使用新的信息连续更新的时候,维护任意的状态
        1. 定义状态,这个状态可以是任意的数据类型
        2. 定义状态的更新函数
    不管有没有数据,spark都会更新状态,如果更新函数返回为none那么键值对就会被消除
    假设想要维持一个运行时数目,那么运行时数目就是一个状态,是个整数,下面是一个更新函数
    1. def updateFunction(newValues, runningCount):
    2. if runningCount is None:
    3. runningCount = 0
    4. return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
    假设使用前面的paris DStream包含(word,1)对
    1. runningCounts = pairs.updateStateByKey(updateFunction)


    转换操作:
    tranform操作允许任意的RDD-to-RDD函数应用到DStream,下面是一个例子:
    1. spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
    2. # join data stream with spam information to do data cleaning
    3. cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))


    窗口操作:
       允许我们应用transformation到一个滑动窗口的数据上
     上面的例子说明了每个窗口操作要声明下面的两个参数
        windows length:窗口的长度,上面的例子是3
        sliding interval:窗口被执行的时间间隔,例子中的书2
    上面的两个参数都应该是元素DStream批间隔(上面的间隔是1)的整数倍

    下面是一个窗口操作的例子,假设我们想生成过去的30秒的数据的wordcounts,每10秒钟一次
    1. # Reduce last 30 seconds of data, every 10 seconds
    2. windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    下面是一个常见的窗口操作的描述,所有的操作都传递两个参数,窗口的长度和时间间隔
    window(长度,间隔)原来的DStream按照新的指定窗口进行切分返回新的DStream
    countByWindow(长度,间隔)返回滑动窗口的元素个数
    reduceByWindow(func, windowLength, slideInterval)
    读原来的DStream数据进行聚合得到新的DStream
    reduceByKeyAndWindow(func,长度,间隔,[numtasks])(k,v)中的k被函数合并得到新的DStream
    reduceByKeyAndWindow(func,invFunc,长度,间隔,[numtasks])比上面的更高效,对窗口内的数据增量聚合和逐步移去得到聚合后新的DStream
    countByValueAndWindow(windowLength, slideInterval, [numTasks])根据窗口计算每个元素的频次

    Join Operations
    下面是简单的流的join
    1. stream1 = ...
    2. stream2 = ...
    3. joinedStream = stream1.join(stream2)
    You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin
    下面是基于窗口的流的Join
    1. windowedStream1 = stream1.window(20)
    2. windowedStream2 = stream2.window(60)
    3. joinedStream = windowedStream1.join(windowedStream2)
    sream-dataset的join
    流和数据集的join操作是使用lambda表达式实现的
    1. dataset = ... # some RDD
    2. windowedStream = stream.window(20)
    3. joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))


    DStream的输出操作

    print()
    前十个元素打印出来
    saveAsTextFiles(prefix, [suffix])
    将DStream中的内容以文本方式保存成文件,每次批处理间隔内产生的文件按照prefix-TIME_IN_MS[.suffix]命名
    saveAsObjectFiles(prefix, [suffix])
    将DStream中的内容按对象序列化并且以SequenceFile格式保存,每次批处理间隔文件按照上面的命名
    saveAsHadoopFiles(prefix, [suffix])
    将DStream中的内容按对象序列化并且以hadoop格式保存,每次批处理间隔文件按照上面的命名
    foreachRDD(func)
    对每个RDD应用这个函数,将RDD保存在外部文件中

    Design Patterns for using foreachRDD
    foreachRDD的设计模式
    dstream.foreachRDD非常强大,但是容易出错
    将数据写到外部系统需要创建一个连接对象,使用这个对象例如Tcp Connection发送数据到远程的系统
    开发者可能会错误的连接到Spark Driver,然后试图在worker中使用将数据保存到RDD中
    例如:
    1. def sendRecord(rdd):
    2. connection = createNewConnection() # executed at the driver
    3. rdd.foreach(lambda record: connection.send(record))
    4. connection.close()
    5. dstream.foreachRDD(sendRecord)
    这是错误的,因为这要求连接对象序列化并且从driver发送到worker
    这样的连接对象很少能跨机器转让
    正确的做法是在worker中创建连接对象
    1. def sendRecord(record):
    2. connection = createNewConnection()
    3. connection.send(record)
    4. connection.close()
    5. dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
    通常的,创建一个对象需要时间和资源的管理费用,因此,为每个记录创建和摧毁连接对象可能会带来不必要的管理费用,这可能会显著降低系统的吞吐量,一个更好的解决方案是使用rdd.foreachPartition去创造唯一连接对象,并且用这个对象发送所有的RDD。
    1. def sendPartition(iter):
    2. connection = createNewConnection()
    3. for record in iter:
    4. connection.send(record)
    5. connection.close()
    6. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
    最终的优化是,跨RDD或者批次,重用连接对象
    程序员可以维护一个连接对象的静态的池。
    1. def sendPartition(iter):
    2. # ConnectionPool is a static, lazily initialized pool of connections
    3. connection = ConnectionPool.getConnection()
    4. for record in iter:
    5. connection.send(record)
    6. # return to the pool for future reuse
    7. ConnectionPool.returnConnection(connection)
    8. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))







    万事走心 精益求美


  • 相关阅读:
    hdu 5524 Subtrees 递推
    一些数论函数
    hdu 5480 Conturbatio (前缀和)
    hdu 5479 Scaena Felix (好坑的简单题)
    hdu 5465 Clarke and puzzle(树状数组 或 前缀和 + Nim游戏)
    uva 10534 Wavio Sequence(LIS)
    MFC简单绘制安卓机器人
    解决kubuntu(KDE4.8.5桌面环境)找不到中文语言包
    Windows系统完全退出VMware方法
    【VC6.0】getline需要输入2次回车才会结束的BUG修复方法
  • 原文地址:https://www.cnblogs.com/kongchung/p/5778796.html
Copyright © 2011-2022 走看看