zoukankan      html  css  js  c++  java
  • spark1.x和2.xIterable和iterator兼容问题

    1. spark 1.x 升级到spark 2.x
    对于普通的spark来说,变动不大 :
    1
    举一个最简单的实例:
    
    spark1.x
    public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) {
    
            JavaPairRDD<String, Integer> testRdd = spark1Rdd
                    .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    
                @Override
                public Iterable<Tuple2<String, Integer>> call(String str)
                        throws Exception {
                    ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
                    return list;
    
    
                }
            });
    
            return spark1Rdd;
        }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    spark2.x
    public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) {
    
            JavaPairRDD<String, Integer> testRdd2 = spark2Rdd
                    .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    
                @Override
                public Iterator<Tuple2<String, Integer>> call(String str)
                        throws Exception {
                    ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
    
    
                    return list.iterator();
                }
            });
    
            return spark2Rdd;
        }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    需要说明的是: 
    上面的返回的rdd就直接用输入的 RDD显然是不合理的! 只是为了用最简洁的方式介绍代码的转换而已!
    
    可以看到 : 区别主要在于
    1. spark 1.x中的Iterable对象 变成了 spark2.x中的Iterator对象
    2. 相应的,对于返回值为list的RDD,  spark2.x中要返回list.iterator();
    1
    2
    3
    还是很简单的吧
    
    问题在于 : 如果你有几个spark程序要运行在不同的环境下,(有的现场用1.x,有的现场用2.x) 
    你需要同时维护两种不同版本的spark,是不是耗时又耗力呢?
    
    这个时候就需要考虑到 spark版本的兼容性,使你的程序能成功的运行在各种集群环境下
    
    2. spark版本的兼容
    写一个简单的工具类如下 :
    
    import java.util.Iterator;
    
    public class MyIterator<T> implements Iterator, Iterable 
    {
        private Iterator myIterable;
    
        public MyIterator(Iterable iterable)
        {
            myIterable = iterable.iterator();
        }
    
        @Override
        public boolean hasNext() 
        {
            return myIterable.hasNext();
        }
    
        @Override
        public Object next() 
        {
            return myIterable.next();
        }
    
        @Override
        public void remove() 
        {
            myIterable.remove();
        }
    
        @Override
        public Iterator iterator() 
        {
            return myIterable;
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    只需要进行如上设计就可以实现版本的兼容了 
    那么应该如何应用呢?
    
     JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public MyIterator<String> call(String s) throws Exception {
                    String[] split = s.split("\s+");
                    MyIterator myIterator = new MyIterator(Arrays.asList(split));
                    return myIterator;
                }
    });
    

      

  • 相关阅读:
    第五次作业
    第四次作业
    第三次作业
    第二次作业
    第一次作业
    实验三 算术编码压缩方法
    实验二 统计压缩方法的具体实现
    实验一 建立统计压缩方法理论模型
    第五次作业
    第四次作业
  • 原文地址:https://www.cnblogs.com/huiandong/p/9982516.html
Copyright © 2011-2022 走看看