// ---------------------------------------------------------------------------
// Spark 1.4.1 shell walkthrough.
// Lines starting with `// $` are shell commands to run in a terminal; all other
// code is Scala typed into spark-shell, where `sc` (the SparkContext) is predefined.
// ---------------------------------------------------------------------------

// Upload the input file to your HDFS home directory first
// (the paths below assume the HDFS user is `hadoop` — adjust if not):
// $ hdfs dfs -put README.md ./
// Then start the shell against the standalone master:
// $ spark-shell --master=spark://namenode01:7077 --executor-memory 2g --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar

// Lines of README.md that mention "spark".
val file = sc.textFile("hdfs:///user/hadoop/README.md").filter(line => line.contains("spark"))

// Word count. Split on runs of whitespace and drop empty tokens; otherwise blank
// lines and repeated spaces would be counted as the "word" "".
val wordcount = sc.textFile("hdfs:///user/hadoop/README.md")
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .reduceByKey(_ + _)
// The RDD method is saveAsTextFile (there is no saveTextFile).
// The target directory must not already exist.
wordcount.saveAsTextFile("/data/result")

// Sort by count, ascending. This reuses `wordcount` instead of recounting the file;
// use sortBy(_._2, ascending = false) for the most frequent words first.
val wordcount2 = wordcount.sortBy(_._2)
wordcount2.saveAsTextFile("/data/wordcount2")

// ---------------------------------------------------------------------------
// Spark SQL on Hive
// ---------------------------------------------------------------------------
// Start the Hive metastore service:
// $ nohup hive --service metastore > metastore.log 2>&1 &
// Note: to use Hive from Spark, copy hive-site.xml into Spark's conf/ directory.
// Copy the MySQL JDBC connector from Hive's lib/ into Spark's lib/ (pssh runs it on the configured hosts):
// $ pssh " cp /app/hive/lib/mysql-connector-java-5.1.6-bin.jar /app/spark141/lib/"
// $ spark-shell --master=spark://namenode01:7077 --executor-memory 2g --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("use test")
hiveContext.sql("show tables").collect().foreach(println)

// The spark-sql CLI works like the Hive CLI; terminate each statement with ';':
// $ spark-sql --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar
//   spark-sql> use test;
//   spark-sql> show tables;

// ---------------------------------------------------------------------------
// parallelize demo
// ---------------------------------------------------------------------------
val num = sc.parallelize(1 to 10)
val alpha = sc.parallelize('a' to 'z')
num.map(_ * 2).collect().foreach(println)          // 2, 4, ..., 20
num.map(_ % 3 == 0).collect().foreach(println)     // map keeps every element: one Boolean per number
num.filter(_ % 3 == 0).collect().foreach(println)  // filter keeps only 3, 6, 9
num.reduce(_ + _)                                  // 55
num.reduce(_ * _)                                  // 3628800 (10!)
// reduceByKey exists only on RDDs of (key, value) pairs, so key each number first.
// Sums per key: 0 -> 18, 1 -> 22, 2 -> 15 (output order may vary).
num.map(n => (n % 3, n)).reduceByKey(_ + _).collect()
// Descending sort; collect() so the result is actually shown.
num.sortBy(x => x, ascending = false).collect()
// ---------------------------------------------------------------------------
// Key-value (pair RDD) demo
// ---------------------------------------------------------------------------
val kv1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5)))

// Sort by key, ascending. The parentheses after sortByKey must not be omitted:
// all of its parameters have defaults, so it still needs an (empty) argument list.
kv1.sortByKey().collect()
// Sort by key, descending.
kv1.sortByKey(ascending = false).collect()

// Sort by value: swap each pair so the value becomes the key, sort, swap back.
kv1.map(_.swap).sortByKey().map(_.swap).collect()

// Sort by the whole (key, value) tuple.
kv1.sortBy(pair => pair).collect()
// Collect all values per key.
kv1.groupByKey().collect()
// Sum the values per key.
kv1.reduceByKey(_ + _).collect()

val kv2 = sc.parallelize(List(("A", 4), ("A", 4), ("C", 3), ("A", 4), ("B", 5)))
// Remove duplicate pairs.
kv2.distinct().collect()
// Concatenate two RDDs; duplicates are kept.
kv1.union(kv2).collect()

val kv3 = sc.parallelize(List(("A", 10), ("B", 20), ("D", 30)))
// Inner join on key: only keys present in both RDDs (here A and B) appear.
kv1.join(kv3).collect()
// Co-group: every key from either RDD, paired with the values from each side.
kv1.cogroup(kv3).collect()

// Not actually a pair RDD: each element is a List[Int]; add 1 to every number and flatten.
val kv4 = sc.parallelize(List(List(1, 2), List(3, 4)))
kv4.flatMap(row => row.map(_ + 1)).collect()