import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by spark on 15-1-19.
 * Sort a K-V RDD by key to obtain a new RDD.
 */
object SortByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"))
    val b = sc.parallelize(1 to a.count().toInt)
    val c = a.zip(b)

    // ascending
    c.sortByKey(true).collect().foreach(print)
    // descending
    c.sortByKey(false).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtract(RDD2): returns a new RDD containing the elements that exist in RDD1 but not in RDD2.
 */
object Subtract {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 10)
    val b = sc.parallelize(1 to 3)
    // 45678910
    //a.subtract(b).collect().foreach(print)

    val c = sc.parallelize(1 to 10)
    val d = sc.parallelize(List(1, 2, 3, 11))
    // 45678910
    c.subtract(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs whose keys exist in RDD1
 * but not in RDD2.
 */
object SubtractByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(List("dog", "he", "word", "hello"))
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("cat", "first", "everyone"))
    val d = c.keyBy(_.length)

    // (2,he)(4,word)
    b.subtractByKey(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * sumApprox did not produce the result I expected.
 */
object SumAndSumApprox {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000000)
    val b = a.sum()
    val c = a.sumApprox(0L, 0.9).getFinalValue()
    println(b + " *** " + c)
  }
}

/**
 * Created by spark on 15-1-19.
 * Take the first n elements of the RDD and return them as an array.
 */
object Take {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 1000000)
    // 12345678910
    a.take(10).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * Sort the RDD elements in ascending order,
 * then take the first n elements and return them as an array.
 */
object TakeOrdered {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(List("ff", "aa", "dd", "cc"))
    // aacc
    a.takeOrdered(2).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * Sample data from the RDD.
 */
object TakeSample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 10000)
    /**
     * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869
     */
    a.takeSample(true, 10, 1).foreach(println)
  }
}

/**
 * Created by spark on 15-1-19.
 * Display the RDD's lineage (debug information).
 */
object ToDebugString {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 9)
    val b = sc.parallelize(1 to 3)
    val c = a.subtract(b)
    println(c.toDebugString)
  }
}
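A note on the SumAndSumApprox example above: getFinalValue() blocks until the whole job has finished, so even with a timeout of 0 ms it ends up returning the (near-)exact sum rather than a rough estimate. The sketch below is not from the original code; it assumes the PartialResult returned by sumApprox exposes an initialValue (the estimate available once the timeout fires) and uses a hypothetical object name SumApproxSketch.

import org.apache.spark.{SparkContext, SparkConf}

object SumApproxSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000000)

    // exact sum, computed by a full pass over the data
    val exact = a.sum()

    // approximate sum: wait at most 200 ms, ask for a 90% confidence interval
    val partial = a.sumApprox(200L, 0.9)

    // initialValue is whatever estimate is available when the timeout fires;
    // getFinalValue() blocks until the job completes, so it should agree with the exact sum
    println("estimate after timeout: " + partial.initialValue)
    println("exact: " + exact + ", final: " + partial.getFinalValue())
  }
}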
/**
 * Created by spark on 15-1-19.
 * Get the top n largest values.
 */
object Top {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 1000)
    val c = a.top(10)
    /**
     * 1000 999 998 997 996 995 994 993 992 991
     */
    c.foreach(println)
  }
}

/**
 * Union == ++ : merge two RDDs into one new RDD.
 */
object Union {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 3)
    val b = sc.parallelize(3 to 5)
    val c = a.union(b)
    val d = a ++ b
    /**
     * 123345
     */
    c.collect().foreach(print)
    /**
     * 123345
     */
    d.collect().foreach(print)
  }
}
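As the 123345 output above shows, union keeps duplicates. The sketch below is not part of the original examples; it only illustrates that chaining distinct() after union gives set semantics (hypothetical object name UnionDistinct).

import org.apache.spark.{SparkContext, SparkConf}

object UnionDistinct {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)

    val a = sc.parallelize(1 to 3)
    val b = sc.parallelize(3 to 5)

    // union keeps the duplicate 3: 123345
    a.union(b).collect().foreach(print)

    // distinct() removes the duplicate; sorting the collected array makes the output deterministic: 12345
    a.union(b).distinct().collect().sorted.foreach(print)
  }
}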
--Java
package com.demo.sparkWordCount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/*
 * Ming Z M LI
 */
public class FunctionDemo {

    /*
     * Create the JavaSparkContext.
     */
    public static JavaSparkContext createContext() {
        SparkConf sparkConf = new SparkConf().setAppName("FunctionDemo").setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        return ctx;
    }

    public static void main(String[] args) {
        demo5();
    }

    /*
     * RDD1.subtract(RDD2): returns a new RDD containing the elements that exist in RDD1
     * but not in RDD2.
     */
    public static void demo2() {
        JavaSparkContext ctx = createContext();

        List<String> list1 = new ArrayList<String>();
        list1.add("hello1");
        list1.add("hello2");
        list1.add("hello3");
        list1.add("hello4");

        List<String> list2 = new ArrayList<String>();
        list2.add("hello3");
        list2.add("hello4");
        list2.add("world5");
        list2.add("world6");

        JavaRDD<String> a = ctx.parallelize(list1);
        JavaRDD<String> b = ctx.parallelize(list2);

        a.subtract(b).foreach(new VoidFunction<String>() {
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
    }

    /**
     * Created by spark on 15-1-19.
     * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs whose keys exist in RDD1
     * but not in RDD2.
     * foreach on c prints the keyed pairs: (4, bird) (5, hello) (3, cat)
     * output - (4,bird) (4,bird)
     */
    public static void demo3() {
        JavaSparkContext ctx = createContext();

        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "bird", "bird")));
        JavaRDD<String> b = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "testing")));

        JavaPairRDD<Integer, String> c = a.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });

        // c.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        //     public void call(Tuple2<Integer, String> t) throws Exception {
        //         System.out.println("(" + t._1 + ", " + t._2 + ")");
        //     }
        // });

        JavaPairRDD<Integer, String> d = b.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });

        c.subtractByKey(d).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println("(" + t._1 + ", " + t._2 + ")");
            }
        });
    }

    /**
     * Take the first n elements of the RDD and return them as a list.
     */
    public static void demo4() {
        JavaSparkContext ctx = createContext();

        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "4", "2", "3")));
        List<String> b = a.take(3);
        for (String c : b) {
            System.out.println(c);
        }
    }

    /**
     * Get the top n largest values. output - hello 3
     */
    public static void demo5() {
        JavaSparkContext ctx = createContext();

        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "hello", "2", "3")));
        List<String> b = a.top(2);
        for (String c : b) {
            System.out.println(c);
        }
    }
}