Note: map produces key-value pairs as tuples; split produces an array.
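For illustration, a minimal spark-shell style Scala sketch (the input lines are made up) of the two result types:

// in spark-shell, where sc is the SparkContext
val lines = sc.parallelize(Array("class1 90", "class2 80"))
val arrays = lines.map(_.split(" "))                                       // split -> RDD[Array[String]]
val pairs  = lines.map(x => { val f = x.split(" "); (f(0), f(1).toInt) })  // map -> RDD[(String, Int)], i.e. Tuple2 elements
pairs.collect().foreach(println(_))                                        // (class1,90) (class2,80)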
1. Common Transformation operators (map, flatMap, filter, groupByKey, reduceByKey, sortByKey, join, cogroup)
Java:
package day2;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Demonstrates the commonly used Transformation operators.
 */
public class TransformationDemo {

    public static void main(String[] args) {
        distinctDemo();
        // filterDemo();
        // mapDemo();
    }

    /**
     * distinct() example: removes duplicate elements from the RDD.
     */
    public static void distinctDemo() {
        SparkConf conf = new SparkConf().setAppName("distinctDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> datasRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4, 2, 1));
        JavaRDD<Integer> resultRDD = datasRDD.distinct();
        List<Integer> list = resultRDD.collect();
        for (Integer value : list) {
            System.out.println(value);
        }
        sc.close();
    }

    /**
     * filter example.
     * filter keeps or drops elements according to a condition:
     * if the condition returns true the element is kept,
     * if it returns false the element is filtered out.
     */
    public static void filterDemo() {
        SparkConf conf = new SparkConf().setAppName("filterDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock data and create the initial RDD
        JavaRDD<Integer> datasRdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
        JavaRDD<Integer> resultRdd = datasRdd.filter(new Function<Integer, Boolean>() {
            private static final long serialVersionUID = 1L;
            public Boolean call(Integer num) throws Exception {
                return num % 2 != 0; // keep the odd numbers
            }
        });
        List<Integer> list = resultRdd.collect();
        for (Integer value : list) {
            System.out.println(value);
        }
        sc.close();
    }

    /**
     * map example.
     * In the Java API there are two kinds of map operators:
     * map() returns a single value per element,
     * mapToPair() returns a key-value pair per element.
     */
    public static void mapDemo() {
        SparkConf conf = new SparkConf().setAppName("mapDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock data: multiply every element by 2
        List<Integer> datas = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Create the initial RDD
        JavaRDD<Integer> datasRdd = sc.parallelize(datas);
        JavaRDD<Integer> result = datasRdd.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer num) throws Exception {
                return num * 2;
            }
        });
        List<Integer> resultInfo = result.collect();
        for (Integer data : resultInfo) {
            System.out.println(data);
        }
        sc.close();
    }

    /**
     * join example: joins two pair RDDs by key (inner join).
     */
    public static void joinDemo() {
        SparkConf conf = new SparkConf().setAppName("joinDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock data
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> stusInfo = Arrays.asList(
                new Tuple2<Integer, String>(1, "张三"),
                new Tuple2<Integer, String>(2, "李四"),
                new Tuple2<Integer, String>(3, "王五"),
                new Tuple2<Integer, String>(4, "小六"));
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> scoresInfo = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 90),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 96));
        // Create the initial pair RDDs
        JavaPairRDD<Integer, String> stusRDD = sc.parallelizePairs(stusInfo);
        JavaPairRDD<Integer, Integer> scoresRDD = sc.parallelizePairs(scoresInfo);
        // Join the two RDDs by student id
        JavaPairRDD<Integer, Tuple2<String, Integer>> resultRDD = stusRDD.join(scoresRDD);
        resultRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<Integer, Tuple2<String, Integer>> info) throws Exception {
                System.out.println("student id: " + info._1 + " name: " + info._2._1 + " score: " + info._2._2);
            }
        });
        sc.close();
    }

    /**
     * reduceByKey example.
     * reduceByKey first groups the data by key, then reduces the values of each group.
     */
    public static void reduceByKeyDemo() {
        SparkConf conf = new SparkConf().setAppName("reduceByKeyDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock data: (class, score); compute the total score of each class
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> scores = Arrays.asList(
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 99),
                new Tuple2<String, Integer>("class1", 92),
                new Tuple2<String, Integer>("class1", 93),
                new Tuple2<String, Integer>("class2", 80),
                new Tuple2<String, Integer>("class1", 90));
        // Create the initial pair RDD
        JavaPairRDD<String, Integer> datasRDD = sc.parallelizePairs(scores);
        // Sum the scores of each class
        JavaPairRDD<String, Integer> resultRDD = datasRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer num1, Integer num2) throws Exception {
                return num1 + num2;
            }
        });
        List<Tuple2<String, Integer>> list = resultRDD.collect();
        for (Tuple2<String, Integer> tuple2 : list) {
            System.out.println("class: " + tuple2._1 + ", total score: " + tuple2._2);
        }
        sc.close();
    }

    /**
     * groupByKey example.
     * groupByKey groups the data by key and puts all values with the same key into one collection.
     */
    public static void groupByKeyDemo() {
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock data: (class, score)
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> scores = Arrays.asList(
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 99),
                new Tuple2<String, Integer>("class1", 92),
                new Tuple2<String, Integer>("class1", 93),
                new Tuple2<String, Integer>("class2", 80),
                new Tuple2<String, Integer>("class1", 90));
        // Note: sc.parallelizePairs produces a key-value JavaPairRDD
        JavaPairRDD<String, Integer> datasRDD = sc.parallelizePairs(scores);
        JavaPairRDD<String, Iterable<Integer>> resultRDD = datasRDD.groupByKey();
        resultRDD.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Iterable<Integer>> infos) throws Exception {
                System.out.println("class: " + infos._1);
                System.out.println(infos._2.toString());
                System.out.println("===================================");
            }
        });
        sc.close();
    }

    /**
     * cogroup example.
     * For every key, cogroup returns a pair of Iterables: the values from the first RDD
     * and the values from the second RDD.
     */
    public static void cogroup() {
        SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> asList = Arrays.asList(
                new Tuple2<Integer, String>(1, "张三"),
                new Tuple2<Integer, String>(2, "李四"),
                new Tuple2<Integer, String>(3, "刘伟"),
                new Tuple2<Integer, String>(4, "凌风"));
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> asList2 = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 90),
                new Tuple2<Integer, Integer>(2, 60),
                new Tuple2<Integer, Integer>(3, 80));
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(asList);
        JavaPairRDD<Integer, Integer> parallelizePairs2 = sc.parallelizePairs(asList2);
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroup =
                parallelizePairs.cogroup(parallelizePairs2);
        cogroup.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
                System.out.println(t._1 + " " + t._2._1 + " " + t._2._2);
            }
        });
        sc.close();
    }
}
Scala:
import org.apache.spark.{SparkConf, SparkContext}

object Transformation {
  def main(args: Array[String]): Unit = {
    // mapDemo
    filter
  }

  // map: multiply every element by 2
  def mapDemo: Unit = {
    val conf = new SparkConf().setAppName("mapDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val datasRdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7), 2)
    val result = datasRdd.map(x => x * 2)
    result.foreach(println(_))
    sc.stop()
  }

  // filter: keep only the odd numbers
  def filter: Unit = {
    val conf = new SparkConf().setAppName("filterDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val datasRdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7))
    val result = datasRdd.filter(x => x % 2 != 0)
    result.foreach(println(_))
    sc.stop()
  }

  // groupByKey: collect the scores of each class into one collection
  def groupBykey: Unit = {
    val conf = new SparkConf().setAppName("groupByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val dataRDD = sc.parallelize(Array(Tuple2("class1", 90), Tuple2("class1", 91),
      Tuple2("class2", 91), Tuple2("class2", 93)))
    val result = dataRDD.groupByKey()
    result.foreach(it => {
      println(it._1)
      println(it._2.toString())
    })
    sc.stop()
  }

  // reduceByKey: sum the scores of each class
  def reduceBykey: Unit = {
    val conf = new SparkConf().setAppName("reduceByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val dataRDD = sc.parallelize(Array(Tuple2("class1", 90), Tuple2("class1", 91),
      Tuple2("class2", 91), Tuple2("class2", 93)))
    val result = dataRDD.reduceByKey(_ + _)
    result.foreach(it => {
      println(it._1)
      println(it._2)
    })
    sc.stop()
  }

  // join: inner join of students and scores by id
  def join: Unit = {
    val conf = new SparkConf().setAppName("joinDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val stuRDD = sc.parallelize(Array(Tuple2(1, "zhangsan"), Tuple2(2, "lisi"), Tuple2(3, "liuwe"), Tuple2(4, "liu")))
    val scoreRDD = sc.parallelize(Array(Tuple2(1, 80), Tuple2(2, 89), Tuple2(3, 90)))
    val result = stuRDD.join(scoreRDD)
    result.foreach(it => {
      println(it._1 + " " + it._2._1 + " " + it._2._2)
    })
    sc.stop()
  }

  // cogroup: for every key, collect the values from both RDDs into two Iterables
  def cogroup: Unit = {
    val conf = new SparkConf().setAppName("cogroup").setMaster("local")
    val sc = new SparkContext(conf)
    val stuRDD = sc.parallelize(Array(Tuple2(1, "zhangsan"), Tuple2(2, "lisi"), Tuple2(3, "liuwe"), Tuple2(4, "liu")))
    val scoreRDD = sc.parallelize(Array(Tuple2(1, 80), Tuple2(2, 89), Tuple2(3, 90)))
    val result = stuRDD.cogroup(scoreRDD)
    result.foreach(it => {
      println(it._1 + " " + it._2._1 + " " + it._2._2)
    })
    sc.stop()
  }
}
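The operator list in the heading also names flatMap, which none of the examples above cover. A minimal Scala sketch (made-up input strings) of how flatMap differs from map:

import org.apache.spark.{SparkConf, SparkContext}

object FlatMapDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flatMapDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.parallelize(Array("hello spark", "hello scala"))
    // map keeps one output element per input element: RDD[Array[String]]
    val mapped = lines.map(_.split(" "))
    // flatMap flattens the arrays into a single RDD[String]
    val words = lines.flatMap(_.split(" "))
    println(mapped.count())            // 2 arrays
    println(words.count())             // 4 words
    words.collect().foreach(println(_))
    sc.stop()
  }
}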
2. join() comes in several flavors: join (equivalent to an inner join), leftOuterJoin (left outer join), and rightOuterJoin (right outer join).
In Hive an unmatched row shows up as NULL; in Spark the outer joins return None (the missing side is an Option).
cogroup groups the values of both RDDs by key and keeps every key that appears in either RDD; in the example above, where every score key also exists in the student RDD, its output looks much like a leftOuterJoin.
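To make the variants and the None values concrete, a minimal Scala sketch (made-up student/score pairs in the same style as the examples above):

import org.apache.spark.{SparkConf, SparkContext}

object JoinVariantsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("joinVariantsDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val stuRDD = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (4, "liu")))
    val scoreRDD = sc.parallelize(Array((1, 80), (2, 89), (3, 90)))

    // inner join: only keys 1 and 2 survive
    stuRDD.join(scoreRDD).collect().foreach(println(_))
    // left outer join: key 4 is kept, its score is None
    stuRDD.leftOuterJoin(scoreRDD).collect().foreach(println(_))   // e.g. (4,(liu,None))
    // right outer join: key 3 is kept, its name is None
    stuRDD.rightOuterJoin(scoreRDD).collect().foreach(println(_))  // e.g. (3,(None,90))
    // cogroup: every key from either RDD, paired with two Iterables
    stuRDD.cogroup(scoreRDD).collect().foreach(println(_))         // e.g. (3,(CompactBuffer(),CompactBuffer(90)))
    sc.stop()
  }
}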
Supplement: 1. Agricultural products project (count the number of agricultural-product markets in each province)
package SparkCore.day1

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by tg on 3/21/17.
 */
object ProductDemo {
  def main(args: Array[String]): Unit = {
    marketCount
  }

  /**
   * Count the number of agricultural-product markets in each province.
   */
  def marketCount: Unit = {
    val conf = new SparkConf().setAppName("marketCount").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/tg/datas/product.txt")
    /**
     * 1. map splits each line into fields
     * 2. filter drops lines that do not have exactly 6 fields
     * 3. map extracts the province and the market of each line
     * 4. distinct de-duplicates the (province, market) pairs
     * 5. groupByKey groups the markets of the same province into one collection;
     *    the size of that collection is the number of markets in the province
     */
    lines.map(x => x.split(" "))
      .filter(x => x.length == 6)
      .map(x => x(4) + ":" + x(3))       // e.g. "Shandong:Jinan market"
      .distinct()                        // de-duplicate "province:market"
      .map(x => {
        val info = x.split(":")
        val prov = info(0)
        val market = info(1)
        (prov, market)                   // key-value pair (province, market)
      })
      .groupByKey()                      // group by the province key
      .map(x => {
        val prov = x._1
        val mcount = x._2.size
        (prov, mcount)                   // key-value pair (province, market count)
      })
      .foreach(item => {
        println("Province " + item._1 + " has " + item._2 + " agricultural-product markets")
      })
    sc.stop()
  }
}
Hive implementation: select p.province, count(*) as act from (select province, market from products group by province, market) p group by p.province order by act desc;
  /**
   * Find the provinces that have no agricultural-product market.
   */
  def noMarketProvince: Unit = {
    val conf = new SparkConf().setAppName("noMarketProvince").setMaster("local")
    val sc = new SparkContext(conf)
    // Full dataset: provinces that do have markets
    val haveMarketProvince = sc.textFile("file:///home/tg/datas/product.txt")
    // All provinces of the country
    val allProvince = sc.textFile("file:///home/tg/datas/allprovince.txt")
    // Build (province, province) pairs from the market data
    val haveMarketRDD = haveMarketProvince.map(_.split(" "))
      .filter(x => x.length == 6 && x(4) != null)
      .map(x => {
        val prov = x(4).trim
        (prov, prov)                // key-value pair
      })
      .distinct()                   // de-duplicate provinces
    // All provinces as a pair RDD
    val allProvinceRDD = allProvince.map(x => (x, x))
    // Left outer join: provinces without a market get None on the right side
    val resultRDD = allProvinceRDD.leftOuterJoin(haveMarketRDD)
    resultRDD.filter(x => x._2._2 == None)
      .foreach(x => println(x._1))
    sc.stop()
  }
Hive implementation:
select a.provinceName from (select province from products group by province) p right outer join allprovince a on p.province = a.provinceName where p.province is null;
Another example:
// Top 3 provinces by the number of agricultural-product types
  def top3: Unit = {
    val conf = new SparkConf().setAppName("top3").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/hadoop/product.txt")
    val lines1 = sc.textFile("file:///home/hadoop/allprovince.txt") // not used in this method
    lines.map(_.split(","))
      .filter(x => x.length == 6 && x(0) != null && x(4) != null)
      .map(x => (x(4), x(0)))        // (province, product name)
      .distinct()
      .groupByKey()
      .map(x => {
        val sheng = x._1             // province
        val count = x._2.size        // number of product types
        (count, sheng)
      })
      .sortByKey(false)              // descending
      .take(3)
      .foreach(item => {
        println(item._2 + " " + item._1)
      })
    sc.stop()
  }
Hive implementation: select p.province, count(*) as cnt from (select province, name from products group by province, name) p group by p.province order by cnt desc limit 3;
  // Top 3 agricultural-product markets of each province
  def ofodaan: Unit = {
    val conf = new SparkConf().setAppName("ofodaan").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/hadoop/product.txt")
    lines.map(_.split(","))
      .filter(x => x.length == 6 && x(0) != null && x(3) != null && x(4) != null)
      .map(x => {
        val name = x(0).trim         // product name
        val market = x(3).trim       // market
        val prov = x(4).trim         // province
        (prov + "-" + market, name)
      })
      .groupByKey()
      .map(x => {
        val info = x._1.split("-")
        val prov = info(0)
        val market = info(1)
        val mcount = x._2.size       // number of product entries in this market
        (prov, (market, mcount))
      })
      .groupByKey()
      .map(x => {
        val prov = x._1
        // sort the markets of the province by product count, descending, keep the top 3
        val datas = x._2.toArray.sortWith(_._2 > _._2).take(3)
        (prov, datas)
      })
      .foreach(it => {
        println(it._1)
        it._2.foreach(x => {
          println(x._1 + " " + x._2)
        })
      })
    sc.stop()
  }
// Compute the price trend of each agricultural product in Shanxi (山西) province, i.e. the daily average price.
Hive implementation:
select name, times, if(count(price) > 2, round((sum(price) - max(price) - min(price)) / (count(price) - 2), 2), round(sum(price) / count(price), 2)) from product where province = '山西' group by name, times;
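For comparison, a rough Spark (Scala) sketch of the same per-day average. The positions of the price and date fields (x(1) and x(2)) are assumptions; only the name (x(0)), market (x(3)) and province (x(4)) positions appear in the examples above.

import org.apache.spark.{SparkConf, SparkContext}

// Daily average price per product in one province, dropping the max and min
// when there are more than two quotes (mirrors the Hive IF above).
object PriceTrendDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("priceTrendDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/hadoop/product.txt")
    lines.map(_.split(","))
      .filter(x => x.length == 6 && x(4) != null && x(4).trim == "山西")
      // ASSUMPTION: x(1) is the price and x(2) is the date ("times") column
      .map(x => ((x(0).trim, x(2).trim), x(1).trim.toDouble))   // ((name, times), price)
      .groupByKey()
      .map { case ((name, times), prices) =>
        val ps = prices.toArray
        val avg =
          if (ps.length > 2) (ps.sum - ps.max - ps.min) / (ps.length - 2)  // trimmed mean
          else ps.sum / ps.length
        (name, times, math.round(avg * 100) / 100.0)                       // round to 2 decimals
      }
      .foreach(println(_))
    sc.stop()
  }
}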
// Agricultural-product types shared by the top 3 provinces
Hive implementation:
select c.name, count(*) as cont from
  (select a.province, a.name from
    (select province, name from product group by province, name) a
  left semi join
    (select p.province, count(*) as cnt from
      (select province, name from product group by province, name) p
     group by p.province order by cnt desc limit 3) b
  on a.province = b.province) c
group by c.name having cont > 2;
  // Agricultural-product types shared by the top 3 provinces
  // Uses ArrayBuffer, which requires this import:
  import scala.collection.mutable.ArrayBuffer

  def top_3: Unit = {
    val conf = new SparkConf().setAppName("top_3").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///home/hadoop/product.txt")
    // Top 3 provinces together with their product types, collected as a local array
    val result = lines.map(_.split(","))
      .filter(x => x.length == 6 && x(0) != null && x(4) != null)
      .map(x => {
        val prov = x(4).trim
        val name = x(0).trim
        (prov, name)
      })
      .distinct()
      .groupByKey()
      .map(x => {
        val prov = x._1
        val names = x._2
        val ncount = names.toArray.length     // number of product types
        (ncount, (prov, names))
      })
      .sortByKey(false)
      .take(3)
      .map(x => {
        val prov = x._2._1
        val names = x._2._2.toArray
        (prov, names)
      })

    val provs = new ArrayBuffer[String]()     // the top 3 provinces
    val array1 = new ArrayBuffer[String]()    // product types of the 1st province
    val array2 = new ArrayBuffer[String]()    // product types of the 2nd province
    val array3 = new ArrayBuffer[String]()    // product types of the 3rd province
    val numTest = sc.accumulator(0)           // counter (result is a local array, so this runs on the driver)
    result.foreach(item => {
      provs += item._1
      numTest += 1
      if (numTest.value == 1) {
        array1 ++= item._2                    // product types of the 1st province
      } else if (numTest.value == 2) {
        array2 ++= item._2                    // product types of the 2nd province
      } else {
        array3 ++= item._2                    // product types of the 3rd province
      }
    })
    // intersect yields the product types shared by all three provinces
    val products = (array1.intersect(array2)).intersect(array3)
    println("Top 3 provinces:")
    for (elem <- provs) println(elem)
    println("Product types shared by the top 3 provinces:")
    for (elem <- products) println(elem)
    sc.stop()
  }
2. Files in other formats, when imported into Linux, may show up with garbled (mis-encoded) text:
Save the file as plain text and open it in Notepad to convert it to UTF-8; if that still does not work, in SecureCRT open Options -> Session Options -> Appearance and set the character encoding to UTF-8.
3. sortByKey defaults to ascending = true (ascending order); pass false for descending order.
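A minimal sketch (made-up (score, class) pairs) showing the default and the descending form:

import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sortByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val scores = sc.parallelize(Array((90, "class1"), (99, "class2"), (80, "class3")))
    scores.sortByKey().collect().foreach(println(_))       // ascending = true by default
    scores.sortByKey(false).collect().foreach(println(_))  // descending
    sc.stop()
  }
}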
4. Why HBase was introduced even though Hive already existed:
Hive: offline batch processing
HBase: real-time, low-latency data access
5. Hadoop ecosystem components, roughly in order of importance:
Hive
MapReduce, HBase (Storm + HBase + Kafka)
HDFS, ZooKeeper
Middleware: Kafka, Flume, Sqoop, Azkaban, Oozie
Spark:
Scala, Spark Core, Spark Streaming
Spark SQL, Spark MLlib
Spark GraphX
6. In the runtime architecture of Spark:
Main processes: Driver, Master, Worker, Executor
Threads: Task (each task runs as a thread inside an Executor)