  • Monitoring Spark Applications with SparkListener

     There are several ways to monitor a Spark application; for example, a Spark-on-YARN job can be polled from the outside through the YarnClient API. This post covers a monitoring mechanism built into Spark itself: the SparkListener.

    For Spark Streaming jobs, the counterpart is StreamingListener.
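
    Besides registering a listener programmatically with addSparkListener (as the example below does), Spark can also load one declaratively through the spark.extraListeners configuration, which takes a comma-separated list of class names and instantiates them while the SparkContext starts up. A minimal sketch, assuming a hypothetical named listener class cn.com.kong.JobTrackingListener (one is sketched after the main example) that has a zero-argument or SparkConf-accepting constructor:

    // Spark creates and registers the named classes during SparkContext
    // initialization, so they also observe the early lifecycle events.
    SparkConf conf = new SparkConf()
            .set("spark.extraListeners", "cn.com.kong.JobTrackingListener");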

    package cn.com.kong;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.scheduler.*;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.status.AppStatusStore;
    import org.apache.spark.status.api.v1.ApplicationInfo;
    
    public class CustomSparkListener {
    
        public static void main(String[] args) {
    
            System.setProperty("HADOOP_USER_NAME","etluser");
    
            SparkConf conf = new SparkConf();
            // General tuning options carried over from the original job; none of
            // these are required for the listener demo itself.
            conf.set("spark.hadoopRDD.ignoreEmptySplits", "true");
            conf.set("spark.sql.adaptive.enabled", "true");
            conf.set("spark.sql.adaptive.join.enabled", "true");
            conf.set("spark.executor.memoryOverhead", "1024");
            conf.set("spark.driver.memoryOverhead", "1024");
            conf.set("spark.kryoserializer.buffer.max", "256m");
            conf.set("spark.kryoserializer.buffer", "64m");
            conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -Dlog4j.configuration=log4j.properties");
            conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC -Dlog4j.configuration=log4j.properties");
            conf.set("spark.sql.parquet.writeLegacyFormat", "true");
    
            SparkSession spark = SparkSession
                    .builder()
                    .appName("testSparkListener")
                    .master("local")
                    .config(conf)
                    .enableHiveSupport()
                    .getOrCreate();
    
            spark.sql("use coveroptimize");
    
    //        Alternatively, the AppStatusStore exposes a pull-style snapshot of
    //        application state (left disabled in this test):
    //        AppStatusStore appStatusStore = spark.sparkContext().statusStore();
    //        ApplicationInfo applicationInfo = appStatusStore.applicationInfo();
    //        applicationInfo.memoryPerExecutorMB();
    
            // In practice you would define a named class implementing the listener
            // interface and register an instance of it. For this test, an anonymous
            // implementation is created inline.
            spark.sparkContext().addSparkListener(new SparkListenerInterface() {
                @Override
                public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
                }
    
                /**
                 * Called when a stage completes successfully or fails, with information on the completed stage.
                 */
                @Override
                public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
    
                }
    
                @Override
                public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
    
                }
    
                @Override
                public void onTaskStart(SparkListenerTaskStart taskStart) {
    
                }
                /**
                 * Called when a job ends
                 */
                @Override
                public void onJobEnd(SparkListenerJobEnd jobEnd) {
                    JobResult jobResult = jobEnd.jobResult();
                    System.err.println("自定义监听器jobEnd jobResult:"+jobResult);
                }
                /**
                 * Called when a job starts
                 */
                @Override
                public void onJobStart(SparkListenerJobStart jobStart) {
                    System.err.println("自定义监听器jobStart,jobId:"+jobStart.jobId());
                    System.err.println("自定义监听器jobStart,该job下stage数量:"+jobStart.stageInfos().size());
                }
    
                @Override
                public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
    
                }
    
                @Override
                public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
    
                }
    
                @Override
                public void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
    
                }
                /**
                 * Called when the application ends
                 */
                @Override
                public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
                    System.err.println("Application结束,时间:"+applicationEnd.time());
                }
    
                @Override
                public void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
    
                }
    
                @Override
                public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
    
                }
    
                @Override
                public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
    
                }
    
                @Override
                public void onOtherEvent(SparkListenerEvent event) {
    
                }
    
                @Override
                public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
    
                }
    
                @Override
                public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
    
                }
    
                @Override
                public void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
    
                }
    
                @Override
                public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
    
                }
                /**
                 * Called when the application starts
                 */
                @Override
                public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
                    System.err.println("Application启动,appName:"+applicationStart.appName()+",appID"+
                            applicationStart.appId());
                }
    
                @Override
                public void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
    
                }
    
                @Override
                public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
    
                }
    
                @Override
                public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
    
                }
    
                @Override
                public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    
                }
            });
    
            String sql1 = "select roadid,count(1) cn from gridmappingroad group by roadid";
            spark.sql(sql1).repartition(2).write().mode(SaveMode.Overwrite)
                    .saveAsTable("test_listener_table");
    
            spark.stop();
        }
    }
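
    Implementing SparkListenerInterface directly, as above, forces you to provide every callback, including the many empty ones. A leaner alternative is to extend the abstract base class org.apache.spark.scheduler.SparkListener, whose callbacks are all no-ops, and override only the events you care about. A minimal sketch (the class name is illustrative):

    package cn.com.kong;

    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerJobEnd;
    import org.apache.spark.scheduler.SparkListenerJobStart;

    // Everything not overridden here inherits a no-op from SparkListener.
    public class JobTrackingListener extends SparkListener {

        @Override
        public void onJobStart(SparkListenerJobStart jobStart) {
            System.err.println("Job " + jobStart.jobId() + " started with "
                    + jobStart.stageInfos().size() + " stage(s)");
        }

        @Override
        public void onJobEnd(SparkListenerJobEnd jobEnd) {
            System.err.println("Job " + jobEnd.jobId() + " finished: " + jobEnd.jobResult());
        }
    }

    A named class like this is also what the spark.extraListeners mechanism mentioned earlier expects, since Spark must be able to instantiate it by name.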

    Run log (note that no 'Application started' line appears below: the SparkListenerApplicationStart event is posted while the SparkContext is initializing, before this listener is registered):

    20/01/17 13:50:51 INFO spark.SparkContext: Running Spark version 2.3.0
    20/01/17 13:50:52 INFO spark.SparkContext: Submitted application: testSparkListener
    20/01/17 13:50:52 INFO spark.SecurityManager: Changing view acls to: kongshuaiwei,etluser
    20/01/17 13:50:52 INFO spark.SecurityManager: Changing modify acls to: kongshuaiwei,etluser
    20/01/17 13:50:52 INFO spark.SecurityManager: Changing view acls groups to: 
    20/01/17 13:50:52 INFO spark.SecurityManager: Changing modify acls groups to: 
    20/01/17 13:50:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(kongshuaiwei, etluser); groups with view permissions: Set(); users  with modify permissions: Set(kongshuaiwei, etluser); groups with modify permissions: Set()
    20/01/17 13:50:52 INFO util.Utils: Successfully started service 'sparkDriver' on port 59545.
    20/01/17 13:50:52 INFO spark.SparkEnv: Registering MapOutputTracker
    20/01/17 13:50:52 INFO spark.SparkEnv: Registering BlockManagerMaster
    20/01/17 13:50:52 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/01/17 13:50:52 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/01/17 13:50:52 INFO storage.DiskBlockManager: Created local directory at C:\Users\kongshuaiwei\AppData\Local\Temp\blockmgr-b8c578de-2661-4cf3-9e8d-928159f3aecd
    20/01/17 13:50:52 INFO memory.MemoryStore: MemoryStore started with capacity 898.5 MB
    20/01/17 13:50:52 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    20/01/17 13:50:52 INFO util.log: Logging initialized @1729ms
    20/01/17 13:50:52 INFO server.Server: jetty-9.3.z-SNAPSHOT
    20/01/17 13:50:52 INFO server.Server: Started @1788ms
    20/01/17 13:50:52 INFO server.AbstractConnector: Started ServerConnector@5cc5b667{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    20/01/17 13:50:52 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@410954b{/jobs,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@10b892d5{/jobs/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3d3f761a{/jobs/job,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@579d011c{/jobs/job/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3670f00{/stages,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@452e26d0{/stages/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@46ab18da{/stages/stage,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7689ddef{/stages/stage/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@687a762c{/stages/pool,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a2e2935{/stages/pool/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@733c423e{/storage,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4b629f13{/storage/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@70925b45{/storage/rdd,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b9ea3e3{/storage/rdd/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@aa22f1c{/environment,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@55e7a35c{/environment/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@37cd92d6{/executors,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5922ae77{/executors/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4263b080{/executors/threadDump,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2af616d3{/executors/threadDump/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71f67a79{/static,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@34abdee4{/,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71a9b4c7{/api,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@21ca139c{/jobs/job/kill,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@226f885f{/stages/stage/kill,null,AVAILABLE,@Spark}
    20/01/17 13:50:52 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://sl1-43087-b01.BJ.DATANGMOBILE.com:4040
    20/01/17 13:50:52 INFO executor.Executor: Starting executor ID driver on host localhost
    20/01/17 13:50:53 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59558.
    20/01/17 13:50:53 INFO netty.NettyBlockTransferService: Server created on sl1-43087-b01.BJ.DATANGMOBILE.com:59558
    20/01/17 13:50:53 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/01/17 13:50:53 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sl1-43087-b01.BJ.DATANGMOBILE.com, 59558, None)
    20/01/17 13:50:53 INFO storage.BlockManagerMasterEndpoint: Registering block manager sl1-43087-b01.BJ.DATANGMOBILE.com:59558 with 898.5 MB RAM, BlockManagerId(driver, sl1-43087-b01.BJ.DATANGMOBILE.com, 59558, None)
    20/01/17 13:50:53 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sl1-43087-b01.BJ.DATANGMOBILE.com, 59558, None)
    20/01/17 13:50:53 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, sl1-43087-b01.BJ.DATANGMOBILE.com, 59558, None)
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@33a630fa{/metrics/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO internal.SharedState: loading hive config file: file:/D:/ideaIC/workspace/spark-project/coverOptimize/target/classes/hive-site.xml
    20/01/17 13:50:53 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
    20/01/17 13:50:53 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fb5fe30{/SQL,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@456be73c{/SQL/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@41a6d121{/SQL/execution,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4f449e8f{/SQL/execution/json,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@27e32fe4{/static/sql,null,AVAILABLE,@Spark}
    20/01/17 13:50:53 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    20/01/17 13:50:53 INFO hive.HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
    20/01/17 13:50:54 INFO hive.metastore: Trying to connect to metastore with URI thrift://worker03.xxx.xxx.cn:9083
    20/01/17 13:50:54 INFO hive.metastore: Connected to metastore.
    20/01/17 13:50:54 INFO session.SessionState: Created local directory: C:/Users/KONGSH~1/AppData/Local/Temp/f35b7531-c964-4d2e-8ba5-b5ade205d12a_resources
    20/01/17 13:50:54 INFO session.SessionState: Created HDFS directory: /tmp/hive/etluser/f35b7531-c964-4d2e-8ba5-b5ade205d12a
    20/01/17 13:50:54 INFO session.SessionState: Created local directory: C:/Users/KONGSH~1/AppData/Local/Temp/kongshuaiwei/f35b7531-c964-4d2e-8ba5-b5ade205d12a
    20/01/17 13:50:54 INFO session.SessionState: Created HDFS directory: /tmp/hive/etluser/f35b7531-c964-4d2e-8ba5-b5ade205d12a/_tmp_space.db
    20/01/17 13:50:54 INFO client.HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
    20/01/17 13:50:56 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:50:56 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:50:56 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:50:56 INFO codegen.CodeGenerator: Code generated in 194.663548 ms
    20/01/17 13:50:56 INFO codegen.CodeGenerator: Code generated in 42.50705 ms
    20/01/17 13:50:57 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 249.3 KB, free 898.3 MB)
    20/01/17 13:50:57 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.6 KB, free 898.2 MB)
    20/01/17 13:50:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on sl1-43087-b01.BJ.DATANGMOBILE.com:59558 (size: 24.6 KB, free: 898.5 MB)
    20/01/17 13:50:57 INFO spark.ContextCleaner: Cleaned accumulator 0
    20/01/17 13:50:57 INFO spark.ContextCleaner: Cleaned accumulator 2
    20/01/17 13:50:57 INFO spark.SparkContext: Created broadcast 0 from 
    20/01/17 13:50:57 INFO spark.ContextCleaner: Cleaned accumulator 3
    20/01/17 13:50:57 INFO spark.ContextCleaner: Cleaned accumulator 4
    20/01/17 13:50:57 WARN security.UserGroupInformation: No groups available for user etluser
    20/01/17 13:50:57 WARN security.UserGroupInformation: No groups available for user etluser
    20/01/17 13:50:57 INFO mapred.FileInputFormat: Total input paths to process : 1
    .....
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Registering RDD 6 (saveAsTable at SparkTest.java:176)
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Got map stage job 0 (saveAsTable at SparkTest.java:176) with 31 output partitions
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Final stage: ShuffleMapStage 0 (saveAsTable at SparkTest.java:176)
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Parents of final stage: List()
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Missing parents: List()
    Custom listener onJobStart, jobId: 0
    Custom listener onJobStart, stages in this job: 1
    20/01/17 13:50:57 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at saveAsTable at SparkTest.java:176), which has no missing parents
    20/01/17 13:50:57 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 30.1 KB, free 898.2 MB)
    20/01/17 13:50:57 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 13.7 KB, free 898.2 MB)
    20/01/17 13:50:57 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on sl1-43087-b01.BJ.DATANGMOBILE.com:59558 (size: 13.7 KB, free: 898.5 MB)
    20/01/17 13:50:57 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
    ...
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) in 12199 ms on localhost (executor driver) (31/31)
    20/01/17 13:56:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (saveAsTable at SparkTest.java:176) finished in 359.468 s
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: looking for newly runnable stages
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: running: Set()
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: waiting: Set()
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: failed: Set()
    20/01/17 13:56:57 INFO exchange.ExchangeCoordinator: advisoryTargetPostShuffleInputSize: 67108864, targetPostShuffleInputSize 67108864.
    Custom listener onJobEnd, jobResult: JobSucceeded
    20/01/17 13:56:57 INFO spark.SparkContext: Starting job: saveAsTable at SparkTest.java:176
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Registering RDD 10 (saveAsTable at SparkTest.java:176)
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Got job 1 (saveAsTable at SparkTest.java:176) with 2 output partitions
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (saveAsTable at SparkTest.java:176)
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 2)
    Custom listener onJobStart, jobId: 1
    Custom listener onJobStart, stages in this job: 3
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[10] at saveAsTable at SparkTest.java:176), which has no missing parents
    20/01/17 13:56:57 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 22.2 KB, free 898.2 MB)
    20/01/17 13:56:57 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 10.5 KB, free 898.2 MB)
    20/01/17 13:56:57 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on sl1-43087-b01.BJ.DATANGMOBILE.com:59558 (size: 10.5 KB, free: 898.5 MB)
    20/01/17 13:56:57 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[10] at saveAsTable at SparkTest.java:176) (first 15 tasks are for partitions Vector(0))
    20/01/17 13:56:57 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 31, localhost, executor driver, partition 0, PROCESS_LOCAL, 7743 bytes)
    20/01/17 13:56:57 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 31)
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Getting 234 non-empty blocks out of 6200 blocks
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
    20/01/17 13:56:57 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 31). 2762 bytes result sent to driver
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 31) in 128 ms on localhost (executor driver) (1/1)
    20/01/17 13:56:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (saveAsTable at SparkTest.java:176) finished in 0.135 s
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: looking for newly runnable stages
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: running: Set()
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 3)
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: failed: Set()
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (ShuffledRowRDD[11] at saveAsTable at SparkTest.java:176), which has no missing parents
    20/01/17 13:56:57 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 148.9 KB, free 898.0 MB)
    20/01/17 13:56:57 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 52.1 KB, free 898.0 MB)
    20/01/17 13:56:57 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on sl1-43087-b01.BJ.DATANGMOBILE.com:59558 (size: 52.1 KB, free: 898.4 MB)
    20/01/17 13:56:57 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1039
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 3 (ShuffledRowRDD[11] at saveAsTable at SparkTest.java:176) (first 15 tasks are for partitions Vector(0, 1))
    20/01/17 13:56:57 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 2 tasks
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 32, localhost, executor driver, partition 0, ANY, 7754 bytes)
    20/01/17 13:56:57 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 32)
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    20/01/17 13:56:57 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:56:57 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:56:57 INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
    {
      "type" : "struct",
      "fields" : [ {
        "name" : "roadid",
        "type" : "string",
        "nullable" : true,
        "metadata" : {
          "comment" : "??id"
        }
      }, {
        "name" : "cn",
        "type" : "long",
        "nullable" : false,
        "metadata" : { }
      } ]
    }
    and corresponding Parquet message type:
    message spark_schema {
      optional binary roadid (UTF8);
      required int64 cn;
    }
    
           
    20/01/17 13:56:57 INFO compress.CodecPool: Got brand-new compressor [.snappy]
    20/01/17 13:56:57 INFO output.FileOutputCommitter: Saved output of task 'attempt_20200117135657_0003_m_000000_0' to hdfs://master01.xxx.xxx.cn:8020/user/hive/warehouse/coveroptimize.db/test_listener_table/_temporary/0/task_20200117135657_0003_m_000000
    20/01/17 13:56:57 INFO mapred.SparkHadoopMapRedUtil: attempt_20200117135657_0003_m_000000_0: Committed
    20/01/17 13:56:57 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 32). 2351 bytes result sent to driver
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 33, localhost, executor driver, partition 1, ANY, 7754 bytes)
    20/01/17 13:56:57 INFO executor.Executor: Running task 1.0 in stage 3.0 (TID 33)
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 32) in 395 ms on localhost (executor driver) (1/2)
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    20/01/17 13:56:57 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
    20/01/17 13:56:57 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:56:57 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
    20/01/17 13:56:57 INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
    {
      "type" : "struct",
      "fields" : [ {
        "name" : "roadid",
        "type" : "string",
        "nullable" : true,
        "metadata" : {
          "comment" : "??id"
        }
      }, {
        "name" : "cn",
        "type" : "long",
        "nullable" : false,
        "metadata" : { }
      } ]
    }
    and corresponding Parquet message type:
    message spark_schema {
      optional binary roadid (UTF8);
      required int64 cn;
    }
    
           
    20/01/17 13:56:57 INFO output.FileOutputCommitter: Saved output of task 'attempt_20200117135657_0003_m_000001_0' to hdfs://master01.xxx.xxx.cn:8020/user/hive/warehouse/coveroptimize.db/test_listener_table/_temporary/0/task_20200117135657_0003_m_000001
    20/01/17 13:56:57 INFO mapred.SparkHadoopMapRedUtil: attempt_20200117135657_0003_m_000001_0: Committed
    20/01/17 13:56:57 INFO executor.Executor: Finished task 1.0 in stage 3.0 (TID 33). 2222 bytes result sent to driver
    20/01/17 13:56:57 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 33) in 49 ms on localhost (executor driver) (2/2)
    20/01/17 13:56:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: ResultStage 3 (saveAsTable at SparkTest.java:176) finished in 0.466 s
    Custom listener onJobEnd, jobResult: JobSucceeded
    20/01/17 13:56:57 INFO scheduler.DAGScheduler: Job 1 finished: saveAsTable at SparkTest.java:176, took 0.616115 s
    20/01/17 13:56:57 INFO datasources.FileFormatWriter: Job null committed.
    20/01/17 13:56:57 INFO datasources.FileFormatWriter: Finished processing stats for job null.
    20/01/17 13:56:57 INFO hive.HiveExternalCatalog: Persisting file based data source table `coveroptimize`.`test_listener_table` into Hive metastore in Hive compatible format.
    Application ended, time: 1579240617923
    20/01/17 13:56:57 INFO server.AbstractConnector: Stopped Spark@5cc5b667{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
    20/01/17 13:56:57 INFO ui.SparkUI: Stopped Spark web UI at http://sl1-43087-b01.BJ.DATANGMOBILE.com:4040
    20/01/17 13:56:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    20/01/17 13:56:57 INFO memory.MemoryStore: MemoryStore cleared
    20/01/17 13:56:57 INFO storage.BlockManager: BlockManager stopped
    20/01/17 13:56:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    20/01/17 13:56:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    20/01/17 13:56:57 INFO spark.SparkContext: Successfully stopped SparkContext
    20/01/17 13:56:58 INFO util.ShutdownHookManager: Shutdown hook called
    20/01/17 13:56:58 INFO util.ShutdownHookManager: Deleting directory C:\Users\kongshuaiwei\AppData\Local\Temp\spark-547cf37e-2d1e-433c-a584-6c5b7365909f
    
    Process finished with exit code 0
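
    The log confirms that the job-level callbacks fire at the expected points. The same pattern extends to finer-grained events; for instance, a hedged sketch of collecting per-task runtime and spill statistics in onTaskEnd (the metric accessors shown exist in the Spark 2.x listener API, but the reporting itself is illustrative):

    package cn.com.kong;

    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerTaskEnd;

    public class TaskMetricsListener extends SparkListener {

        @Override
        public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
            // taskMetrics() can be null, e.g. when a task fails before reporting.
            if (taskEnd.taskMetrics() != null) {
                System.err.println("Task " + taskEnd.taskInfo().taskId()
                        + " in stage " + taskEnd.stageId()
                        + " ran for " + taskEnd.taskMetrics().executorRunTime() + " ms,"
                        + " spilled " + taskEnd.taskMetrics().memoryBytesSpilled() + " bytes");
            }
        }
    }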