  • Spark basics exercises (work in progress)

    1. filter
    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
    val mappedRDD = rdd.map(2 * _)
    mappedRDD.collect
    val filteredRDD = mappedRDD.filter(_ > 4)
    filteredRDD.collect

    (the same pipeline written as a single chained expression)
    val filteredRDDAgain = sc.parallelize(List(1,2,3,4,5)).map(2 * _).filter(_ > 4).collect
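
    For reference, with this input the chained expression should evaluate to:

    // doubling 1..5 gives 2,4,6,8,10; keeping values > 4 leaves
    // filteredRDDAgain: Array[Int] = Array(6, 8, 10)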

    2. wordcount
    val rdd = sc.textFile("/data/README.md")
    rdd.count
    rdd.cache
    val wordcount = rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _)
    wordcount.collect
    wordcount.saveAsTextFile("/data/result")
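
    As a quick spot check (assuming the word "Spark" actually appears in the file), the reduced count for a single key can be read back with lookup:

    // returns the reduced count(s) for the given key as a Seq
    wordcount.lookup("Spark")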

    3. sort
    rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("/data/resultsorted")
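
    The swap / sortByKey / swap pattern can also be written with sortBy (available since Spark 1.0); a minimal sketch over the same rdd, with a placeholder output path:

    // sort the (word, count) pairs by count in descending order directly
    rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false).saveAsTextFile("/data/resultsortedAlt")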

    4. union
    val rdd1 = sc.parallelize(List(('a',1),('b',1)))
    val rdd2 = sc.parallelize(List(('c',1),('d',1)))
    val result = rdd1 union rdd2
    result.collect
    (join works the same way; see the sketch below)
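
    A minimal join sketch in the same spirit (the keys and values below are made up for illustration); join keeps only keys present in both pair RDDs and pairs up their values:

    val left = sc.parallelize(List(('a', 1), ('b', 1)))
    val right = sc.parallelize(List(('a', 2), ('c', 3)))
    // only 'a' appears on both sides, so collect yields Array(('a', (1, 2)))
    val joined = left.join(right)
    joined.collect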

    5. Connect to MySQL and create a DataFrame

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.{SaveMode, DataFrame}
    import org.apache.spark.sql.hive.HiveContext

    // sc is the SparkContext provided by the shell; build a HiveContext on top of it
    val sqlContext = new HiveContext(sc)

    val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"

    val people_DDL = s"""
      |CREATE TEMPORARY TABLE PEOPLE
      |USING org.apache.spark.sql.jdbc
      |OPTIONS (
      |  url '${mySQLUrl}',
      |  dbtable 'person'
      |)""".stripMargin

    sqlContext.sql(people_DDL)
    val person = sqlContext.sql("SELECT * FROM PEOPLE").cache()

    val name = "name"
    val targets = person.filter("name ="+name).collect()

    for (line <- targets) {
      val target_name = line(0)
      println(target_name)
      val target_age = line(1)
      println(target_age)
    }
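
    The same table can also be loaded without the temporary-table DDL, via the DataFrame JDBC reader introduced in Spark 1.4; a minimal sketch assuming the same mySQLUrl and table (the filter value is just a placeholder):

    // load the MySQL table directly as a DataFrame through the jdbc data source
    val personDF = sqlContext.read.format("jdbc")
      .option("url", mySQLUrl)
      .option("dbtable", "person")
      .load()
    // placeholder predicate; substitute a real name from the table
    personDF.filter("name = 'someName'").show()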

     

    6. Manually set the number of Spark SQL shuffle tasks

      sqlContext.setConf("spark.sql.shuffle.partitions", "10")
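
      The same setting can also be applied as a SQL statement or when launching the shell; a sketch assuming the sqlContext from section 5:

      // equivalent ways to control the shuffle partition count
      sqlContext.sql("SET spark.sql.shuffle.partitions=10")
      // or at launch: spark-shell --conf spark.sql.shuffle.partitions=10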
