zoukankan      html  css  js  c++  java
  • 自定义实现spark的分区函数

    有时自己的业务需要自己实现spark的分区函数

    以下代码是实现一个自定义spark分区的demo

    实现的功能是根据key值的最后一位数字,写到不同的文件

    例如:

    10写入到part-00000

    11写入到part-00001

    .

    .

    .

    19写入到part-00009

    自定义分区:

    import org.apache.spark.{Partitioner, SparkContext, SparkConf}
    
    //自定义分区类,需继承Partitioner类
    class UsridPartitioner(numParts:Int) extends Partitioner{
      //覆盖分区数
      override def numPartitions: Int = numParts
      
      //覆盖分区号获取函数
      override def getPartition(key: Any): Int = {
        key.toString.toInt%10
      }
    }
    
    object Test {
      def main(args: Array[String]) {
        val conf=new SparkConf()
        val sc=new SparkContext(conf)
    
        //模拟5个分区的数据
        val data=sc.parallelize(1 to 10,5)
        
        //根据尾号转变为10个分区,分写到10个文件
        data.map((_,1)).partitionBy(new UsridPartitioner(10)).saveAsTextFile("/chenm/partition")
      }
    }
  • 相关阅读:
    HDU 1016 Prime Ring Problem
    POJ 1724 ROADS(bfs最短路)
    HDU 1033 Edge
    IE 兼容模式
    HDU 1263 水果
    数据结构之图详解
    继续过中等难度.0309
    排序的稳定性
    Java+7入门经典
    哈希链表及其变种
  • 原文地址:https://www.cnblogs.com/bonelee/p/6055450.html
Copyright © 2011-2022 走看看