  • A complete walkthrough of the examples bundled with the spark-2.2.0-bin-hadoop2.6 and spark-1.6.1-bin-hadoop2.6 distributions (Java, Python, R, and Scala): JavaSparkPi.java under the Basic package (illustrated walkthrough)

      Without further ado, here is the code and its output.

    JavaSparkPi.java under the Basic package in spark-1.6.1-bin-hadoop2.6

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    //package org.apache.spark.examples;
    package zhouls.bigdata.Basic;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import java.util.ArrayList;  
    import java.util.List;
    
    /** 
     * Computes an approximation to pi
     * Usage: JavaSparkPi [slices]
     */
    public final class JavaSparkPi {
      public static void main(String[] args) throws Exception {
          
    /*
     * Main method: computes an approximation of pi.
     * Accompanying blog post: http://www.cnblogs.com/zlslch/p/7455363.html
     */
        SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; // number of partitions (slices)
    int n = 100000 * slices; // number of sample points (kept well below Integer.MAX_VALUE to avoid overflow)
    List<Integer> l = new ArrayList<Integer>(n); // pre-sized List<Integer> holding the n sample indices
        for (int i = 0; i < n; i++) {
          l.add(i);
        }
    
        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map(new Function<Integer, Integer>() { // map each sample to 1 (inside the circle) or 0
          @Override
          public Integer call(Integer integer) {
        double x = Math.random() * 2 - 1; // random x in [-1, 1)
        double y = Math.random() * 2 - 1; // random y in [-1, 1)
        return (x * x + y * y < 1) ? 1 : 0; // 1 if the point falls inside the unit circle, otherwise 0
          }
    }).reduce(new Function2<Integer, Integer, Integer>() { // sum the per-sample hits
          @Override
          public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
          }
        });
    
        System.out.println("Pi is roughly " + 4.0 * count / n);
    
        jsc.stop();
      }
    }
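
    The printed estimate follows the standard Monte Carlo argument: each element of the RDD draws a point uniformly from the square [-1, 1] × [-1, 1], and the point lands inside the unit circle with probability equal to the ratio of the two areas:

        P(\text{hit}) = \frac{\pi \cdot 1^2}{2 \times 2} = \frac{\pi}{4}
        \qquad\Longrightarrow\qquad
        \pi \approx 4 \cdot \frac{\mathrm{count}}{n}

    With the default of 2 slices, n = 200000, so the "Pi is roughly 3.13854" printed in the run below corresponds to 156,927 of the 200,000 sampled points landing inside the circle.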

     

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/D:/SoftWare/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/D:/SoftWare/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    17/08/30 21:25:42 INFO SparkContext: Running Spark version 1.6.1
    17/08/30 21:25:46 INFO SecurityManager: Changing view acls to: Administrator
    17/08/30 21:25:46 INFO SecurityManager: Changing modify acls to: Administrator
    17/08/30 21:25:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Administrator); users with modify permissions: Set(Administrator)
    17/08/30 21:25:51 INFO Utils: Successfully started service 'sparkDriver' on port 54482.
    17/08/30 21:25:53 INFO Slf4jLogger: Slf4jLogger started
    17/08/30 21:25:53 INFO Remoting: Starting remoting
    17/08/30 21:25:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@169.254.28.160:54495]
    17/08/30 21:25:54 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 54495.
    17/08/30 21:25:54 INFO SparkEnv: Registering MapOutputTracker
    17/08/30 21:25:54 INFO SparkEnv: Registering BlockManagerMaster
    17/08/30 21:25:54 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\blockmgr-5e8e9432-30ff-424c-9ba7-14df937d809b
    17/08/30 21:25:54 INFO MemoryStore: MemoryStore started with capacity 1131.0 MB
    17/08/30 21:25:55 INFO SparkEnv: Registering OutputCommitCoordinator
    17/08/30 21:25:55 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    17/08/30 21:25:55 INFO SparkUI: Started SparkUI at http://169.254.28.160:4040
    17/08/30 21:25:55 INFO Executor: Starting executor ID driver on host localhost
    17/08/30 21:25:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54502.
    17/08/30 21:25:56 INFO NettyBlockTransferService: Server created on 54502
    17/08/30 21:25:56 INFO BlockManagerMaster: Trying to register BlockManager
    17/08/30 21:25:56 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54502 with 1131.0 MB RAM, BlockManagerId(driver, localhost, 54502)
    17/08/30 21:25:56 INFO BlockManagerMaster: Registered BlockManager
    17/08/30 21:25:58 INFO SparkContext: Starting job: reduce at JavaSparkPi.java:59
    17/08/30 21:25:58 INFO DAGScheduler: Got job 0 (reduce at JavaSparkPi.java:59) with 2 output partitions
    17/08/30 21:25:58 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at JavaSparkPi.java:59)
    17/08/30 21:25:58 INFO DAGScheduler: Parents of final stage: List()
    17/08/30 21:25:58 INFO DAGScheduler: Missing parents: List()
    17/08/30 21:25:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at JavaSparkPi.java:52), which has no missing parents
    17/08/30 21:25:59 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 2.3 KB)
    17/08/30 21:25:59 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1417.0 B, free 3.7 KB)
    17/08/30 21:25:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54502 (size: 1417.0 B, free: 1131.0 MB)
    17/08/30 21:25:59 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/08/30 21:26:00 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at JavaSparkPi.java:52)
    17/08/30 21:26:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    17/08/30 21:26:00 WARN TaskSetManager: Stage 0 contains a task of very large size (978 KB). The maximum recommended task size is 100 KB.
    17/08/30 21:26:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 1002120 bytes)
    17/08/30 21:26:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    17/08/30 21:26:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1031 bytes result sent to driver
    17/08/30 21:26:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 1002120 bytes)
    17/08/30 21:26:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
    17/08/30 21:26:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1286 ms on localhost (1/2)
    17/08/30 21:26:01 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1031 bytes result sent to driver
    17/08/30 21:26:01 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 496 ms on localhost (2/2)
    17/08/30 21:26:01 INFO DAGScheduler: ResultStage 0 (reduce at JavaSparkPi.java:59) finished in 1.581 s
    17/08/30 21:26:01 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    17/08/30 21:26:01 INFO DAGScheduler: Job 0 finished: reduce at JavaSparkPi.java:59, took 2.991489 s
    Pi is roughly 3.13854
    17/08/30 21:26:01 INFO SparkUI: Stopped Spark web UI at http://169.254.28.160:4040
    17/08/30 21:26:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/08/30 21:26:01 INFO MemoryStore: MemoryStore cleared
    17/08/30 21:26:01 INFO BlockManager: BlockManager stopped
    17/08/30 21:26:01 INFO BlockManagerMaster: BlockManagerMaster stopped
    17/08/30 21:26:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/08/30 21:26:01 INFO SparkContext: Successfully stopped SparkContext
    17/08/30 21:26:01 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/08/30 21:26:01 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    17/08/30 21:26:01 INFO ShutdownHookManager: Shutdown hook called
    17/08/30 21:26:01 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-e23e8bc3-e6c1-4462-ad2e-7ab0c8ddf341

    JavaSparkPi.java under the Basic package in spark-2.2.0-bin-hadoop2.6

     

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    //package org.apache.spark.examples;
    package zhouls.bigdata.Basic;
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Computes an approximation to pi
     * Usage: JavaSparkPi [partitions]
     */
    public final class JavaSparkPi {
        
  /*
   * Main method: computes an approximation of pi.
   * Accompanying blog post: http://www.cnblogs.com/zlslch/p/7455363.html
   */
      public static void main(String[] args) throws Exception {
          
    /*
     * The snippet below shows how to create a SparkSession.
     */
        SparkSession spark = SparkSession
          .builder()
          .master("local")
          .appName("JavaSparkPi")
          .getOrCreate();
      
                  
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; // number of partitions (slices)
    int n = 100000 * slices; // number of sample points (kept well below Integer.MAX_VALUE to avoid overflow)
    List<Integer> l = new ArrayList<>(n); // pre-sized List<Integer> holding the n sample indices
        for (int i = 0; i < n; i++) {
          l.add(i);
        }
    
        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map(integer -> { // map each sample to 1 (inside the circle) or 0
      double x = Math.random() * 2 - 1; // random x in [-1, 1)
      double y = Math.random() * 2 - 1; // random y in [-1, 1)
      return (x * x + y * y <= 1) ? 1 : 0; // 1 if the point falls inside (or on) the unit circle, otherwise 0
    }).reduce((integer, integer2) -> integer + integer2); // sum the per-sample hits
    
        System.out.println("Pi is roughly " + 4.0 * count / n);
    
        spark.stop();
      }
    }
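
    The 2.2.0 version keeps the RDD-based computation but obtains the JavaSparkContext from a SparkSession and replaces the anonymous Function/Function2 classes with Java 8 lambdas. For comparison, the same estimate can also be written directly against the Dataset API; the following is a minimal sketch, not part of the shipped examples, assuming Spark 2.x's SparkSession.range and the typed filter(FilterFunction) overload (the class name JavaSparkPiDataset is made up for illustration):

    package zhouls.bigdata.Basic;

    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;

    public final class JavaSparkPiDataset {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
          .builder()
          .master("local")
          .appName("JavaSparkPiDataset")
          .getOrCreate();

        int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; // number of partitions
        long n = 100000L * slices; // number of sample points

        // Generate n row indices spread over `slices` partitions, then keep the rows
        // whose random point falls inside the unit circle.
        Dataset<Long> samples = spark.range(0, n, 1, slices);
        long hits = samples.filter((FilterFunction<Long>) i -> {
          double x = Math.random() * 2 - 1; // random x in [-1, 1)
          double y = Math.random() * 2 - 1; // random y in [-1, 1)
          return x * x + y * y <= 1;
        }).count();

        System.out.println("Pi is roughly " + 4.0 * hits / n);
        spark.stop();
      }
    }

    The explicit (FilterFunction<Long>) cast selects the typed overload of filter; everything else mirrors the RDD version above.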

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    17/08/30 21:29:46 INFO SparkContext: Running Spark version 2.2.0
    17/08/30 21:29:47 INFO SparkContext: Submitted application: JavaSparkPi
    17/08/30 21:29:47 INFO SecurityManager: Changing view acls to: Administrator
    17/08/30 21:29:47 INFO SecurityManager: Changing modify acls to: Administrator
    17/08/30 21:29:47 INFO SecurityManager: Changing view acls groups to: 
    17/08/30 21:29:47 INFO SecurityManager: Changing modify acls groups to: 
    17/08/30 21:29:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Administrator); groups with view permissions: Set(); users  with modify permissions: Set(Administrator); groups with modify permissions: Set()
    17/08/30 21:29:52 INFO Utils: Successfully started service 'sparkDriver' on port 54576.
    17/08/30 21:29:52 INFO SparkEnv: Registering MapOutputTracker
    17/08/30 21:29:53 INFO SparkEnv: Registering BlockManagerMaster
    17/08/30 21:29:53 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    17/08/30 21:29:53 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    17/08/30 21:29:53 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\blockmgr-0685a26c-9751-4821-9fc4-4273633fb703
    17/08/30 21:29:53 INFO MemoryStore: MemoryStore started with capacity 904.8 MB
    17/08/30 21:29:53 INFO SparkEnv: Registering OutputCommitCoordinator
    17/08/30 21:29:54 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    17/08/30 21:29:54 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://169.254.28.160:4040
    17/08/30 21:29:54 INFO Executor: Starting executor ID driver on host localhost
    17/08/30 21:29:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54586.
    17/08/30 21:29:54 INFO NettyBlockTransferService: Server created on 169.254.28.160:54586
    17/08/30 21:29:54 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    17/08/30 21:29:54 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 169.254.28.160, 54586, None)
    17/08/30 21:29:54 INFO BlockManagerMasterEndpoint: Registering block manager 169.254.28.160:54586 with 904.8 MB RAM, BlockManagerId(driver, 169.254.28.160, 54586, None)
    17/08/30 21:29:54 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 169.254.28.160, 54586, None)
    17/08/30 21:29:54 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 169.254.28.160, 54586, None)
    17/08/30 21:29:55 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/D:/Code/EclipsePaperCode/Spark220BinHadoop26ShouDongJava/spark-warehouse/').
    17/08/30 21:29:55 INFO SharedState: Warehouse path is 'file:/D:/Code/EclipsePaperCode/Spark220BinHadoop26ShouDongJava/spark-warehouse/'.
    17/08/30 21:29:57 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
    17/08/30 21:29:58 INFO SparkContext: Starting job: reduce at JavaSparkPi.java:65
    17/08/30 21:29:58 INFO DAGScheduler: Got job 0 (reduce at JavaSparkPi.java:65) with 2 output partitions
    17/08/30 21:29:58 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at JavaSparkPi.java:65)
    17/08/30 21:29:58 INFO DAGScheduler: Parents of final stage: List()
    17/08/30 21:29:58 INFO DAGScheduler: Missing parents: List()
    17/08/30 21:29:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at JavaSparkPi.java:61), which has no missing parents
    17/08/30 21:29:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.0 KB, free 904.8 MB)
    17/08/30 21:29:59 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1772.0 B, free 904.8 MB)
    17/08/30 21:29:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 169.254.28.160:54586 (size: 1772.0 B, free: 904.8 MB)
    17/08/30 21:29:59 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/08/30 21:29:59 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at JavaSparkPi.java:61) (first 15 tasks are for partitions Vector(0, 1))
    17/08/30 21:29:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
    17/08/30 21:29:59 WARN TaskSetManager: Stage 0 contains a task of very large size (981 KB). The maximum recommended task size is 100 KB.
    17/08/30 21:29:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 1004822 bytes)
    17/08/30 21:29:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    17/08/30 21:30:00 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 824 bytes result sent to driver
    17/08/30 21:30:00 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 1004827 bytes)
    17/08/30 21:30:00 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
    17/08/30 21:30:00 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1004 ms on localhost (executor driver) (1/2)
    17/08/30 21:30:00 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 824 bytes result sent to driver
    17/08/30 21:30:00 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 334 ms on localhost (executor driver) (2/2)
    17/08/30 21:30:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    17/08/30 21:30:00 INFO DAGScheduler: ResultStage 0 (reduce at JavaSparkPi.java:65) finished in 1.202 s
    17/08/30 21:30:00 INFO DAGScheduler: Job 0 finished: reduce at JavaSparkPi.java:65, took 1.929985 s
    Pi is roughly 3.14214
    17/08/30 21:30:00 INFO SparkUI: Stopped Spark web UI at http://169.254.28.160:4040
    17/08/30 21:30:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/08/30 21:30:00 INFO MemoryStore: MemoryStore cleared
    17/08/30 21:30:00 INFO BlockManager: BlockManager stopped
    17/08/30 21:30:00 INFO BlockManagerMaster: BlockManagerMaster stopped
    17/08/30 21:30:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/08/30 21:30:00 INFO SparkContext: Successfully stopped SparkContext
    17/08/30 21:30:00 INFO ShutdownHookManager: Shutdown hook called
    17/08/30 21:30:00 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-c6806fd3-2a53-4f00-b285-48751292ff44
  • Original post: https://www.cnblogs.com/zlslch/p/7455363.html