zoukankan      html  css  js  c++  java
  • PartitionerTest

    package com.bjsxt.spark.others.partitioner;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.Partitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;

    import scala.Tuple2;
    /**
     * Custom partitioner demo.
     *
     * <p>Builds a small (Integer, String) pair RDD with two partitions and prints each record
     * together with the partition index that holds it. It then re-partitions the RDD with
     * {@link ParityPartitioner} (even keys to partition 0, odd keys to partition 1) and prints
     * the records again, so the effect of the partitioner is visible.
     *
     * @author root
     */
    public class PartitionerTest {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf();
            conf.setMaster("local").setAppName("partitioner");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Stop the context even if one of the jobs below throws, so the Spark runtime is released.
            try {
                JavaPairRDD<Integer, String> nameRDD = sc.parallelizePairs(Arrays.asList(
                        new Tuple2<Integer, String>(1, "zhangsan"),
                        new Tuple2<Integer, String>(2, "lisi"),
                        new Tuple2<Integer, String>(3, "wangwu"),
                        new Tuple2<Integer, String>(4, "zhaoliu"),
                        new Tuple2<Integer, String>(5, "shunqi"),
                        new Tuple2<Integer, String>(6, "zhouba")
                        ), 2);

                printPartitions(nameRDD, "nameRDD");
                System.out.println("******************************");

                JavaPairRDD<Integer, String> partitionRDD = nameRDD.partitionBy(new ParityPartitioner());
                printPartitions(partitionRDD, "partitionRDD");
            } finally {
                sc.stop();
            }
        }

        /**
         * Prints one line per record of {@code rdd}, tagged with the index of the partition holding it.
         *
         * <p>The lines are built inside the partitions but printed here, after {@code collect()},
         * so they appear on the driver's console rather than in executor output.
         *
         * @param rdd     the RDD whose partition layout should be shown
         * @param rddName label prefixed to every printed line
         */
        private static void printPartitions(JavaPairRDD<Integer, String> rdd, String rddName) {
            for (String line : rdd.mapPartitionsWithIndex(describePartition(rddName), true).collect()) {
                System.out.println(line);
            }
        }

        /**
         * Returns a partition function that turns every record of a partition into a line of the
         * form {@code "<rddName> partitionID = <index> , value = <record>"}.
         *
         * @param rddName label placed at the start of each produced line
         * @return a serializable function usable with {@code mapPartitionsWithIndex}
         */
        private static Function2<Integer, Iterator<Tuple2<Integer, String>>, Iterator<String>> describePartition(
                final String rddName) {
            return new Function2<Integer, Iterator<Tuple2<Integer, String>>, Iterator<String>>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, String>> iter) throws Exception {
                    List<String> lines = new ArrayList<String>();
                    while (iter.hasNext()) {
                        lines.add(rddName + " partitionID = " + index + " , value = " + iter.next());
                    }
                    return lines.iterator();
                }
            };
        }

        /**
         * Sends even Integer keys to partition 0 and odd Integer keys (negative ones included)
         * to partition 1.
         *
         * <p>Keys must be non-null {@link Integer}s. Any other key makes {@link #getPartition}
         * throw, either a {@code ClassCastException} or a {@code NullPointerException}.
         */
        private static final class ParityPartitioner extends Partitioner {

            private static final long serialVersionUID = 1L;

            /**
             * Returns the number of partitions this partitioner creates.
             */
            @Override
            public int numPartitions() {
                return 2;
            }

            /**
             * Computes the partition ID for {@code key}. The result is always in the range
             * 0 to numPartitions() - 1.
             */
            @Override
            public int getPartition(Object key) {
                int i = (Integer) key;
                // For negative odd keys i % 2 is -1, which is not 0, so they also map to partition 1.
                return i % 2 == 0 ? 0 : 1;
            }

            /**
             * All instances partition identically. Spark compares partitioners with equals()
             * to decide whether two RDDs are already co-partitioned.
             */
            @Override
            public boolean equals(Object other) {
                return other instanceof ParityPartitioner;
            }

            @Override
            public int hashCode() {
                return numPartitions();
            }
        }
    }

  • 相关阅读:
    JAVA 使用 POI进行读取Excel表格示例
    问题解决:Maven execution terminated abnormally (exit code 1)
    oracle总结: INTERVAL DAY TO SECOND, 但却获得 NUMBER
    SpringMvc返回JSON出现"$.result.currentLevel"
    Spring+SpringMVC+mybatis maven pom文件
    redis在Linux上的安装
    Jsoup访问https网址异常SSLHandshakeException(已解决)
    不同版本2.5的Servlet web.xml 头信息
    Spring管理事物两种方式
    Error:too many padding sections on bottom border.
  • 原文地址:https://www.cnblogs.com/huiandong/p/9194630.html
Copyright © 2011-2022 走看看