  • Spark test code

    Test code:

      1 import org.apache.spark.{SparkConf, SparkContext}
      2 import org.apache.spark.sql.hive.HiveContext
      3 
      4 /**
      5   * Created by Administrator on 2017/1/7.
      6   */
      7 object TestMain {
      8   def main(args: Array[String]): Unit = {
      9     val conf = new SparkConf().setAppName("Hangzhou_Test")
     10     //.setMaster("local[1]").setMaster("spark://172.21.7.10:7077").setJars(List("xxx.jar")).set("spark.executor.memory", "10g")
     11     val sc = new SparkContext(conf)
     12     val hiveContext = new HiveContext(sc)
     13     // use rc_hive_db;
     14     hiveContext.sql("use rc_hive_db")
     15 
     16     import hiveContext.implicits._
     17 
     18     hiveContext.setConf("mapred.max.split.size", "256000000")
     19     hiveContext.setConf("mapred.min.split.size.per.node", "100000000")
     20     hiveContext.setConf("mapred.min.split.size.per.rack", "100000000")
     21     hiveContext.setConf("hive.input.format", "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat")
     22     hiveContext.setConf("hive.merge.mapfiles", "true")
     23     hiveContext.setConf("hive.merge.mapredfiles", "true")
     24     hiveContext.setConf("hive.merge.size.per.task", "256000000")
     25     hiveContext.setConf("hive.merge.smallfiles.avgsize", "256000000")
     26     hiveContext.setConf("hive.groupby.skewindata", "true")
     27 
     28     hiveContext.sql("create table if not exists tb_id_vs_name(id int,name string)")
     29     hiveContext.sql("create table if not exists tb_id_vs_name2(id int,name string)")
     30 
     31     println("-------------------------word count:------------------------------------")
     32     // http://blog.csdn.net/t1dmzks/article/details/70189509
     33     var words = "When building the vocabulary ignore terms that have a document frequency strictly lower than the given threshold. This value is also called cut-off in the literature. If float, the parameter represents a proportion of documents, integer absolute counts. This parameter is ignored if vocabulary is not None."
     34     val textFile = sc.parallelize(words.split(" "), 2)
     35     textFile.flatMap(line => line.split(" "))
     36       .map(word => (word, 1))
     37       .reduceByKey(_ + _)
     38       .foreach(println)
     39 
     40     println("-------------------------map(func):------------------------------------")
     41     //    1.map(func)
      42     val rdd = sc.parallelize(1 to 10) // create an RDD
      43     val map = rdd.map(_ * 2) // multiply each element of the RDD by 2
     44     map.foreach(x => print(x + " "))
     45 
     46     println("-------------------------flatMap(func):------------------------------------")
     47     //    2.flatMap(func)
     48     val fm = rdd.flatMap(x => (1 to x)).collect()
     49     fm.foreach(x => print(x + " "))
     50 
     51     println("-------------------------mapPartitions(func) 1:------------------------------------")
     52     //    3.mapPartitions(func)
     53     val mp = sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2).mapPartitions(x => {
     54       var woman = List[String]()
     55       while (x.hasNext) {
     56         val next = x.next()
     57         next match {
     58           case (_, "female") => woman = next._1 :: woman
     59           case _ =>
     60         }
     61       }
     62       woman.iterator
     63     })
     64     /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
      65     mp.collect.foreach(x => (print(x + " "))) // collect the partition results into an Array on the driver and print them
     66 
     67     println("-------------------------mapPartitions(func) 2:------------------------------------")
     68     sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2)
     69       .mapPartitions(x => x.filter(_._2 == "female"))
     70       .map(x => x._1)
     71       .foreach(x => (print(x + " ")))
     72 
     73     println("-------------------------mapPartitionsWithIndex(func) :------------------------------------")
     74     //    4.mapPartitionsWithIndex(func)
     75     sc.parallelize(List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female")), 2)
     76       .mapPartitionsWithIndex((index: Int, iter: Iterator[(String, String)]) => {
     77         var woman = List[String]()
     78         while (iter.hasNext) {
     79           val next = iter.next()
     80           next match {
     81             case (_, "female") => woman = "[" + index + "]" + next._1 :: woman
     82             case _ =>
     83           }
     84         }
     85         woman.iterator
     86       })
      87       .collect.foreach(x => (print(x + " "))) // collect the partition results into an Array on the driver and print them
     88 
      89     println("-------------------------sample(withReplacement,fraction,seed) :------------------------------------")
      90     //    5.sample(withReplacement,fraction,seed)
     91     val sample1 = rdd.sample(true, 0.5, 3)
     92     sample1.collect.foreach(x => print(x + " "))
     93 
      94     println("-------------------------union(otherDataset): merges two RDDs and returns their union; duplicate elements are not removed------------------------------------")
      95     //    6.union(otherDataset)
     96     val rdd1 = sc.parallelize(1 to 3)
     97     val rdd2 = sc.parallelize(3 to 5)
     98     rdd1.union(rdd2).collect.foreach(x => print(x + " "))
     99 
     100     println("-------------------------intersection(otherDataset): returns the intersection of two RDDs------------------------------------")
    101     //    7.intersection(otherDataset)
    102     rdd1.intersection(rdd2).collect.foreach(x => print(x + " "))
    103 
     104     println("-------------------------distinct([numTasks]): removes duplicate elements from the RDD------------------------------------")
    105     //    8.distinct([numTasks])
    106     sc.parallelize(List(1, 1, 2, 5, 2, 9, 6, 1)).distinct().collect.foreach(x => print(x + " "))
    107 
     108     println("-------------------------cartesian(otherDataset): computes the Cartesian product of the elements of two RDDs------------------------------------")
    109     //    9.cartesian(otherDataset)
    110     sc.parallelize(1 to 3).cartesian(sc.parallelize(2 to 5)).foreach(x => println(x + " "))
    111 
     112     println("-------------------------coalesce(numPartitions,shuffle): repartitions the RDD; shuffle defaults to false, and with shuffle=false the number of partitions cannot be increased------------------------------------")
    113     //    10.coalesce(numPartitions,shuffle)
     114     val coalesceRDD = sc.parallelize(1 to 16, 4).coalesce(3) // with shuffle=false the partition count cannot be increased (e.g. not from 4 to 7)
     115     println("number of partitions after coalesce: " + coalesceRDD.partitions.size)
    116 
    117     val coalesceRDD2 = sc.parallelize(1 to 16, 4).coalesce(7, true)
     118     println("number of partitions after coalesce: " + coalesceRDD2.partitions.size)
     119     println("RDD lineage: " + coalesceRDD2.toDebugString)
    120 
     121     println("-------------------------repartition(numPartitions): implemented as coalesce(numPartitions, shuffle = true), same effect as the coalesce(7, true) call above------------------------------------")
    122     //    11.repartition(numPartition)
    123 
    124 
     125     //    12.glom(): turns the elements of type T in each partition of the RDD into an Array[T] (see the sketch after this listing)
     126     //    13.randomSplit(weights: Array[Double], seed): splits one RDD into several according to the weights; the higher a weight, the more elements that split is likely to receive (see the sketch after this listing)
    127 
    128     println("-------------------------repartition(numPartition)-----------------------------")
    129 
    130     sc.parallelize(List((1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    131       (21, "211"), (22, "222"), (23, "233"), (24, "244"), (25, "255"), (26, "266"), (27, "277"), (28, "288"), (29, "99"), (210, "21010"),
    132       (31, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    133       (41, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    134       (51, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    135       (61, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    136       (71, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
    137       (81, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")
    138     )).map(s => (s._1, s._2)).toDF().registerTempTable("temp_tb_id_vs_name")
    139 
    140     hiveContext.sql("insert into tb_id_vs_name select * from temp_tb_id_vs_name")
    141 
    142     sc.parallelize(List((1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")
    143     )).map(s => (s._1, s._2)).toDF().registerTempTable("temp_tb_id_vs_name2")
    144 
    145     hiveContext.sql("insert into tb_id_vs_name2 select * from temp_tb_id_vs_name2")
    146 
    147     var result = hiveContext.sql("select t10.id as t10_id,t10.name as t10_name from tb_id_vs_name t10 inner join tb_id_vs_name2 t11 on t10.id=t11.id")
    148     result.map(s => (s.getAs[Int]("t10_id"), s.getAs[String]("t10_name"))).foreach(s => {
    149       println(s._1 + ":" + s._2)
    150     })
    151 
    152     sc.stop()
    153   }
    154 }
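
    Items 11–13 in the comments above (repartition, glom and randomSplit) are only described, never executed. The block below is a minimal sketch of those three transformations; it is not part of the original test program and assumes the same SparkContext sc created in TestMain.

        // Sketch only: the three transformations TestMain mentions but does not run.
        // Assumes an existing SparkContext `sc`, created as in TestMain above.

        // 11. repartition(numPartitions) is shorthand for coalesce(numPartitions, shuffle = true)
        val repartitioned = sc.parallelize(1 to 16, 4).repartition(7)
        println("partitions after repartition: " + repartitioned.partitions.size) // 7

        // 12. glom() turns each partition into an Array[T], giving an RDD[Array[Int]] here
        sc.parallelize(1 to 10, 3).glom().collect().foreach(arr => println(arr.mkString("[", ",", "]")))

        // 13. randomSplit(weights, seed) splits one RDD into several by the given weights;
        //     a higher weight means that split is likely to receive more elements
        val Array(bigger, smaller) = sc.parallelize(1 to 100).randomSplit(Array(0.7, 0.3), seed = 11L)
        println("split sizes: " + bigger.count() + " / " + smaller.count())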

    Test results:

     -------------------------word count:------------------------------------
     -------------------------map(func):------------------------------------
     -------------------------flatMap(func):------------------------------------
     1 1 2 1 2 3 1 2 3 4 1 2 3 4 5 1 2 3 4 5 6 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 10
     -------------------------mapPartitions(func) 1:------------------------------------
     kpop lucy
     -------------------------mapPartitions(func) 2:------------------------------------
     -------------------------mapPartitionsWithIndex(func) :------------------------------------
     [0]kpop [1]lucy
     -------------------------sample(withReplacement,fraction,seed) :------------------------------------
     4 4 6 8
     -------------------------union(otherDataset): merges two RDDs and returns their union; duplicate elements are not removed------------------------------------
     1 2 3 3 4 5
     -------------------------intersection(otherDataset): returns the intersection of two RDDs------------------------------------
     3
     -------------------------distinct([numTasks]): removes duplicate elements from the RDD------------------------------------
     1 2 5 6 9
     -------------------------cartesian(otherDataset): computes the Cartesian product of the elements of two RDDs------------------------------------
     -------------------------coalesce(numPartitions,shuffle): repartitions the RDD; shuffle defaults to false, and with shuffle=false the number of partitions cannot be increased------------------------------------
     number of partitions after coalesce: 3
     number of partitions after coalesce: 7
     RDD lineage: (7) MapPartitionsRDD[40] at coalesce at TestMain.scala:117 []
      |  CoalescedRDD[39] at coalesce at TestMain.scala:117 []
      |  ShuffledRDD[38] at coalesce at TestMain.scala:117 []
      +-(4) MapPartitionsRDD[37] at coalesce at TestMain.scala:117 []
         |  ParallelCollectionRDD[36] at parallelize at TestMain.scala:117 []
     -------------------------repartition(numPartitions): implemented as coalesce(numPartitions, shuffle = true), same effect as the coalesce(7, true) call above------------------------------------
     -------------------------repartition(numPartition)-----------------------------
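
    Several headers above (word count, map(func), mapPartitions(func) 2 and cartesian) show no values because foreach on a distributed RDD prints on the executors, not in the driver console. If those results are needed in the driver output, one option is to bring a bounded amount of data back to the driver first. A minimal sketch, reusing the textFile RDD from the listing:

        // Collect (or take a bounded number of) results on the driver before printing,
        // so the output shows up in the driver console instead of the executors' stdout.
        val wordCounts = textFile.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.take(20).foreach(println)   // prints at most 20 (word, count) pairs on the driver
        wordCounts.collect().foreach(println)  // only safe when the full result is small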
  • Original post: https://www.cnblogs.com/yy3b2007com/p/7380866.html