import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
 * Created by Administrator on 2017/1/7.
 *
 * Demo driver that runs a series of basic RDD transformations (word count, map,
 * flatMap, mapPartitions, mapPartitionsWithIndex, sample, union, intersection,
 * distinct, cartesian, coalesce), printing each result, and then writes two small
 * data sets into Hive tables and prints the rows produced by joining them.
 */
object TestMain {

  /** (name, gender) sample records shared by the three partition-level demos. */
  private val People: List[(String, String)] =
    List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    // Master URL, jars and executor memory are not set here (e.g. via
    // .setMaster(...), .setJars(...), .set("spark.executor.memory", "10g")).
    val conf = new SparkConf().setAppName("Hangzhou_Test")
    val sc = new SparkContext(conf)

    // Stop the context even when one of the jobs below fails.
    try {
      val hiveContext = new HiveContext(sc)
      hiveContext.sql("use rc_hive_db")

      // Brings toDF() into scope for the RDD -> DataFrame conversions below.
      import hiveContext.implicits._

      hiveContext.setConf("mapred.max.split.size", "256000000")
      hiveContext.setConf("mapred.min.split.size.per.node", "100000000")
      hiveContext.setConf("mapred.min.split.size.per.rack", "100000000")
      // NOTE(review): the value is empty in the source; it may have been lost when the
      // file was copied (presumably a combine input format class) -- confirm.
      hiveContext.setConf("hive.input.format", "")
      hiveContext.setConf("hive.merge.mapfiles", "true")
      hiveContext.setConf("hive.merge.mapredfiles", "true")
      hiveContext.setConf("hive.merge.size.per.task", "256000000")
      hiveContext.setConf("hive.merge.smallfiles.avgsize", "256000000")
      hiveContext.setConf("hive.groupby.skewindata", "true")

      hiveContext.sql("create table if not exists tb_id_vs_name(id int,name string)")
      hiveContext.sql("create table if not exists tb_id_vs_name2(id int,name string)")

      println("-------------------------word count:------------------------------------")
      val words = "When building the vocabulary ignore terms that have a document frequency strictly lower than the given threshold. This value is also called cut-off in the literature. If float, the parameter represents a proportion of documents, integer absolute counts. This parameter is ignored if vocabulary is not None."
      // RDD.foreach runs on the executors, so these lines may not show up on the driver console.
      sc.parallelize(words.split(" "), 2)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .foreach(println)

      println("-------------------------map(func):------------------------------------")
      // 1.map(func): multiply every element of the RDD by 2
      val rdd = sc.parallelize(1 to 10)
      val doubled = rdd.map(_ * 2)
      doubled.foreach(x => print(s"$x "))

      println("-------------------------flatMap(func):------------------------------------")
      // 2.flatMap(func)
      val expanded = rdd.flatMap(x => 1 to x).collect()
      expanded.foreach(x => print(s"$x "))

      println("-------------------------mapPartitions(func) 1:------------------------------------")
      // 3.mapPartitions(func): keep the names of the "female" records of each partition.
      // Names are prepended, so within a partition they come out in reverse encounter order.
      val femaleNames = sc.parallelize(People, 2).mapPartitions { records =>
        records.foldLeft(List.empty[String]) {
          case (names, (name, "female")) => name :: names
          case (names, _)                => names
        }.iterator
      }
      // Collect the partitions into an Array on the driver before printing.
      femaleNames.collect().foreach(x => print(s"$x "))

      println("-------------------------mapPartitions(func) 2:------------------------------------")
      sc.parallelize(People, 2)
        .mapPartitions(_.filter(_._2 == "female"))
        .map(_._1)
        .foreach(x => print(s"$x "))

      println("-------------------------mapPartitionsWithIndex(func) :------------------------------------")
      // 4.mapPartitionsWithIndex(func): same as above, each name prefixed with its partition index
      sc.parallelize(People, 2)
        .mapPartitionsWithIndex { (index: Int, records: Iterator[(String, String)]) =>
          records.foldLeft(List.empty[String]) {
            case (names, (name, "female")) => s"[$index]$name" :: names
            case (names, _)                => names
          }.iterator
        }
        .collect()
        .foreach(x => print(s"$x "))

      println("-------------------------simple(withReplacement,fraction,seed) :------------------------------------")
      // 5.sample(withReplacement,fraction,seed)
      val sampled = rdd.sample(true, 0.5, 3)
      sampled.collect().foreach(x => print(s"$x "))

      println("-------------------------union(ortherDataset) :将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重------------------------------------")
      // 6.union(otherDataset): merge both RDDs; duplicate elements are kept
      val rdd1 = sc.parallelize(1 to 3)
      val rdd2 = sc.parallelize(3 to 5)
      rdd1.union(rdd2).collect().foreach(x => print(s"$x "))

      // NOTE(review): the banner text says "union" but this section demonstrates intersection.
      println("-------------------------union(ortherDataset) :返回两个RDD的交集------------------------------------")
      // 7.intersection(otherDataset): elements present in both RDDs
      rdd1.intersection(rdd2).collect().foreach(x => print(s"$x "))

      println("-------------------------distinct([numTasks]) :对RDD中的元素进行去重------------------------------------")
      // 8.distinct([numTasks]): remove duplicate elements
      sc.parallelize(List(1, 1, 2, 5, 2, 9, 6, 1)).distinct().collect().foreach(x => print(s"$x "))

      println("-------------------------cartesian(otherDataset):对两个RDD中的所有元素进行笛卡尔积操作------------------------------------")
      // 9.cartesian(otherDataset): all pairs of elements from the two RDDs
      sc.parallelize(1 to 3).cartesian(sc.parallelize(2 to 5)).foreach(x => println(s"$x "))

      println("-------------------------coalesce(numPartitions,shuffle):对RDD的分区进行重新分区,shuffle默认值为false,当shuffle=false时,不能增加分区数------------------------------------")
      // 10.coalesce(numPartitions,shuffle): without a shuffle the partition count can only shrink
      val shrunk = sc.parallelize(1 to 16, 4).coalesce(3)
      println("重新分区后的分区个数:" + shrunk.partitions.length)

      // With shuffle = true the partition count may grow (4 -> 7 here).
      val grown = sc.parallelize(1 to 16, 4).coalesce(7, true)
      println("重新分区后的分区个数:" + grown.partitions.length)
      println("RDD依赖关系:" + grown.toDebugString)

      println("-------------------------repartition(numPartition):是函数coalesce(numPartition,true)的实现,效果和例9.1的coalesce(numPartition,true)的一样------------------------------------")
      // 11.repartition(numPartition)
      // 12.glom(): turns the elements of type T in each partition into an Array[T]
      // 13.randomSplit(weight:Array[Double],seed): splits one RDD into several RDDs according
      //    to the weights; a higher weight makes a larger share of the elements more likely
      // (11-13 are listed only; no code is run for them.)

      println("-------------------------repartition(numPartition)-----------------------------")

      val leftRows = List(
        (1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (21, "211"), (22, "222"), (23, "233"), (24, "244"), (25, "255"), (26, "266"), (27, "277"), (28, "288"), (29, "99"), (210, "21010"),
        (31, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (41, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (51, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (61, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (71, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010"),
        (81, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")
      )
      sc.parallelize(leftRows).toDF().registerTempTable("temp_tb_id_vs_name")

      // "insert into" appends, so every run adds these rows to the table again.
      hiveContext.sql("insert into tb_id_vs_name select * from temp_tb_id_vs_name")

      val rightRows = List(
        (1, "11"), (2, "22"), (3, "33"), (4, "44"), (5, "55"), (6, "66"), (7, "77"), (8, "88"), (9, "99"), (10, "1010")
      )
      sc.parallelize(rightRows).toDF().registerTempTable("temp_tb_id_vs_name2")

      hiveContext.sql("insert into tb_id_vs_name2 select * from temp_tb_id_vs_name2")

      // NOTE(review): the column qualifiers and the join predicate were missing from the
      // source text; joining on id is a reconstruction -- confirm against the original.
      val result = hiveContext.sql(
        "select t10.id as t10_id, t10.name as t10_name from tb_id_vs_name t10 " +
          "inner join tb_id_vs_name2 t11 on t10.id = t11.id")

      // Go through .rdd so the row mapping does not depend on DataFrame.map semantics.
      result.rdd
        .map(row => (row.getAs[Int]("t10_id"), row.getAs[String]("t10_name")))
        .foreach { case (id, name) => println(s"$id:$name") }
    } finally {
      sc.stop()
    }
  }
}
1 -------------------------word count:------------------------------------ 2 -------------------------map(func):------------------------------------ 3 -------------------------flatMap(func):------------------------------------ 4 1 1 2 1 2 3 1 2 3 4 1 2 3 4 5 1 2 3 4 5 6 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 10
-------------------------mapPartitions(func) 1:------------------------------------ 5 kpop lucy
-------------------------mapPartitions(func) 2:------------------------------------ 6 -------------------------mapPartitionsWithIndex(func) :------------------------------------ 7 [0]kpop [1]lucy -------------------------simple(withReplacement,fraction,seed) :------------------------------------ 8 4 4 6 8
