zoukankan      html  css  js  c++  java
  • spark操作总结

    一、sparkContext与sparkSession区别

    任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,sparkContext只能在driver机器上面启动;
    SparkSession: SparkSession实质上是SQLContext和HiveContext的组合,SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成

    val conf: SparkConf = new SparkConf().setAppName("test")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

     

    二、repartition与coalesce区别

    repartition一般是用来增加分区数(当然也可以减少),coalesce只能用来减少分区数。所以如果不介意保存的文件块大小不一样,可以使用coalesce来减少分区数,保存的时候一个分区就会生成一个文件块

     

    三、Scala常用方法

    1. StringBuilder

    主要用于字符串的拼接,可作用于生成倒排序列,如:
    val userItemScore = sc.parallelize(List((1001, 1, 0.8), (1001, 2, 0.7), (1001, 3, 0.5), (1001, 4, 0.9)))
    userItemScore.map(x => (x._1, (x._2.toString, x._3.toString))).groupByKey()
    .map{x =>
    val userid = x._1
    val item_score_list = x._2
    val tmp_arr = item_score_list.toArray.sortWith(_._2 > _._2)
    val watch_len = tmp_arr.length
    val strbuf = new StringBuilder()
    
    for (i <- 0 until watch_len - 1) {
    strbuf ++= tmp_arr(i)._1
    strbuf.append(":")
    strbuf ++= tmp_arr(i)._2
    strbuf.append(" ")
    }
    strbuf ++= tmp_arr(watch_len - 1)._1
    strbuf.append(":")
    strbuf ++= tmp_arr(watch_len - 1)._2
    
    userid + "	" + strbuf
    }.collect()

     

    2. scala.collection.mutable.ArrayBuffer

    相当于是一个大小可变数组,把需要的值添加进来,例如:
    val tmpArray = new ArrayBuffer[String]()
    val tmpArray = new ArrayBuffer[Int]()
    val tmpArray = new ArrayBuffer[(String, Int)]()
    scala> tmpArray.append(("wangzai", 1))
    scala> tmpArray
    res11: scala.collection.mutable.ArrayBuffer[(String, Int)] = ArrayBuffer((wangzai,1), (test,2))
    
    tmpArray.indexOf(("test",2))为获取当前值的索引,返回类型为整型
    tmpArray.slice(tmpArray.indexOf(("test", 2)), tmpArray.length)为切片,返回类型为ArrayBuffer

     

    四、通过spark-shell来操作数据库中的表

    1 启动(通过--jars指定包,后面reids包不需要,只是演示添加多个包的用法)

    /xxx/spark/bin/spark-shell 
    --master spark://xxx:7077 
    --executor-cores 1 
    --total-executor-cores 5 
    --driver-memory 2g 
    --jars /xxx/jars/mysql-connector-java-5.1.38.jar,/xxx/jars/jedis-2.9.0.jar

    2 在命令行中输入::paste, 然后粘贴以下代码,最后ctrl+D退出之后,即可执行

    
    

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.SparkConf
    val conf: SparkConf = new SparkConf()
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val mysqlUrl: String = "jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    val productTable: String = "product_info"
    val orderTable: String = "order_info"
    val properties: Properties = new Properties()
    properties.put("user", user)
    properties.put("password", password)

    
    

    // 获取同事购配置表数据
    val productDF: DataFrame = spark.read.jdbc(mysqlUrl, productTable, properties).select("id", "name")
    val orderDF: DataFrame = spark.read.jdbc(mysqlUrl, orderTable, properties).select("product_id", "createTime")

    
    

    val totalDataDF = productDF.join(orderDF, orderDF("product_id") === productDF("id")).drop("id")
    //如果product_info对应的id为product_id,即关联id字段名不相同
    //val totalDataDF = productDF.join(orderDF, Seq("product_id"))

    3 把该DateFrame注册为临时表才能通过spark-sql操作

    totalDataDF.createOrReplaceTempView("totalDataDF")

     

    五、spark-sql的基本操作

    //默认显示20条数据
    scala> df.show()
    //打印模式信息
    scala> df.printSchema()
    //选择多列
    scala> df.select(df("name"),df("age")+1).show()
    // 条件过滤
    scala> df.filter(df("age") > 20 ).show()
    // 分组聚合
    scala> df.groupBy("age").count().show()
    // 排序
    scala> df.sort(df("age").desc).show()
    //多列排序
    scala> df.sort(df("age").desc, df("name").asc).show()
    //对列进行重命名
    scala> df.select(df("name").as("username"),df("age")).show()
    //对多个列重命名
    scala> df.withColumnRenamed("id", "userId").withColumnRenamed("name", "userName")

     

     

  • 相关阅读:
    Delphi TWebBrowser[11] 读写html代码
    Nginx 配置反向代理
    清除git中缓存的凭证(用户名及密码)
    python 摄像头
    a 链接控制打开新窗口 无地址栏
    树形多级菜单数据源嵌套结构与扁平结构互转
    使用 git 的正确姿势
    JavaScript this
    JavaScript Scope Chain
    JavaScript Scope Context
  • 原文地址:https://www.cnblogs.com/654wangzai321/p/11096992.html
Copyright © 2011-2022 走看看