  • SparkSQL: Using a UDF

    package cn.piesat.test

    import org.apache.spark.sql.SparkSession

    import scala.collection.mutable.ArrayBuffer

    // Assumed record type for lines of Workers.txt in the form "name,age,addr";
    // a case class is required so that .toDS() can find an implicit Encoder.
    case class Worker(name: String, age: Int, addr: String)

    object SparkSQLTest {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sparkSql").master("local[4]")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()
        val sc = spark.sparkContext
        val sqlContext = spark.sqlContext

        // Parse each line of the input file into a Worker record.
        val workerRDD = sc.textFile("F://Workers.txt").mapPartitions(itor => {
          val array = new ArrayBuffer[Worker]()
          while (itor.hasNext) {
            val splited = itor.next().split(",")
            array.append(Worker(splited(0), splited(1).toInt, splited(2)))
          }
          array.toIterator
        })

        import spark.implicits._

        // Register the UDF: returns the combined length of the name and addr strings.
        spark.udf.register("strLen", (str: String, addr: String) => str.length + addr.length)

        val workDS = workerRDD.toDS()
        workDS.createOrReplaceTempView("worker")
        val resultDF = spark.sql("select strLen(name, addr) from worker")
        val resultDS = resultDF.as("WO")
        resultDS.show()

        spark.stop()
      }
    }
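    For comparison, the same logic can also be used without registering the function by name: wrap it with org.apache.spark.sql.functions.udf and apply it as a column expression. The sketch below is a minimal example under the assumptions of the listing above (the same SparkSession named spark, the Worker case class, the workDS Dataset, and input lines shaped like "Tom,30,Beijing"); it is meant to run inside the same main method after workDS is created.

        import org.apache.spark.sql.functions.udf
        import spark.implicits._ // for the $"col" column syntax

        // Wrap the same length-sum logic as a column-level UDF (no name registration needed).
        val strLenUdf = udf((name: String, addr: String) => name.length + addr.length)

        // Apply it directly to the Dataset created above.
        workDS.select(strLenUdf($"name", $"addr").as("name_addr_len")).show()

    Registering by name, as in the main listing, is what makes the function callable from SQL text; the udf() wrapper is the usual choice when staying in the Dataset/DataFrame API.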
  • Original article: https://www.cnblogs.com/runnerjack/p/10661870.html