zoukankan      html  css  js  c++  java
  • spark1.x和spark2.x兼容Iterable和Iterator问题【未解决】

     转载于:https://www.cnblogs.com/huiandong/p/9982516.html

    本文是转载,但是经过验证,该文中的方法在编译的时候没问题,运行就报版本问题。尚未找到解决方法,仍需手动维护两个版本spark程序。

    1. spark 1.x 升级到spark 2.x

    对于普通的spark来说,变动不大 :
    举一个最简单的实例:
    spark1.x
     1 public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) {
     2  
     3         JavaPairRDD<String, Integer> testRdd = spark1Rdd
     4                 .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
     5  
     6             @Override
     7             public Iterable<Tuple2<String, Integer>> call(String str)
     8                     throws Exception {
     9                 ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
    10                 return list;
    11  
    12  
    13             }
    14         });
    15  
    16         return spark1Rdd;
    17     }
    spark2.x
     1 public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) {
     2  
     3         JavaPairRDD<String, Integer> testRdd2 = spark2Rdd
     4                 .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
     5  
     6             @Override
     7             public Iterator<Tuple2<String, Integer>> call(String str)
     8                     throws Exception {
     9                 ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
    10  
    11  
    12                 return list.iterator();
    13             }
    14         });
    15  
    16         return spark2Rdd;
    17     }
    需要说明的是:
    上面的返回的rdd就直接用输入的 RDD显然是不合理的! 只是为了用最简洁的方式介绍代码的转换而已!
     
    可以看到 : 区别主要在于
    1. spark 1.x中的Iterable对象 变成了 spark2.x中的Iterator对象
    2. 相应的,对于返回值为list的RDD,  spark2.x中要返回list.iterator();
     
    还是很简单的吧
     
    问题在于 : 如果你有几个spark程序要运行在不同的环境下,(有的现场用1.x,有的现场用2.x)
    你需要同时维护两种不同版本的spark,是不是耗时又耗力呢?
     
    这个时候就需要考虑到 spark版本的兼容性,使你的程序能成功的运行在各种集群环境下
     
    2. spark版本的兼容
    写一个简单的工具类如下 :
     1 import java.util.Iterator;
     2  
     3 public class MyIterator<T> implements Iterator, Iterable
     4 {
     5     private Iterator myIterable;
     6  
     7     public MyIterator(Iterable iterable)
     8     {
     9         myIterable = iterable.iterator();
    10     }
    11  
    12     @Override
    13     public boolean hasNext()
    14     {
    15         return myIterable.hasNext();
    16     }
    17  
    18     @Override
    19     public Object next()
    20     {
    21         return myIterable.next();
    22     }
    23  
    24     @Override
    25     public void remove()
    26     {
    27         myIterable.remove();
    28     }
    29  
    30     @Override
    31     public Iterator iterator()
    32     {
    33         return myIterable;
    34     }
    只需要进行如上设计就可以实现版本的兼容了
    那么应该如何应用呢?
    1  JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
    2             @Override
    3             public MyIterator<String> call(String s) throws Exception {
    4                 String[] split = s.split("\s+");
    5                 MyIterator myIterator = new MyIterator(Arrays.asList(split));
    6                 return myIterator;
    7             }
    8 });
  • 相关阅读:
    架构与模式11
    Unix/Linux命令
    Winform的Excel表格
    B/S结构一机多屏实现
    Effective C++函数参数传递方式
    Django & Tornado
    ThoughtWorks读书路线图
    善用泛型 委托
    SQL 关于with cube ,with rollup 和 grouping
    测试工程师实习笔试题
  • 原文地址:https://www.cnblogs.com/carsonwuu/p/11549373.html
Copyright © 2011-2022 走看看