zoukankan      html  css  js  c++  java
  • Operator_repartitionAndSortWithinPartition

    package com.bjsxt.spark.transformations;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.cassandra.cli.CliParser.newColumnFamily_return;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    import scala.Tuple2;
    /**
     * Serializable comparator that orders integers from largest to smallest.
     *
     * <p>It is passed to {@code repartitionAndSortWithinPartitions} below so that
     * keys inside each partition are sorted in descending order. It implements
     * {@link Serializable} so the instance can be serialized along with the job.
     */
    class MySort implements Serializable, Comparator<Integer> {

        private static final long serialVersionUID = 1L;

        /**
         * Compares two keys in reverse natural order.
         *
         * @param o1 first key
         * @param o2 second key
         * @return a negative value if {@code o1} is greater than {@code o2},
         *         zero if they are equal, a positive value otherwise
         */
        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare is used instead of "o2 - o1": the subtraction
            // overflows for operands far apart (e.g. Integer.MIN_VALUE and 1)
            // and would then report the wrong ordering.
            return Integer.compare(o2, o1);
        }

    }
    
    /**
     * Demonstrates {@code repartitionAndSortWithinPartitions}: a small pair RDD is
     * redistributed over three partitions by a custom {@link Partitioner} and the
     * keys inside each partition are sorted with {@link MySort} (descending).
     * The content of every partition is then printed to standard output.
     */
    public class Operator_repartitionAndSortWithinPartition {

        /**
         * Runs the demo with a local master.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);

            try {
                List<Tuple2<Integer,Integer>> list = Arrays.asList(
                        new Tuple2<Integer,Integer>(2, 3),
                        new Tuple2<Integer,Integer>(1, 2),
                        new Tuple2<Integer,Integer>(6, 7),
                        new Tuple2<Integer,Integer>(3, 4),
                        new Tuple2<Integer,Integer>(5, 6),
                        new Tuple2<Integer,Integer>(4, 5)
                        );

                // Start from a single partition so the effect of repartitioning is visible.
                JavaPairRDD<Integer,Integer> rdd = sc.parallelizePairs(list,1);

                JavaPairRDD<Integer, Integer> rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Number of target partitions; valid partition ids are 0, 1 and 2.
                     *
                     * @return the partition count (3)
                     */
                    @Override
                    public int numPartitions() {
                        return 3;
                    }

                    /**
                     * Maps a key to a partition id by taking its integer value
                     * modulo the partition count.
                     *
                     * @param key the record key; its string form must parse as an int
                     * @return a partition id in the range [0, numPartitions())
                     */
                    @Override
                    public int getPartition(Object key) {
                        int partitions = numPartitions();
                        int remainder = Integer.parseInt(String.valueOf(key)) % partitions;
                        // Java's % keeps the sign of the dividend, so a negative key
                        // would yield a negative (invalid) partition id; shift it back
                        // into the valid range.
                        return remainder < 0 ? remainder + partitions : remainder;
                    }
                },new MySort());

                System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());

                rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<Tuple2<Integer,Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Prints every record of the partition together with the partition index.
                     *
                     * @param v1 index of the partition being processed
                     * @param v2 iterator over the records of that partition
                     * @return the same iterator, already fully consumed by the printing loop
                     */
                    @Override
                    public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {
                        while(v2.hasNext()){
                            System.out.println("partitionId:" + v1 + " value:" + v2.next());
                        }
                        // The iterator is drained at this point; the count() below is
                        // only used as an action to trigger execution, its result is ignored.
                        return v2;
                    }
                }, true).count();
            } finally {
                // Always release the Spark context, even if the job fails.
                sc.stop();
            }

        }
    }
  • 相关阅读:
    Python 学习日记 第七天
    Python 学习日记 第六天
    Python 学习日记 第五天
    Python 学习日记 第四天
    Redis 中的数据类型及基本操作
    Asp.net mvc 中View 的呈现(二)
    Asp.net mvc 中View的呈现(一)
    Asp.net mvc 中Action 方法的执行(三)
    Asp.net mvc 中Action 方法的执行(二)
    Asp.net mvc 中Action 方法的执行(一)
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194809.html
Copyright © 2011-2022 走看看