zoukankan      html  css  js  c++  java
  • spark教程(15)-Streaming

    Spark Streaming 是一个分布式数据流处理框架,它可以近乎实时的处理流数据,它易编程,可以处理大量数据,并且能把实时数据与历史数据结合起来处理。

    Streaming 使得 spark 具有了流式处理的能力,它为数据流式处理提供了高层抽象,底层仍然是 spark,所以它具有 spark 的可扩展、可容错、高吞吐量的特点,而且它可以与 spark 的各种库结合使用,如 sparkSQL、MLib、ml 等

    总体架构

    Spark Streaming 是一个伪实时的流处理框架,它处理的是一个微批次的数据流,就是说他把数据流按照非常小的时间间隔切分成一批一批的数据,然后每次处理一批数据;

    每一批数据仍然以 RDD 方式存储,然后使用 spark core 进行处理;

    RDD 操作结果也是一批一批的输出;

    数据流来源

    Streaming 支持多种数据流来源,比较常用的有 TCP网络传输、Kafa、Flume 等,还有 Twitter、ZeroMQ、MQTT 等,

    它也可以把一个文件当成流来处理,

    也可以自定义数据流来源;

    流数据经过 spark 处理后可以流向各种地方;

    总结一下如下图

     

    接收器

    也叫数据采集器,接收器收到数据后存储在内存中;

    Streaming 在每个 worker 上为每个数据流来源创建一个接收器;

    一个 spark 应用可以同时接收多个数据流来源,然后统一处理;

    API 

    Streaming API 有两个高度抽象 StreamingContext 和 离散流 DStream

    StreamingContext

    他是 Streaming 库的入口点,使得 Streaming 连接到 spark 上;

    每个 Streaming 应用必须先创建一个 StreamingContext 实例;

    创建 StreamingContext 实例

    创建方法和 SparkContext 一样,创建 sc 的方法都能用来创建 StreamingContext;

    不同的是多了一个参数,指定划分数据流的时间间隔;

    from pyspark import SparkContext, StreamingContext, SparkConf
    conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # 设定 appname 和 master
    ssc = StreamingContext(conf=conf, 10)    # 10s 间隔
    
    ## 或者这样
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # 直接传入 sc

    StreamingContext 实例的方法

    ssc.start()     # 启动 流式计算,在这之前,什么也不会发生
    ssc.checkpoint('hdfs path')     # 定期创建检查点数据,输入为 hdfs 的路径
    ssc.stop()      # 停止 流式计算
    ssc.awaitTermination()      # 等待流式计算结束

    checkpoint

    DStream     【内容比较多,故单独一章】

    离散数据流;

    他是 Streaming 处理数据流的一个高度抽象,也是一个抽象类,并且定义了一系列对该类的操作;

    不同数据流来源有不同的 DStream 类;

    DStream 实际上是一个 RDD 序列,Spark Streaming 把对 DStream 的操作转换成对 RDD 的操作;

    因为他是 RDD 序列,所以具有 RDD 的特点:不可变、分区、容错

    创建 DStream 实例

    DStream 创建有两种方式,一种是从数据流来源直接创建,一种是从现有的 DStream 对象转换得到

    socketTextStream:创建一个从 TCP 套接字连接 接收数据流的 DStream

    3 个参数,host,port,第三个可选,指定接收数据的存储等级

    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)

    默认的存储等级为 StorageLevel.MEMORY_AND_DISK_2,顾名思义,表示接收到的数据先存储在内存中,如果内存放不下,多出来的数据会存放到硬盘上;

    而且他会对接收到的数据 以 spark 序列化的方式 进行序列化操作;

    所以这个存储等级会有序列化的开销,但是减少了 jvm 垃圾回收相关的问题;

    接收到的数据会复制多份,提高容错;

    选择合适的存储等级可以提高性能,比如 Streaming 采集周期很短,如几秒钟,也就是数据量很小,那么可以指定存储等级为只内存存储 StorageLevel.MEMORY_ONLY

    textFileStream:创建一个 DStream 用于监控 hadoop 兼容的文件系统中是否有新文件创建

    输入为被监控的目录;

    如果有新文件创建,则将作为文本文件读出;

    注意,写入被监控目录的文件必须是从同等文件系统中移动过来的,比如 linux 系统,新文件必须是用 mv 命令移动过来的

    def textFileStream(self, directory)

    actorStream:用户自己定义的 Akka actor 接收器的 DStream

    python 好像没这个

    KafkaUtils:创建从 Kafka 接收数据流的 DStream

    class KafkaUtils(object):
    
        @staticmethod
        def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                         storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
            """
            Create an input stream that pulls messages from a Kafka Broker.
    
            :param ssc:  StreamingContext object
            :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
            :param groupId:  The group id for this consumer.
            :param topics:  Dict of (topic_name -> numPartitions) to consume.
                            Each partition is consumed in its own thread.
            :param kafkaParams: Additional params for Kafka
            :param storageLevel:  RDD storage level.
            :param keyDecoder:  A function used to decode key (default is utf8_decoder)
            :param valueDecoder:  A function used to decode value (default is utf8_decoder)
            :return: A DStream object

    还有一些其他的,参考 pyspark

    DStream 操作 

    内容比较多,后面会专门写一篇博客 

    DStream 输出

    Dstream 可以输出到各种地方,如文件、数据库或者其他应用程序

    输出到文件

    saveAsTextFiles:将 DStream 保存成文件,他为每个 RDD 创建一个目录,并在每个目录里创建多个副本;

    目录名称为 用户定义的前缀-时间戳-可选后缀

    DStream.saveAsTextFiles('/usr/lib/text')

    saveAsObjectFiles:将 DStream 以序列化对象的形式保存成二进制 SequenceFile 文件,用法同 saveAsTextFiles

    saveAsHadoopFiles:只有由键值对组成的 DStream 才能使用该方法

    saveAsNewAPIHadoopFiles:自己试试

    输出到控制台

    pprint(n):打印指定输出元素个数

    输出到数据库

    foreachRDD:输入一个 func 逐个处理 DStream 中的每一个 RDD;

    该方法无需返回任何东西;

        def foreachRDD(self, func):
            """
            Apply a function to each RDD in this DStream.
            """
            if func.__code__.co_argcount == 1:
                old_func = func
                func = lambda t, rdd: old_func(rdd)
            jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
            api = self._ssc._jvm.PythonDStream
            api.callForeachRDD(self._jdstream, jfunc)

    对于 foreachRDD 需要理解的是

    1. func 是把 RDD 作为输入,并且可以使用 RDD 的所有操作;

    2. foreachRDD 执行在 Driver 中,而 func 执行在 Executor 中;

    3. foreachRDD 不仅仅用于存到数据库

    存储到数据库需要注意的是

    1. 数据库连接比较耗时,不要频繁的连接、关闭

    2. 数据库连接无法序列化,也就是无法从 Driver 发送给 Executor,故数据库连接只能在 worker 上创建,并且复用

    3. RDD 的 foreachPartition 操作可以使用同一个数据库连接保存多个 DStream 中的元素,可在 func 中使用该方法

    4. 利用数据库连接池进行优化

    5. 也可以采用批量写入的方法来优化数据库存储

    示例代码

    数据流来源是 TCP 套接字

    from __future__ import print_function
    import sys
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
            sys.exit(-1)
    
        sc = SparkContext(appName="PythonStreamingNetworkWordCount")
        # 以指定时间为周期采集实时数据
        ssc = StreamingContext(sc, 30)   # 采集周期为 x s
    
        ### 连接已经开启的 socket 服务器,注意必须事先开启 socket server
        lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # 采集数据
        counts = lines.flatMap(lambda line: line.split(" "))
                      .map(lambda word: (word, 1))
                      .reduceByKey(lambda a, b: a+b)
        counts.pprint()     # 打印 DStream
        ## 如
        # (u'a', 1)
        # (u'e', 1)
        # (u'd', 1)
        counts.pprint(2)     # 打印 DStream,并指定输出元素个数
        ## 如果 pprint 输出如上,pprint(2)输出如下
        # (u'a', 1)
        # (u'e', 1)
        counts.saveAsTextFiles('/usr/lib/text')     # DStream 保存至文件系统
    
    
        ssc.checkpoint('/spark')
    
        ssc.start()     # 启动采集器
        ssc.awaitTermination()      # 等待 socket ser 终止

    TCP 服务器要事先存在,在 linux 上使用如下命令建立 TCP 服务器

    nc -lk 9999

    注意 nc 命令需要手动安装

    数据流来源是 Kafka

    import time
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from operator import add
    
    
    sc = SparkContext(master="yarn",appName="PythonSparkStreamingRokidDtSnCount")
    ssc = StreamingContext(sc, 2)
    zkQuorum = '172.16.89.80:2181'      # broker list
    topic = {'1125':1}          # topic name : partition
    groupid = "test-consumer-group"     # consumer group
    lines = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    lines1 = lines.flatMap(lambda x: x.split("
    "))
    valuestr = lines1.map(lambda x: x.value.decode())
    valuedict = valuestr.map(lambda x:eval(x))
    message = valuedict.map(lambda x: x["message"])
    rdd2 = message.map(lambda x: (time.strftime("%Y-%m-%d",time.localtime(float(x.split("u0001")[0].split("u0002")[1])/1000))+"|"+x.split("u0001")[1].split("u0002")[1],1)).map(lambda x: (x[0],x[1]))
    rdd3 = rdd2.reduceByKey(add)
    rdd3.saveAsTextFiles("/tmp/wordcount")
    rdd3.pprint()
    ssc.start()
    ssc.awaitTermination()

    其他场景参考 spark 自带的样例

    参考资料:

    《Spark大数据分析核心概念技术及实践OCR-2017》  电子书

  • 相关阅读:
    Memcached:高性能的分布式内存缓存服务器
    MySQL数据库Query的优化
    MySQL数据库的锁定机制及优化
    系统架构及实现对性能的影响(一)
    Mysql数据库的基本结构和存储引擎简介
    Spring事务管理的回滚
    穷举算法实例
    在写完全二叉树的构建及遍历
    Inotify
    Rsync扩展
  • 原文地址:https://www.cnblogs.com/yanshw/p/11929180.html
Copyright © 2011-2022 走看看