package com.mengyao.examples.spark.core;

import java.io.Serializable;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Sales statistics for passenger cars in the Chinese market: April sales and January-April cumulative sales.
 * @author mengyao
 */
@SuppressWarnings("all")
public class CarSaleStatistics {

    static class Sale implements Serializable {
        private static final long serialVersionUID = -5393067134730174480L;
        // Rank
        private int no;
        // Model
        private String model;
        // Manufacturer (brand)
        private String brand;
        // April sales
        private int fourSale;
        // January-April cumulative sales
        private int totalSale;

        public Sale(int no, String model, String brand, int fourSale, int totalSale) {
            this.no = no;
            this.model = model;
            this.brand = brand;
            this.fourSale = fourSale;
            this.totalSale = totalSale;
        }
        public int getNo() { return no; }
        public void setNo(int no) { this.no = no; }
        public String getModel() { return model; }
        public void setModel(String model) { this.model = model; }
        public String getBrand() { return brand; }
        public void setBrand(String brand) { this.brand = brand; }
        public int getFourSale() { return fourSale; }
        public void setFourSale(int fourSale) { this.fourSale = fourSale; }
        public int getTotalSale() { return totalSale; }
        public void setTotalSale(int totalSale) { this.totalSale = totalSale; }

        @Override
        public String toString() {
            return no + " " + model + " " + brand + " " + fourSale + " " + totalSale;
        }
    }

    /**
     * Cluster mode: spark-submit --class com.mengyao.examples.spark.core.CarSaleStatistics --master yarn --deploy-mode cluster --driver-memory 2048m --executor-memory 1024m --executor-cores 1 --queue default examples-0.0.1-SNAPSHOT.jar /data/carsales_data/2018.4-china-car-sales_volume.txt /data/carsales_data/statistics/
     * Local mode: Run As > Java Application
     * @param args [in,out]
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName(CarSaleStatistics.class.getName());
        if (null == args || args.length == 0) {
            args = new String[]{"./src/main/resources/data/2018.4-china-car-sales_volume.txt", "D:/"};
            System.setProperty("hadoop.home.dir", "D:/softs/dev/apache/hadoop-2.7.5");
            conf.setMaster("local");
        }
        JavaSparkContext sc = new JavaSparkContext(conf);

        // April sales data for joint-venture and domestic passenger cars in the Chinese market
        JavaRDD<String> linesRDD = sc.textFile(args[0]);

        // Group by brand: parse each line into a Sale keyed by its brand
        JavaPairRDD<String, Sale> brandSalesRDD = linesRDD.mapToPair(new PairFunction<String, String, Sale>() {
            private static final long serialVersionUID = -3023653638555855696L;
            @Override
            public Tuple2<String, Sale> call(String line) throws Exception {
                String[] fields = line.split(" ");
                Sale sale = new Sale(Integer.parseInt(fields[0]), fields[1], fields[2],
                        Integer.parseInt(fields[3]), Integer.parseInt(fields[4]));
                return new Tuple2<String, Sale>(sale.getBrand(), sale);
            }
        });

        // Per brand: total April sales and total January-April sales
        JavaPairRDD<String, Sale> brandTotalSalesRDD = brandSalesRDD.reduceByKey(new Function2<Sale, Sale, Sale>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Sale call(Sale item1, Sale item2) throws Exception {
                item2.setFourSale(item1.getFourSale() + item2.getFourSale());
                item2.setTotalSale(item1.getTotalSale() + item2.getTotalSale());
                item2.setModel(item1.getModel() + "," + item2.getModel());
                return item2;
            }
        });

        // April sales ranking: re-key by April sales
        JavaPairRDD<Integer, Sale> fourSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
            private static final long serialVersionUID = 2012736852338064223L;
            @Override
            public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                return new Tuple2<Integer, Sale>(t._2.getFourSale(), t._2);
            }
        });

        // Sort the April ranking in descending order
        JavaPairRDD<Integer, Sale> fourSaleRankDescRDD = fourSaleRankRDD.sortByKey(false);
        fourSaleRankDescRDD.foreach(new VoidFunction<Tuple2<Integer, Sale>>() {
            private static final long serialVersionUID = -8110929872210046547L;
            @Override
            public void call(Tuple2<Integer, Sale> t) throws Exception {
                Sale sale = t._2;
                System.out.println("==== April sales ranking: " + sale.getBrand() + " = " + sale.getFourSale());
            }
        });
        // TextOutputFormat writes each key and value via toString()
        fourSaleRankDescRDD.saveAsNewAPIHadoopFile(args[1] + "fourSaleRank",
                NullWritable.class, Text.class, TextOutputFormat.class);

        // January-April cumulative ranking: re-key by January-April sales
        JavaPairRDD<Integer, Sale> totalSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
            private static final long serialVersionUID = 2012736852338064223L;
            @Override
            public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                return new Tuple2<Integer, Sale>(t._2.getTotalSale(), t._2);
            }
        });

        // Sort the cumulative ranking in descending order
        JavaPairRDD<Integer, Sale> totalSaleRankDescRDD = totalSaleRankRDD.sortByKey(false);
        totalSaleRankDescRDD.foreach(new VoidFunction<Tuple2<Integer, Sale>>() {
            private static final long serialVersionUID = -8110929872210046547L;
            @Override
            public void call(Tuple2<Integer, Sale> t) throws Exception {
                Sale sale = t._2;
                System.out.println("==== January-April cumulative sales ranking: " + sale.getBrand() + " = " + sale.getTotalSale());
            }
        });
        totalSaleRankDescRDD.saveAsNewAPIHadoopFile(args[1] + "oneTofourSaleRank",
                NullWritable.class, Text.class, TextOutputFormat.class);

        // Shut down
        sc.close();
    }
}
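As an aside, Spark's Java function interfaces (PairFunction, Function2, and so on) are single-method interfaces, so on Java 8 the anonymous inner classes above can be written as lambdas. The following is a minimal sketch of the same grouping and aggregation step, assuming Java 8, the Sale class, and the linesRDD defined inside main above; it is an illustration, not a change to the job:

        // Sketch only: same brand grouping and per-brand aggregation as above,
        // expressed with Java 8 lambdas instead of anonymous inner classes.
        JavaPairRDD<String, Sale> byBrand = linesRDD.mapToPair(line -> {
            String[] f = line.split(" ");
            Sale s = new Sale(Integer.parseInt(f[0]), f[1], f[2],
                    Integer.parseInt(f[3]), Integer.parseInt(f[4]));
            return new Tuple2<>(s.getBrand(), s);
        });
        JavaPairRDD<String, Sale> totals = byBrand.reduceByKey((a, b) -> {
            b.setFourSale(a.getFourSale() + b.getFourSale());
            b.setTotalSale(a.getTotalSale() + b.getTotalSale());
            b.setModel(a.getModel() + "," + b.getModel());
            return b;
        });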
Check the HDP Spark History Server (your server IP, port 18081); output like the figure below indicates the job ran successfully:
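Besides eyeballing the web UI, the History Server also exposes a REST API under /api/v1, which is handy for scripted checks. Below is a minimal sketch that lists the recorded applications; "history-server-host" is a hypothetical placeholder for your History Server address, and the port 18081 matches the HDP setup mentioned above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

/** Prints the application list returned by the Spark History Server REST API. */
public class HistoryServerCheck {
    public static void main(String[] args) throws Exception {
        // "history-server-host" is a placeholder; use your History Server's address.
        URL url = new URL("http://history-server-host:18081/api/v1/applications");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                // The JSON response lists each application's id, name, and attempts;
                // CarSaleStatistics should appear here once the job has finished.
                System.out.println(line);
            }
        }
    }
}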