zoukankan      html  css  js  c++  java
  • Operator_repartitionAndSortWithinPartition

    package com.bjsxt.spark.transformations;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.cassandra.cli.CliParser.newColumnFamily_return;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    import scala.Tuple2;
    /**
     * Comparator that orders integers in descending order (largest first).
     * It is {@link Serializable} so that an instance can be handed to an API
     * that requires a serializable comparator.
     */
    class MySort implements Serializable, Comparator<Integer> {

        private static final long serialVersionUID = 1L;

        /**
         * Compares two integers so that the larger value sorts first.
         *
         * @param o1 the first value
         * @param o2 the second value
         * @return a negative number if {@code o1 > o2}, zero if they are equal,
         *         a positive number if {@code o1 < o2}
         */
        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare instead of "o2 - o1": the subtraction overflows for
            // operands far apart (e.g. Integer.MIN_VALUE and 1) and flips the sign.
            return Integer.compare(o2, o1);
        }

    }
    
    /**
     * Demo of {@code JavaPairRDD.repartitionAndSortWithinPartitions}: a small
     * list of (Integer, Integer) pairs is repartitioned into three partitions by
     * key modulo 3, the keys inside each partition are ordered with
     * {@link MySort}, and the content of every partition is printed.
     */
    public class Operator_repartitionAndSortWithinPartition {

        /**
         * Runs the demo on a local Spark master and prints the partition count
         * followed by each partition's records.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);

            try {
                List<Tuple2<Integer,Integer>> list = Arrays.asList(
                        new Tuple2<Integer,Integer>(2, 3),
                        new Tuple2<Integer,Integer>(1, 2),
                        new Tuple2<Integer,Integer>(6, 7),
                        new Tuple2<Integer,Integer>(3, 4),
                        new Tuple2<Integer,Integer>(5, 6),
                        new Tuple2<Integer,Integer>(4, 5)
                        );

                // Start from a single partition so the repartitioning is visible.
                JavaPairRDD<Integer,Integer> rdd = sc.parallelizePairs(list,1);

                JavaPairRDD<Integer, Integer> rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Number of target partitions; valid partition ids are 0, 1 and 2.
                     *
                     * @return the partition count (3)
                     */
                    @Override
                    public int numPartitions() {
                        return 3;
                    }

                    /**
                     * Maps a key to a partition id using key modulo the partition count.
                     *
                     * @param key the record key; its string form must parse as an int
                     * @return a partition id in the range [0, numPartitions())
                     */
                    @Override
                    public int getPartition(Object key) {
                        int partitions = numPartitions();
                        int remainder = Integer.parseInt(String.valueOf(key)) % partitions;
                        // Java's % keeps the sign of the dividend; shift negative
                        // remainders up so the id is never below zero.
                        return remainder < 0 ? remainder + partitions : remainder;
                    }
                },new MySort());

                System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());

                // count() is only used as an action to force the per-partition printing.
                rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<Tuple2<Integer,Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Prints every record of one partition and passes the records on.
                     *
                     * @param v1 the partition index
                     * @param v2 iterator over the partition's records
                     * @return a fresh iterator over the same records
                     */
                    @Override
                    public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {
                        // Buffer the records: an iterator can only be consumed once, so
                        // returning v2 after the loop would hand back an empty partition.
                        List<Tuple2<Integer, Integer>> seen = new ArrayList<Tuple2<Integer, Integer>>();
                        while(v2.hasNext()){
                            Tuple2<Integer, Integer> record = v2.next();
                            System.out.println("partitionId:" + v1 + " value:" + record);
                            seen.add(record);
                        }
                        return seen.iterator();
                    }
                }, true).count();
            } finally {
                // Always release the Spark context, even if the job fails.
                sc.stop();
            }

        }
    }
  • 相关阅读:
    kernel structure
    linux cmd fuser/screen
    arm 指令架构
    udev 学习
    grup 2 ubuntu
    tiny6410 3.8.1 内核kgdb调试
    make 选项
    lfs 无知
    数据归一化的两种方法:最值归一化和0均值归一化
    使用随机队列实现生成迷宫
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194809.html
Copyright © 2011-2022 走看看