Writing and Running a Spark Application in Java

    Let's start with a simple requirement:
    We want to analyze a website's access logs and count the number of visits coming from each distinct IP address, then use GeoIP information to work out the country/region distribution of the visitors. Here are a few sample lines from my own site's access log (a minimal parsing sketch follows the sample):

    121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
    121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
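
    In this access-log format (Apache combined log), the client IP is simply the first whitespace-delimited field, which is the only part the statistics job needs. Here is a minimal standalone illustration of that extraction; the class name LogLineDemo is hypothetical and the sample line is an abbreviated version of the first log line above, so this is only a sketch, not part of the project:

    // Standalone sketch: extract the client IP from one access-log line.
    // Mirrors the space-split used by the Spark job below.
    import java.util.regex.Pattern;

    public class LogLineDemo {

         private static final Pattern SPACE = Pattern.compile(" ");

         public static void main(String[] args) {
              String line = "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "
                        + "\"GET /archives/417.html HTTP/1.1\" 200 11465 \"-\" \"Mozilla/5.0\"";
              // The client IP is the first space-delimited token.
              String ip = SPACE.split(line)[0];
              System.out.println(ip); // prints 121.205.198.92
         }
    }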

    Implementing the Spark Application in Java

    The statistics program we implement has the following features:

    • Read the log data file from HDFS
    • Extract the first field of each line (the IP address)
    • Count the number of occurrences of each IP address
    • Sort the IP addresses by their counts in descending order
    • For each IP address, look up the country it belongs to via the GeoIP library
    • Print the results, one line per entry, in the format: [country code] IP address count

    Below is the Java implementation of the statistics application:

    001 package org.shirdrn.spark.job;
    002  
    003 import java.io.File;
    004 import java.io.IOException;
    005 import java.util.Arrays;
    006 import java.util.Collections;
    007 import java.util.Comparator;
    008 import java.util.List;
    009 import java.util.regex.Pattern;
    010  
    011 import org.apache.commons.logging.Log;
    012 import org.apache.commons.logging.LogFactory;
    013 import org.apache.spark.api.java.JavaPairRDD;
    014 import org.apache.spark.api.java.JavaRDD;
    015 import org.apache.spark.api.java.JavaSparkContext;
    016 import org.apache.spark.api.java.function.FlatMapFunction;
    017 import org.apache.spark.api.java.function.Function2;
    018 import org.apache.spark.api.java.function.PairFunction;
    019 import org.shirdrn.spark.job.maxmind.Country;
    020 import org.shirdrn.spark.job.maxmind.LookupService;
    021  
    022 import scala.Serializable;
    023 import scala.Tuple2;
    024  
    025 public class IPAddressStats implements Serializable {
    026  
    027      private static final long serialVersionUID = 8533489548835413763L;
    028      private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    029      private static final Pattern SPACE = Pattern.compile(" ");
    030      private transient LookupService lookupService;
    031      private transient final String geoIPFile;
    032      
    033      public IPAddressStats(String geoIPFile) {
    034           this.geoIPFile = geoIPFile;
    035           try {
    036                // lookupService: looks up the country code for an IP address
    037                File file = new File(this.geoIPFile);
    038                LOG.info("GeoIP file: " + file.getAbsolutePath());
    039                lookupService = new LookupService(file, LookupService.GEOIP_MEMORY_CACHE);
    040           } catch (IOException e) {
    041                throw new RuntimeException(e);
    042           }
    043      }
    044      
    045      @SuppressWarnings("serial")
    046      public void stat(String[] args) {
    047           JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
    048                     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
    049           JavaRDD<String> lines = ctx.textFile(args[1], 1);
    050  
    051           // split each line and extract the IP address field
    052           JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    053                @Override
    054                public Iterable<String> call(String s) {
    055                     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
    056                     // ip address
    057                     return Arrays.asList(SPACE.split(s)[0]);
    058                }
    059           });
    060  
    061           // map: emit an (ip, 1) pair for each IP address
    062           JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
    063                @Override
    064                public Tuple2<String, Integer> call(String s) {
    065                     return new Tuple2<String, Integer>(s, 1);
    066                }
    067           });
    068  
    069           // reduce: sum the counts for each IP address
    070           JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    071                @Override
    072                public Integer call(Integer i1, Integer i2) {
    073                     return i1 + i2;
    074                }
    075           });
    076           
    077           List<Tuple2<String, Integer>> output = counts.collect();
    078           
    079           // sort the collected statistics by value (count) in descending order
    080           Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
    081                @Override
    082                public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
    083                     if(t1._2 < t2._2) {
    084                          return 1;
    085                     } else if(t1._2 > t2._2) {
    086                          return -1;
    087                     }
    088                     return 0;
    089                }
    090           });
    091           
    092           writeTo(args, output);
    093           
    094      }
    095      
    096      private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
    097           for (Tuple2<?, ?> tuple : output) {
    098                Country country = lookupService.getCountry((String) tuple._1);
    099                LOG.info("[" + country.getCode() + "] " + tuple._1 + " " + tuple._2);
    100           }
    101      }
    102      
    103      public static void main(String[] args) {
    104           // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
    105           if (args.length < 3) {
    106                System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
    107                System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
    108                System.exit(1);
    109           }
    110           
    111           String geoIPFile = args[2];
    112           IPAddressStats stats = new IPAddressStats(geoIPFile);
    113           stats.stat(args);
    114           
    115           System.exit(0);
    116  
    117      }
    118  
    119 }
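
    A note on the design: the code above collects all (IP, count) pairs to the driver with collect() and sorts them there, which is fine for a modest number of distinct IP addresses. If the result were large, the sort could instead be done on the cluster before collecting, for example by swapping key and value and calling sortByKey. The following is only a rough sketch against the same Spark 0.9 Java API, reusing the imports from the listing above; it is not part of the original program:

    // Sketch: sort by count on the cluster instead of on the driver.
    // 'counts' is the JavaPairRDD<String, Integer> built by reduceByKey above.
    JavaPairRDD<Integer, String> swapped = counts.map(
              new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                   @Override
                   public Tuple2<Integer, String> call(Tuple2<String, Integer> t) {
                        return new Tuple2<Integer, String>(t._2, t._1); // (count, ip)
                   }
              });
    // false => descending order by count
    List<Tuple2<Integer, String>> sorted = swapped.sortByKey(false).collect();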

    The implementation details are explained by the comments in the code. We use Maven to manage and build the Java program; first, here are the dependencies declared in my POM:

    <dependencies>
         <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-core_2.10</artifactId>
              <version>0.9.0-incubating</version>
         </dependency>
         <dependency>
              <groupId>log4j</groupId>
              <artifactId>log4j</artifactId>
              <version>1.2.16</version>
         </dependency>
         <dependency>
              <groupId>dnsjava</groupId>
              <artifactId>dnsjava</artifactId>
              <version>2.1.1</version>
         </dependency>
         <dependency>
              <groupId>commons-net</groupId>
              <artifactId>commons-net</artifactId>
              <version>3.1</version>
         </dependency>
         <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
              <version>1.2.1</version>
         </dependency>
    </dependencies>

    Note that when the program runs on a Spark cluster, the job we write must be serializable. Fields that do not need to be serialized, or that cannot be serialized, can simply be marked transient: in the code above, lookupService does not implement the Serializable interface, so it is declared transient to exclude it from serialization (a minimal sketch of this pattern follows the stack trace below). Otherwise, you may run into an error like the following:

    01 14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
    02 Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    03      at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    04      at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    05      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    06      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    07      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    08      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    09      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    10      at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    11      at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    12      at scala.collection.immutable.List.foreach(List.scala:318)
    13      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    14      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    15      at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    16      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    17      at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    18      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    19      at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    20      at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    21      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    22      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    23      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    24      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
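
    The general pattern behind this: the job class implements Serializable so that the closures Spark ships to the executors can be serialized, while a non-serializable helper (here the GeoIP lookup service) lives in a transient field. Keep in mind that a transient field is null after deserialization; in this program that is harmless because lookupService is only used on the driver, after collect(), but if such a helper were needed inside a task it would have to be re-created lazily on the executor. Below is a minimal sketch of that lazy-initialization variant, with a hypothetical NonSerializableHelper standing in for the GeoIP service; it is not part of the program above:

    // Sketch only: a serializable job holding a transient, non-serializable helper
    // that is re-created lazily wherever it is actually used.
    public class SomeJob implements java.io.Serializable {

         private static final long serialVersionUID = 1L;

         // A helper class that does not implement Serializable.
         static class NonSerializableHelper {
              String lookup(String ip) { return "??"; }
         }

         // transient: excluded from serialization, so it is null again after the
         // job object is deserialized on an executor, hence the lazy getter below.
         private transient NonSerializableHelper helper;

         private NonSerializableHelper getHelper() {
              if (helper == null) {
                   helper = new NonSerializableHelper();
              }
              return helper;
         }
    }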

    Running the Java Program on a Spark Cluster

    I use Maven to manage and build the Java program. After implementing the code above, package it with Maven's maven-assembly-plugin, configured as follows:

    <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <configuration>
              <archive>
                   <manifest>
                        <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
                   </manifest>
              </archive>
              <descriptorRefs>
                   <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <excludes>
                   <exclude>*.properties</exclude>
                   <exclude>*.xml</exclude>
              </excludes>
         </configuration>
         <executions>
              <execution>
                   <id>make-assembly</id>
                   <phase>package</phase>
                   <goals>
                        <goal>single</goal>
                   </goals>
              </execution>
         </executions>
    </plugin>

    This bundles the dependency libraries into the program JAR. Finally, copy the JAR to a Linux machine (it does not have to be the Master node of the Spark cluster); it is enough that Spark's environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example. We can simply make a modified copy of it that points to our own Java program package (change the EXAMPLES_DIR variable and anything related to where our JAR file is stored) and use that script to run the program. The script looks like this:

    cygwin=false
    case "`uname`" in
        CYGWIN*) cygwin=true;;
    esac

    SCALA_VERSION=2.10

    # Figure out where the Scala framework is installed
    FWDIR="$(cd `dirname $0`/..; pwd)"

    # Export this as SPARK_HOME
    export SPARK_HOME="$FWDIR"

    # Load environment variables from conf/spark-env.sh, if it exists
    if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
      . $FWDIR/conf/spark-env.sh
    fi

    if [ -z "$1" ]; then
      echo "Usage: run-example <example-class> [<args>]" >&2
      exit 1
    fi

    # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
    # to avoid the -sources and -doc packages that are built by publish-local.
    EXAMPLES_DIR="$FWDIR"/java-examples
    SPARK_EXAMPLES_JAR=""
    if [ -e "$EXAMPLES_DIR"/*.jar ]; then
      export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
    fi
    if [[ -z $SPARK_EXAMPLES_JAR ]]; then
      echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
      echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
      exit 1
    fi


    # Since the examples JAR ideally shouldn't include spark-core (that dependency should be
    # "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
    CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
    CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

    if $cygwin; then
        CLASSPATH=`cygpath -wp $CLASSPATH`
        export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
    fi

    # Find java binary
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ `command -v java` ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi

    # Set JAVA_OPTS to be able to load native libraries and to set heap size
    JAVA_OPTS="$SPARK_JAVA_OPTS"
    JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
    # Load extra JAVA_OPTS from conf/java-opts, if it exists
    if [ -e "$FWDIR/conf/java-opts" ] ; then
      JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
    fi
    export JAVA_OPTS

    if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
      echo -n "Spark Command: "
      echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
      echo "========================================"
      echo
    fi

    exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

    To run the Java program we developed on Spark, execute the following commands:

    cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
    ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat

    The program class org.shirdrn.spark.job.IPAddressStats that I implemented takes three arguments:

    1. Spark cluster master URL: in my case, spark://m1:7077
    2. Input file path: application-specific; here the file is read from HDFS: hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
    3. GeoIP database file: application-specific; an external file used to determine which country an IP address belongs to

    If the program runs without errors, the console prints the run log, for example:

    01 14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
    02 SLF4J: Class path contains multiple SLF4J bindings.
    03 SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    04 SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    05 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    06 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    07 14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
    08 14/03/10 22:17:25 INFO Remoting: Starting remoting
    09 14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
    10 14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
    11 14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
    12 14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
    13 14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
    14 14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
    15 14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
    16 14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
    17 14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
    18 14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
    19 14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
    20 14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
    21 14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
    22 14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
    23 14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
    24 14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
    25 14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
    26 14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
    27 14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
    28 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
    29 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
    30 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
    31 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
    32 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
    33 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
    34 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
    35 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
    36 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
    37 14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
    38 14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    39 14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
    40 14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
    41 14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
    42 14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
    43 14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
    44 14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
    45 14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
    46 14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
    47 14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    48 14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
    49 14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
    50 14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
    51 14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
    52 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
    53 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
    54 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
    55 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
    56 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
    57 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
    58 14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
    59 14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
    60 14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
    61 14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
    62 14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
    63 14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
    64 14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
    65 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
    66 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
    67 14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
    68 14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
    69 14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
    70 14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
    71 14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
    72 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
    73 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
    74 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
    75 14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    76 14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
    77 14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
    78 14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
    79 14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
    80 14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
    81 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
    82 14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
    83 14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
    84 14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
    85 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218     312
    86 14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77     300
    87 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16     212
    88 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254     207
    89 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134     185
    90 14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181     181
    91 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212     180
    92 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83     176
    93 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205     169
    94 14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19     165
    95 ...

    We can also check the status of the currently running application through the web console: the Master node's port 8080 (e.g. http://m1:8080/) shows the status of the applications on the cluster.
    In addition, it is worth mentioning that if you develop Spark applications in Java with Eclipse on a Unix-like system, you can also connect to the Spark cluster directly from Eclipse and submit the application there for the cluster to process.
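
    One way to do this (an assumption on my part, since the submission code is not shown in this post) is to point JavaSparkContext at the cluster's master URL and pass the assembled JAR explicitly, so that the application classes are shipped to the executors. A rough sketch against the Spark 0.9 Java API; the JAR path is hypothetical and must point to the jar-with-dependencies produced by the Maven build above:

    // Sketch: creating a SparkContext from the IDE against a remote standalone cluster.
    JavaSparkContext ctx = new JavaSparkContext(
              "spark://m1:7077",                      // master URL of the Spark cluster
              "IPAddressStats",                       // application name
              System.getenv("SPARK_HOME"),            // local Spark installation
              new String[] { "/path/to/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar" });

    This mirrors the JavaSparkContext constructor call already used in IPAddressStats, except that the JAR path is given explicitly instead of via JavaSparkContext.jarOfClass(...).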
