zoukankan      html  css  js  c++  java
  • Spark Streaming

    Spark Streaming

    一、 介绍

    Spark StreamingSpark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持多种数据源获取数据,包括kafkaflumetwitter等。

    Spark的各个框架核心都是Spark Core

    Spark Streaming内部处理机制是:接受实时流的数据,根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这批数据,最终得到处理后的一批批结果数据。对应的批数据,在Spark内核对应一个RDD实例,因此对应流数据的DStream可以看成是一组RDDs,RDD的一个序列。

     

    Spark Streaming的基本原理是将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,基本原理图如下:

     

    首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。

    二、 容错

    DStream基于RDD组成,RDD的容错性依旧有效,我们首先回忆一下SparkRDD的基本特性。

    1RDD是一个不可变的、确定性的可重复计算的分布式数据集。RDD的某些partition丢失了,可以通过血统(lineage)信息重新计算恢复;

    2如果RDD任何分区因worker节点故障而丢失,那么这个分区可以从原来依赖的容错数据集中恢复;

    3由于Spark中所有的数据的转换操作都是基于RDD的,即使集群出现故障,只要输入数据集存在,所有的中间结果都是可以被计算的。

    Spark Streaming是可以从HDFSS3这样的文件系统读取数据的,这种情况下所有的数据都可以被重新计算,不用担心数据的丢失。但是在大多数情况下,Spark Streaming是基于网络来接受数据的,此时为了实现相同的容错处理,在接受网络的数据时会在集群的多个Worker节点间进行数据的复制(默认的复制数是2),这导致产生在出现故障时被处理的两种类型的数据:

    1Data received and replicated :一旦一个Worker节点失效,系统会从另一份还存在的数据中重新计算。

    2Data received but buffered for replication :一旦数据丢失,可以通过RDD之间的依赖关系,从HDFS这样的外部文件系统读取数据。

    三、 构建Spark Streaming

    作为构建于Spark之上的应用框架,Spark Streaming承袭了Spark的编程风格,对于已经了解Spark的用户来说能够快速地上手。接下来以Spark Streaming官方提供的WordCount代码为例来介绍Spark Streaming的使用方式。

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    
    val ssc = new StreamingContext(conf, Seconds(1))
    
    // Create a DStream that will connect to hostname:port, like localhost:9999
    
    val lines = ssc.socketTextStream("localhost", 9999)
    
    // Split each line into words
    
    val words = lines.flatMap(_.split(" "))
    
    import org.apache.spark.streaming.StreamingContext._
    
    // Count each word in each batch
    
    val pairs = words.map(word => (word, 1))
    
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Print the first ten elements of each RDD generated in this DStream to the console
    
    wordCounts.print()
    
    ssc.start()              // Start the computation
    
    ssc.awaitTermination()  // Wait for the computation 

    1.创建StreamingContext对象 同Spark初始化需要创建SparkContext对象一样,使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(NetworkWordCount)。需要注意的是参数Seconds(1)Spark Streaming需要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置;

    2.创建InputDStream如同StormSpoutSpark Streaming需要指明数据源。如上例所示的socketTextStreamSpark Streamingsocket连接作为数据源读取数据。当然Spark Streaming支持多种不同的数据源,包括KafkaFlumeHDFS/S3KinesisTwitter等数据源;同样包括自己重写Receiver方法来实现自己的消息处理例如metaq

    3.操作DStream对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用MapReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;

    4.启动Spark Streaming之前所作的所有步骤只是创建了执行流程,程序没有真正连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划,当ssc.start()启动后程序才真正进行所有预期的操作。

    四、 DStream的输入源

    Spark Streaming中所有的操作都是基于流的,而输入源是这一系列操作的起点。输入 DStreams DStreams 接收的流都代表输入数据流的来源,在Spark Streaming提供两种内置数据流来源:

    基础来源 StreamingContext API 中直接可用的来源。例如:文件系统、Socket(套接字)连接和 Akka actors

    高级来源 KafkaFlumeKinesisTwitter

  • 相关阅读:
    SpringBoot多数据源启动器
    数据结构模拟器
    mysql5.7查询今天、昨天、本周、上周、本月、上月数据
    SpringBoot项目本地可以发送邮件,部署到阿里云服务器发送邮件失败的解决方法
    Centos7搭建Maven私服-Nexus3.19.1-01
    Linux中部署jar包并指定日志输出文件
    ThreadLocal是什么?谈谈你对他的理解
    leetcode-双指针遍历
    不要再纠结css/js/html有没有必要放在WEB-INF下了
    数据库的表的字段名称与实体类(pojo)不对应解决方案
  • 原文地址:https://www.cnblogs.com/Tonyzczc/p/10429003.html
Copyright © 2011-2022 走看看