  • Submitting Code from Eclipse to Run on a Spark Cluster

Spark cluster master node: 192.168.168.200

Windows host running Eclipse: 192.168.168.100

Scenario:

We want to test how code developed in Eclipse behaves when it runs on the Spark cluster: for example, whether memory, cores, stdout, and the passing of variables all work as expected.

In production, code developed in Eclipse is packaged, copied to the Spark cluster, and submitted with spark-submit. We could also attach a remote debugger, but then every test run would require copying the jar to the cluster machines, which is very inconvenient. Instead, we would like Eclipse to simulate a spark-submit submission directly, which makes debugging much easier.

1. Prepare the words.txt file

words.txt:

     

Hello Hadoop
Hello BigData
Hello Spark
Hello Flume
Hello Kafka

     

Upload words.txt to the HDFS file system as /test/words.txt.
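The upload is normally done with the HDFS command line (hdfs dfs -put words.txt /test/words.txt). If you prefer to do it programmatically, here is a minimal sketch using Hadoop's FileSystem API; it assumes the Hadoop client libraries are on the classpath and that the namenode listens on the same address used by the word-count job below:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadWords {
    public static void main(String[] args) throws Exception {
        // Connect to the cluster's HDFS namenode.
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.168.200:9000"), new Configuration());
        // Copy the local words.txt into /test on HDFS.
        fs.copyFromLocalFile(new Path("words.txt"), new Path("/test/words.txt"));
        fs.close();
    }
}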

2. Create the Spark test class

package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class JavaWordCount {
    public static void main(String[] args) {
        // Start with a local master (2 threads) to verify the job logic.
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Read the input file from HDFS.
        JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");

        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print each (word, count) pair; with a remote master this runs on the executors.
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + ":" + pairs._2());
            }
        });

        jsc.close();
    }
}

Log output:


Open the Spark web UI at http://192.168.168.200:8080.

There we can see that the Spark master address is spark://master:7077.

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");

Change it to:

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
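Since part of the goal is to check memory and core usage on the cluster, you can optionally cap the resources the test job requests on the same SparkConf. This is only an illustrative sketch; the values below are examples, not settings taken from the cluster above:

sparkConf.set("spark.executor.memory", "512m"); // memory per executor
sparkConf.set("spark.cores.max", "2");          // total cores the app may use on the standalone cluster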

Running it now fails with an org.apache.spark.SparkException:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 192.168.168.200): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:894)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:892)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:892)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:350)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
    at com.spark.test.JavaWordCount.main(JavaWordCount.java:39)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The fix found online is to configure the path of the application jar: first package the program into a jar with mvn install, then register the jar via SparkConf's setJars method. Without it, the executors do not have the application's classes on their classpath and fail while deserializing the tasks, which is what the ClassCastException above reflects.

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
String[] jars = {"I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar"};
sparkConf.setJars(jars);
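Hard-coding the jar path works but ties the code to one machine. As an alternative sketch, JavaSparkContext.jarOfClass can look up the jar a given class was loaded from; note that this only helps when the program actually runs from the packaged jar, not from Eclipse's compiled-classes directory:

// Returns the jar containing JavaWordCount, or an empty array if the class
// was not loaded from a jar (e.g. when running from target/classes in Eclipse).
String[] jars = JavaSparkContext.jarOfClass(JavaWordCount.class);
sparkConf.setJars(jars);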

The final source code is as follows:

package com.spark.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class JavaWordCount {
    public static void main(String[] args) {
        // Connect to the standalone master and register the application jar
        // so that the executors can load our classes.
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
        String[] jars = {"I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar"};
        sparkConf.setJars(jars);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Read the input file from HDFS.
        JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");

        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print each (word, count) pair on the executors.
        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + ":" + pairs._2());
            }
        });

        jsc.close();
    }
}

This time the job runs normally, with no errors.

Check the stdout output to verify that the word counts are correct:
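Note that wordCount.foreach runs on the executors, so with the cluster master the println calls show up in each worker's stdout (visible from the application page in the web UI) rather than in Eclipse's console. If you also want the counts printed in the Eclipse console, a small variant collects the (small) result back to the driver first:

// Bring the aggregated counts back to the driver and print them locally.
for (Tuple2<String, Integer> kv : wordCount.collect()) {
    System.out.println(kv._1() + ":" + kv._2());
}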

With this setup, you can conveniently develop and debug your Spark code directly from Eclipse.

  • Original post: https://www.cnblogs.com/yangcx666/p/8723807.html