zoukankan      html  css  js  c++  java
  • Operator_repartitionAndSortWithinPartition

    package com.bjsxt.spark.transformations;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.cassandra.cli.CliParser.newColumnFamily_return;
    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    
    import scala.Tuple2;
    /**
     * Comparator that orders integers from largest to smallest (descending).
     *
     * <p>The class is also {@link Serializable}; in this file an instance is handed to
     * {@code repartitionAndSortWithinPartitions} as the key ordering.
     */
    class MySort implements Serializable,Comparator<Integer>{

        private static final long serialVersionUID = 1L;

        /**
         * Compares two keys in reverse natural order.
         *
         * @param o1 the first key
         * @param o2 the second key
         * @return a negative value if {@code o1} is greater than {@code o2} (so it sorts first),
         *         zero if they are equal, a positive value otherwise
         */
        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare avoids the int overflow of "o2 - o1", which flips the sign when the
            // operands are far apart (e.g. 1 and Integer.MIN_VALUE).
            return Integer.compare(o2, o1);
        }

    }
    
    /**
     * Demo of {@code JavaPairRDD.repartitionAndSortWithinPartitions}.
     *
     * <p>Builds a small pair RDD, repartitions it into three partitions using a custom
     * {@link Partitioner} (key modulo partition count), orders the keys with {@link MySort},
     * and prints every element together with the index of the partition it ended up in.
     */
    public class Operator_repartitionAndSortWithinPartition {

        /**
         * Runs the demo against a local Spark master.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            try {
                List<Tuple2<Integer,Integer>> list = Arrays.asList(
                        new Tuple2<Integer,Integer>(2, 3),
                        new Tuple2<Integer,Integer>(1, 2),
                        new Tuple2<Integer,Integer>(6, 7),
                        new Tuple2<Integer,Integer>(3, 4),
                        new Tuple2<Integer,Integer>(5, 6),
                        new Tuple2<Integer,Integer>(4, 5)
                        );

                // Start from a single partition so the effect of the repartitioning is visible.
                JavaPairRDD<Integer,Integer> rdd = sc.parallelizePairs(list,1);

                JavaPairRDD<Integer, Integer> rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {

                    private static final long serialVersionUID = 1L;

                    /**
                     * Number of target partitions; valid partition indices are 0, 1 and 2.
                     *
                     * @return the partition count
                     */
                    @Override
                    public int numPartitions() {
                        return 3;
                    }

                    /**
                     * Maps a key to a partition index in {@code [0, numPartitions())}.
                     *
                     * @param key the record key; Integer keys are used directly, any other
                     *            object is parsed from its string form
                     * @return the non-negative partition index for the key
                     */
                    @Override
                    public int getPartition(Object key) {
                        int k = (key instanceof Integer)
                                ? ((Integer) key).intValue()
                                : Integer.parseInt(String.valueOf(key));
                        int mod = k % numPartitions();
                        // Java's % keeps the sign of the dividend; shift negative remainders
                        // back into range so negative keys still get a valid partition index.
                        return mod < 0 ? mod + numPartitions() : mod;
                    }
                },new MySort());

                System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());

                rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<Tuple2<Integer,Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {
                        // Wrap instead of draining: returning an already-consumed iterator
                        // would make the resulting RDD empty.
                        return new PrintingIterator(v1, v2);
                    }
                }, true).count(); // count() is the action that triggers the job (and the printing)
            } finally {
                // Always release the Spark context, even if the job fails.
                sc.stop();
            }
        }

        /**
         * Iterator wrapper that prints each element, tagged with its partition index, at the
         * moment the element is handed to the consumer.
         */
        private static final class PrintingIterator implements Iterator<Tuple2<Integer, Integer>> {

            private final Integer partitionId;
            private final Iterator<Tuple2<Integer, Integer>> delegate;

            PrintingIterator(Integer partitionId, Iterator<Tuple2<Integer, Integer>> delegate) {
                this.partitionId = partitionId;
                this.delegate = delegate;
            }

            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public Tuple2<Integer, Integer> next() {
                Tuple2<Integer, Integer> element = delegate.next();
                System.out.println("partitionId:" + partitionId + " value:" + element);
                return element;
            }

            @Override
            public void remove() {
                delegate.remove();
            }
        }
    }
  • 相关阅读:
    无重复字符的最长子串
    最长公共前缀
    项目开发的 工程化
    包管理 import debug 模块管理 module
    Third Party Browser Drivers NOT DEVELOPED by seleniumhq
    任何不看源码的代码引入都是存在定时爆炸的可能
    博客数计数
    lineage 世系 血缘 容错机制 DAG
    查源码分析 游标 写 需要 cursors 一切不看源码的代码引入都是定时炸弹的启动
    8核 16g 及时释放内存空间
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194809.html
Copyright © 2011-2022 走看看