/**
 * Spark demo: flattens a small hard-coded set of (key, value) pairs, prints each
 * pair, then groups the values by key with `groupByKey` and prints the result.
 *
 * Runs on a local master with 2 threads (`local[2]`). The SparkContext is stopped
 * in a `finally` block so it is released even if a Spark action throws; previously
 * it was never stopped.
 */
def test4(): Unit = {
  val conf = new SparkConf().setAppName("WC").setMaster("local[2]")
  val sc = new SparkContext(conf)
  try {
    // One inner list per batch of pairs. A typed List[List[(String, String)]]
    // replaces the previous mixed Tuple2/Tuple3 list. That list was flattened
    // with a `productArity > 2` check plus unchecked casts, which would have
    // mis-handled a batch of exactly two pairs (arity 2, so it was cast
    // wholesale to (String, String)).
    val batches: List[List[(String, String)]] = List(
      List("Apache" -> "Spark"),
      List("Apache" -> "Kafka"),
      List("Oracle" -> "JAVA", "Oracle" -> "DB ORACLE", "Oracle" -> "Mysql")
    )
    val pairs: List[(String, String)] = batches.flatten
    pairs.foreach(println)

    // Per the Spark docs, the order of values within each group is not guaranteed.
    val rdd1 = sc.makeRDD(pairs).groupByKey()
    println(rdd1.collect().toBuffer)
  } finally {
    sc.stop()
  }
}