zoukankan      html  css  js  c++  java
  • Operator_repartitionAndSortWithinPartition

    package com.bjsxt.spark.transformations;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.cassandra.cli.CliParser.newColumnFamily_return;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    import scala.Tuple2;
    /**
     * Comparator that orders {@link Integer} keys in descending order.
     *
     * <p>It implements {@link Serializable} so it can be serialized together with the Spark job
     * that uses it as a sort order.
     */
    class MySort implements Serializable, Comparator<Integer> {

        private static final long serialVersionUID = 1L;

        /**
         * Compares two keys for descending order.
         *
         * @param o1 the first key (must not be null)
         * @param o2 the second key (must not be null)
         * @return a positive value if {@code o1 < o2}, a negative value if {@code o1 > o2},
         *     and zero if they are equal
         */
        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare avoids the int overflow of "o2 - o1" when the operands are far apart
            // (e.g. Integer.MIN_VALUE vs. a positive value), which would invert the ordering.
            return Integer.compare(o2, o1);
        }
    }
    
    /**
     * Demonstrates {@code JavaPairRDD.repartitionAndSortWithinPartitions}.
     *
     * <p>A small pair RDD is repartitioned with a custom {@link Partitioner} that assigns each
     * key to partition {@code key mod 3}. Within each resulting partition, records are sorted by
     * key with {@link MySort} (descending). Each partition's records are then printed along with
     * the partition index.
     */
    public class Operator_repartitionAndSortWithinPartition {

        /**
         * Runs the demo on a local Spark master and stops the Spark context afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            try {
                List<Tuple2<Integer, Integer>> list = Arrays.asList(
                        new Tuple2<Integer, Integer>(2, 3),
                        new Tuple2<Integer, Integer>(1, 2),
                        new Tuple2<Integer, Integer>(6, 7),
                        new Tuple2<Integer, Integer>(3, 4),
                        new Tuple2<Integer, Integer>(5, 6),
                        new Tuple2<Integer, Integer>(4, 5)
                        );

                // Start from a single partition; the repartition below distributes the keys.
                JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(list, 1);

                JavaPairRDD<Integer, Integer> rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Number of output partitions (partition ids 0, 1 and 2).
                     *
                     * @return the partition count
                     */
                    @Override
                    public int numPartitions() {
                        return 3;
                    }

                    /**
                     * Assigns an Integer key to partition {@code key mod numPartitions()}.
                     *
                     * @param key the record key; expected to be an Integer (the RDD's key type)
                     * @return a partition id in {@code [0, numPartitions())}
                     */
                    @Override
                    public int getPartition(Object key) {
                        if (key == null) {
                            return 0;
                        }
                        int partitions = numPartitions();
                        // Use the Integer value directly rather than round-tripping through a
                        // String. Java's % can be negative for negative keys, so shift the
                        // remainder into the valid range.
                        int remainder = ((Integer) key) % partitions;
                        return remainder < 0 ? remainder + partitions : remainder;
                    }
                }, new MySort());

                System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());

                rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Prints every record of one partition and passes the records through unchanged.
                     *
                     * @param partitionId index of the partition being processed
                     * @param records the partition's records
                     * @return an iterator over the same records
                     */
                    @Override
                    public Iterator<Tuple2<Integer, Integer>> call(Integer partitionId, Iterator<Tuple2<Integer, Integer>> records) throws Exception {
                        // Buffer the records so they can be both printed and returned. Returning the
                        // already-consumed input iterator would leave the resulting RDD empty.
                        // NOTE(review): this holds one whole partition in memory, which is fine
                        // for this small demo data set.
                        List<Tuple2<Integer, Integer>> buffered = new ArrayList<Tuple2<Integer, Integer>>();
                        while (records.hasNext()) {
                            Tuple2<Integer, Integer> record = records.next();
                            System.out.println("partitionId:" + partitionId + " value:" + record);
                            buffered.add(record);
                        }
                        return buffered.iterator();
                    }
                }, true).count(); // count() is used only as the action that triggers evaluation
            } finally {
                // Release the Spark context even if the job fails.
                sc.stop();
            }
        }
    }
  • 相关阅读:
    里氏代换原则
    依赖倒转原则
    开放-封闭原则
    如何判断对象是否死亡和类是无用的类
    Java内存区域
    Zookeeper使用场景
    zookeeper知识点总结
    前端小技术总结
    lambda表达式的使用
    Comparator进行List集合排序
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194809.html
Copyright © 2011-2022 走看看