zoukankan      html  css  js  c++  java
  • 自定义实现spark的分区函数

    有时自己的业务需要自己实现spark的分区函数

    以下代码是实现一个自定义spark分区的demo

    实现的功能是根据key值的最后一位数字,写到不同的文件

    例如:

    10写入到part-00000

    11写入到part-00001

    .

    .

    .

    19写入到part-00009

    自定义分区:

    import org.apache.spark.{Partitioner, SparkContext, SparkConf}
    
    //自定义分区类,需继承Partitioner类
    class UsridPartitioner(numParts:Int) extends Partitioner{
      //覆盖分区数
      override def numPartitions: Int = numParts
      
      //覆盖分区号获取函数
      override def getPartition(key: Any): Int = {
        key.toString.toInt%10
      }
    }
    
    object Test {
      def main(args: Array[String]) {
        val conf=new SparkConf()
        val sc=new SparkContext(conf)
    
        //模拟5个分区的数据
        val data=sc.parallelize(1 to 10,5)
        
        //根据尾号转变为10个分区,分写到10个文件
        data.map((_,1)).partitionBy(new UsridPartitioner(10)).saveAsTextFile("/chenm/partition")
      }
    }
  • 相关阅读:
    机器学习——模型评估与选择
    论文等级
    python简介
    记忆力
    PyQt 5控件
    PyQt5对话框
    PyQt 5事件和信号
    PyQt 5菜单和工具栏
    PyQt 5布局管理
    PyQt 5的基本功能
  • 原文地址:https://www.cnblogs.com/bonelee/p/6055450.html
Copyright © 2011-2022 走看看