zoukankan      html  css  js  c++  java
  • Spark Streaming揭秘 Day28 在集成开发环境中详解Spark Streaming的运行日志内幕

    Spark Streaming揭秘 Day28

    在集成开发环境中详解Spark Streaming的运行日志内幕

    今天会逐行解析一下SparkStreaming运行的日志,运行的是WordCountOnline这个Demo。

    启动过程

    SparkStreaming启动是从如下日志开始:

    16/06/16 21:26:44 INFO ReceiverTracker: Starting 1 receivers
    16/06/16 21:26:44 INFO ReceiverTracker: ReceiverTracker started
    16/06/16 21:26:44 INFO ReceiverTracker: Receiver 0 started

    ReceiverTracker启动了一个receiver,完成启动

    16/06/16 21:26:44 INFO ForEachDStream: metadataCleanupDelay = -1
    16/06/16 21:26:44 INFO ShuffledDStream: metadataCleanupDelay = -1
    16/06/16 21:26:44 INFO MappedDStream: metadataCleanupDelay = -1
    16/06/16 21:26:44 INFO FlatMappedDStream: metadataCleanupDelay = -1
    16/06/16 21:26:44 INFO SocketInputDStream: metadataCleanupDelay = -1

    从日志的顺序,可以看出从Driver的角度讲,DStreamGraph是从后往前构造的,一直到最开始的地方SocketInputDStream

    16/06/16 21:26:44 INFO SocketInputDStream: Slide time = 15000 ms
    16/06/16 21:26:44 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
    16/06/16 21:26:44 INFO SocketInputDStream: Checkpoint interval = null
    16/06/16 21:26:44 INFO SocketInputDStream: Remember duration = 15000 ms
    16/06/16 21:26:44 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@2d14d3a1
    ....
    16/06/16 21:26:44 INFO ForEachDStream: Slide time = 15000 ms
    16/06/16 21:26:44 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
    16/06/16 21:26:44 INFO ForEachDStream: Checkpoint interval = null
    16/06/16 21:26:44 INFO ForEachDStream: Remember duration = 15000 ms
    16/06/16 21:26:44 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@57481f9

    这部分日志是在SparkStreaming运行前的实例化过程,构建起DStreamGraph中的对象。
    Slide time和Remember duration分别表示Batch Duration和Remember Duration。
    StorageLevel部分的日志有点问题,从源码看应该是StorageLevel(true, true, false, false, 2)。
    Snip20160616_7

    16/06/16 21:26:44 INFO RecurringTimer: Started timer for JobGenerator at time 1466083605000
    16/06/16 21:26:44 INFO JobGenerator: Started JobGenerator at 1466083605000 ms
    16/06/16 21:26:44 INFO JobScheduler: Started JobScheduler
    16/06/16 21:26:44 INFO StreamingContext: StreamingContext started

    这里启动了定时器支持JobGenerator工作,JobGenerator与定期器结合能不断的产生Job,
    定时器产生Job之后,放在线程中,通过JobScheduler提交给SparkCore来进行运行。

    简单总结下:

    1. 在启动时会在Executor上启动receivers。
    2. 在这个基础上,因为要构造出DStreamGraph,会从后往前回溯链条,并从前往后构造对象。
    3. RecurringTimer、JobGenerator、JobScheduler是调度的核心。

    至此,Driver和Receiver都启动完成。

    接收数据

    接下来是接收数据,定时器会根据我们设定的时间开始工作,这里是关键。

    16/06/16 21:26:44 INFO RecurringTimer: Started timer for BlockGenerator at time 1466083605000
    16/06/16 21:26:44 INFO BlockGenerator: Started BlockGenerator
    16/06/16 21:26:44 INFO BlockGenerator: Started block pushing thread
    启动线程存数据
    16/06/16 21:26:44 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.120:51324
    对Receiver进行注册

    下面开始在Executor上执行,启动Receiver
    16/06/16 21:26:44 INFO ReceiverSupervisorImpl: Starting receiver
    16/06/16 21:26:44 INFO ReceiverSupervisorImpl: Called receiver onStart
    16/06/16 21:26:44 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
    16/06/16 21:26:44 INFO SocketReceiver: Connecting to localhost:9999
    16/06/16 21:26:44 INFO SocketReceiver: Connected to localhost:9999

    下面是具体存储数据
    16/06/16 21:26:45 INFO MemoryStore: Block input-0-1466083604800 stored as bytes in memory (estimated size 10.0 B, free 45.1 KB)
    16/06/16 21:26:45 INFO BlockManagerInfo: Added input-0-1466083604800 in memory on localhost:51326 (size: 10.0 B, free: 1140.4 MB)
    16/06/16 21:26:45 WARN BlockManager: Block input-0-1466083604800 replicated to only 0 peer(s) instead of 1 peers
    16/06/16 21:26:45 INFO BlockGenerator: Pushed block input-0-1466083604800

    下面回到Driver,产生了Job
    16/06/16 21:26:45 INFO JobScheduler: Added jobs for time 1466083605000 ms
    16/06/16 21:26:45 INFO JobScheduler: Starting job streaming job 1466083605000 ms.0 from job set of time 1466083605000 ms

    再看下执行完成后日志:
    16/06/16 21:26:45 INFO JobScheduler: Finished job streaming job 1466083605000 ms.0 from job set of time 1466083605000 ms
    16/06/16 21:26:45 INFO JobScheduler: Total delay: 0.188 s for time 1466083605000 ms (execution: 0.127 s)
    这里说明了延时时间。

    清理数据

    16/06/16 21:26:45 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
    16/06/16 21:26:45 INFO InputInfoTracker: remove old batch metadata:

    之后是删除数据,我们发现,执行清理RDD的动作是有点滞后的,差了一个Batch Duration,也就是说在下一个Batch Duration的时候,清理前一次的。
    16/06/16 21:27:00 INFO ShuffledRDD: Removing RDD 4 from persistence list
    16/06/16 21:27:00 INFO MapPartitionsRDD: Removing RDD 3 from persistence list
    16/06/16 21:27:00 INFO MapPartitionsRDD: Removing RDD 2 from persistence list
    16/06/16 21:27:00 INFO BlockRDD: Removing RDD 1 from persistence list
    16/06/16 21:27:00 INFO BlockManager: Removing RDD 4
    16/06/16 21:27:00 INFO BlockManager: Removing RDD 3
    16/06/16 21:27:00 INFO BlockManager: Removing RDD 2
    16/06/16 21:27:00 INFO BlockManager: Removing RDD 1
    16/06/16 21:27:00 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[1] at socketTextStream at WordCountOnline.java:58 of time 1466083620000 ms
    完成数据删除后,是删除元数据。
    16/06/16 21:27:00 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
    16/06/16 21:27:00 INFO InputInfoTracker: remove old batch metadata:

    想补充一点的是,这个清理策略会和rememberDuration(window操作时使用)的设置有关,而rememberDuration默认和BatchDuration是一致的,代码也可以印证这点。

    Snip20160624_60

    欲知后事如何,且听下回分解!

    DT大数据每天晚上20:00YY频道现场授课频道68917580

  • 相关阅读:
    Unity文件操作路径
    自定义协议封装包头、包体
    完全卸载删除gitlab
    shell脚本报错:syntax error: unexpected end of file
    Shell脚本创建的文件夹末尾有两个问号怎么回事?
    您与此网站之间建立的连接并非完全安全
    linux 查看磁盘文件大小
    mysql连接问题
    Linux查看当前开放的端口
    本地Linux备份服务器[Client]定期备份云服务器[Server]上的文件(下)
  • 原文地址:https://www.cnblogs.com/dt-zhw/p/5596971.html
Copyright © 2011-2022 走看看