  • Spark basics practice (unfinished)

    1、filter
    val rdd = sc.parallelize(List(1,2,3,4,5))
    val mappedRDD = rdd.map(2*_)
    mappedRDD.collect
    val filteredRDD = mappedRDD.filter(_>4)
    filteredRDD.collect

    (The same pipeline written as a single chained expression:)
    val filteredRDDAgain = sc.parallelize(List(1,2,3,4,5)).map(2 * _).filter(_ > 4).collect
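The map/filter chain above can be sanity-checked on a plain Scala collection, since the transformation semantics match the RDD pipeline (no SparkContext needed):

```scala
// Plain-collection analog of the RDD pipeline above:
// double each element, then keep values greater than 4.
val filtered = List(1, 2, 3, 4, 5).map(2 * _).filter(_ > 4)
println(filtered)  // List(6, 8, 10)
```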

    2、wordcount
    val rdd = sc.textFile("/data/README.md")
    rdd.count
    rdd.cache
    val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
    wordcount.collect
    wordcount.saveAsTextFile("/data/result")
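What `reduceByKey(_+_)` computes per word can be sketched locally with `groupBy` plus a sum; this is a minimal single-machine sketch with made-up sample lines, not the distributed implementation:

```scala
// Local word count: split lines into words, pair each with 1,
// then sum the counts per key (groupBy + sum mirrors reduceByKey(_ + _)).
val lines = List("spark spark hadoop", "spark hadoop")
val counts = lines
  .flatMap(_.split(' '))
  .map((_, 1))
  .groupBy(_._1)
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
println(counts)
```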

    3、sort
    rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("/data/resultsorted")
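The sort-by-count trick (swap each pair to put the count in key position, sort descending, swap back) can be checked locally with `sortBy` on sample data (assumed values, for illustration):

```scala
// Sort word counts descending: swap to (count, word),
// sort descending on the count, then swap back to (word, count).
val wordCounts = List(("spark", 3), ("hadoop", 2), ("hive", 5))
val sorted = wordCounts
  .map { case (w, c) => (c, w) }
  .sortBy(_._1)(Ordering[Int].reverse)
  .map { case (c, w) => (w, c) }
println(sorted)  // List((hive,5), (spark,3), (hadoop,2))
```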

    4、union
    val rdd1 = sc.parallelize(List(('a',1),('b',1)))
    val rdd2 = sc.parallelize(List(('c',1),('d',1)))
    val result = rdd1 union rdd2
    result.collect
    (join works the same way)
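Note that RDD `union` simply concatenates the two datasets and keeps duplicates (unlike SQL `UNION`); on local collections the equivalent operation is `++`:

```scala
// Local analog of rdd1 union rdd2: concatenation, duplicates preserved.
val a = List(('a', 1), ('b', 1))
val b = List(('c', 1), ('d', 1))
val combined = a ++ b
println(combined)  // List((a,1), (b,1), (c,1), (d,1))
```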

    5、Connect to MySQL and create a DataFrame

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.{SaveMode, DataFrame}
    import org.apache.spark.sql.hive.HiveContext

    val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"

    val people_DDL = s"""
      |CREATE TEMPORARY TABLE PEOPLE
      |USING org.apache.spark.sql.jdbc
      |OPTIONS (
      |  url '${mySQLUrl}',
      |  dbtable 'person'
      |)""".stripMargin

    sqlContext.sql(people_DDL)
    val person = sqlContext.sql("SELECT * FROM PEOPLE").cache()

    val name = "name"
    val targets = person.filter(s"name = '$name'").collect()

    for (line <- targets) {
      val target_name = line(0)
      println(target_name)
      val target_age = line(1)
      println(target_age)
    }
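One detail worth checking locally: the filter expression must quote the string value as a SQL literal, otherwise `name = yangsy` would be parsed as a comparison against a column named `yangsy`. A quick sketch of the string construction (the value is hypothetical, for illustration):

```scala
// Build the SQL-style filter expression with the value quoted;
// string interpolation keeps the single quotes around the literal.
val targetName = "yangsy"  // hypothetical value
val expr = s"name = '$targetName'"
println(expr)  // name = 'yangsy'
```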

     

    6、Manually set the number of Spark SQL shuffle tasks

      sqlContext.setConf("spark.sql.shuffle.partitions", "10")

  • Original source: https://www.cnblogs.com/yangsy0915/p/4876262.html