zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记九之铭文升级版

    铭文一级:

    核心概念:
    StreamingContext

    def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
    }

    def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
    }

    batch interval可以根据你的应用程序需求的延迟要求以及集群可用的资源情况来设置


    一旦StreamingContext定义好之后,就可以做一些事情

    Discretized Streams (DStreams)
    Internally, a DStream is represented by a continuous series of RDDs
    Each RDD in a DStream contains data from a certain interval

    对DStream操作算子,比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作;
    因为一个DStream是由不同批次的RDD所构成的。


    Input DStreams and Receivers

    Every input DStream (except file stream, discussed later in this section)
    is associated with a Receiver object which
    receives the data from a source and stores it
    in Spark’s memory for processing.

    铭文二级:

    第七章:Spark Streaming核心概念与编程

    DStream、Transformations、Output operation

    IDEA右上角的放大镜可以搜索类,查看源码

    this为附属构造方法

    Context开始后无法设置或者添加

    停止Streaming Context也可以通过停Spark Context来实现:

    stop()

    stopSparkContext()

    DStream->其实是一系列的RDDs

    来源:1.流进来  2.其他DStream转化过来

    实战之处理Socket数据:

    创建类NetworkWordCount

    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")  //双引号勿忘,val定义!!!

    val ssc = new StreamingContext(sparkConf,Seconds(5))  //Seconds

    val lines = ssc.socketTextStream("localhost",6789)     //lines此时就是DStream

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceBykey(_+_)

    result.print

    ssc.start()

    ssc.awaitTermination()

    启动:nc -lk 6789

    不能使用local[1]或者local,因为receiver自己operation也要使用一个,否则没有输出内容

    运行会报错,提示缺少依赖,可以打开maven project按要求导入相对应的依赖

    还可能会提示缺少LZ4 And XxHash的依赖,去maven repository网址引入即可

  • 相关阅读:
    手机也需“绿色环保”,省电类APP或将成为“标配”?
    Netty入门实例及分析
    ios开发经常使用RGB色值
    poj Kaka's Matrix Travels
    C++ 继承体系中的名称覆盖
    spring-framework-3.2.4与hibernate-release-4.3.5下使用HibernateDaoSupport抛出异常
    Codeforces Round #131 Div1 B
    URAL 1837. Isenbaev's Number (map + Dijkstra || BFS)
    数据库升级代码学习
    delphi:临界区对象TCriticalSection(Delphi) 与 TRtlCriticalSection 的区别
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8372651.html
Copyright © 2011-2022 走看看