zoukankan      html  css  js  c++  java
  • Spark实现销量统计

    package com.mengyao.examples.spark.core;
    
    
    import java.io.Serializable;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    
    import scala.Tuple2;
    
    /**
     * Passenger-car sales statistics for the Chinese market: April sales and cumulative
     * January-April sales, aggregated per brand (car maker) and ranked in descending order.
     *
     * <p>Input: a text file whose lines are tab-separated
     * {@code no, model, brand, aprilSales, janToAprSales}.
     * Output: two directories under the given output dir, {@code fourSaleRank} (April ranking)
     * and {@code oneTofourSaleRank} (January-April ranking).
     *
     * @author mengyao
     */
    @SuppressWarnings("all")
    public class CarSaleStatistics {

        /** One sales record: a single input line, or the per-brand aggregate of several. */
        static class Sale implements Serializable {
            private static final long serialVersionUID = -5393067134730174480L;
            // Rank as given in the input file (for aggregates: the rank of one contributing row)
            private int no;
            // Car model; for per-brand aggregates, a comma-separated list of models
            private String model;
            // Car maker (brand); used as the grouping key
            private String brand;
            // April sales
            private int fourSale;
            // Cumulative January-April sales
            private int totalSale;

            public Sale(int no, String model, String brand, int fourSale, int totalSale) {
                this.no = no;
                this.model = model;
                this.brand = brand;
                this.fourSale = fourSale;
                this.totalSale = totalSale;
            }
            public int getNo() {
                return no;
            }
            public void setNo(int no) {
                this.no = no;
            }
            public String getModel() {
                return model;
            }
            public void setModel(String model) {
                this.model = model;
            }
            public String getBrand() {
                return brand;
            }
            public void setBrand(String brand) {
                this.brand = brand;
            }
            public int getFourSale() {
                return fourSale;
            }
            public void setFourSale(int fourSale) {
                this.fourSale = fourSale;
            }
            public int getTotalSale() {
                return totalSale;
            }
            public void setTotalSale(int totalSale) {
                this.totalSale = totalSale;
            }

            /** Tab-separated: no, model, brand, fourSale, totalSale (the layout of an input line). */
            @Override
            public String toString() {
                return no + "\t" + model + "\t" + brand + "\t" + fourSale + "\t" + totalSale;
            }
        }

        /**
         * Runs the job.
         *
         * <p>Cluster mode: spark-submit --class com.mengyao.examples.spark.core.CarSaleStatistics
         * --master yarn --deploy-mode cluster --driver-memory 2048m --executor-memory 1024m
         * --executor-cores 1 --queue default examples-0.0.1-SNAPSHOT.jar
         * /data/carsales_data/2018.4-china-car-sales_volume.txt /data/carsales_data/statistics/
         *
         * <p>Local mode: run as a plain Java application with no arguments; built-in local
         * paths are used and the master is set to {@code local}.
         *
         * @param args {@code [inputFile, outputDir]}; empty or null selects local mode
         * @throws IllegalArgumentException if arguments are given but fewer than two
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName(CarSaleStatistics.class.getName());
            if (null == args || args.length == 0) {
                args = new String[]{"./src/main/resources/data/2018.4-china-car-sales_volume.txt", "D:/"};
                System.setProperty("hadoop.home.dir", "D:/softs/dev/apache/hadoop-2.7.5");
                conf.setMaster("local");
            }
            // Fail fast, before starting Spark, instead of after all the computation has run.
            if (args.length < 2) {
                throw new IllegalArgumentException(
                        "Usage: CarSaleStatistics <inputFile> <outputDir>, got " + args.length + " argument(s)");
            }
            String inputFile = args[0];
            String outputDir = withTrailingSeparator(args[1]);

            // try-with-resources: the context is closed even when a job fails.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // April sales data for joint-venture and domestic passenger cars in China
                JavaRDD<String> linesRDD = sc.textFile(inputFile);

                // Parse each line and key it by brand
                JavaPairRDD<String, Sale> brandSalesRDD = linesRDD.mapToPair(new PairFunction<String, String, Sale>() {
                    private static final long serialVersionUID = -3023653638555855696L;
                    @Override
                    public Tuple2<String, Sale> call(String line) throws Exception {
                        String[] fields = line.split("\t");
                        Sale sale = new Sale(Integer.parseInt(fields[0]), fields[1], fields[2],
                                Integer.parseInt(fields[3]), Integer.parseInt(fields[4]));
                        return new Tuple2<String, Sale>(sale.getBrand(), sale);
                    }
                });

                // Per brand: total April sales, total January-April sales, and the list of models.
                // Cached because it feeds both rankings below.
                JavaPairRDD<String, Sale> brandTotalSalesRDD = brandSalesRDD.reduceByKey(new Function2<Sale, Sale, Sale>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Sale call(Sale item1, Sale item2) throws Exception {
                        // Build a new object instead of mutating an input, so the reduce stays
                        // safe even if the parent RDD is cached later.
                        return new Sale(
                                item2.getNo(),
                                item1.getModel() + "," + item2.getModel(),
                                item2.getBrand(),
                                item1.getFourSale() + item2.getFourSale(),
                                item1.getTotalSale() + item2.getTotalSale());
                    }
                }).cache();

                // April ranking: key by April sales, sort descending
                JavaPairRDD<Integer, Sale> fourSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                    private static final long serialVersionUID = 2012736852338064223L;
                    @Override
                    public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                        return new Tuple2<Integer, Sale>(t._2.getFourSale(), t._2);
                    }
                });
                JavaPairRDD<Integer, Sale> fourSaleRankDescRDD = fourSaleRankRDD.sortByKey(false).cache();
                // Print on the driver: RDD.foreach would print on executors (invisible in cluster
                // mode) and partitions could interleave, breaking the ranking order.
                // The result has one row per brand, so collecting it is cheap.
                for (Tuple2<Integer, Sale> t : fourSaleRankDescRDD.collect()) {
                    Sale sale = t._2;
                    System.out.println("==== 4月份销量排名:" + sale.getBrand() + " = " + sale.getFourSale());
                }
                saveRanking(fourSaleRankDescRDD, outputDir + "fourSaleRank");

                // January-April cumulative ranking: key by cumulative sales, sort descending
                JavaPairRDD<Integer, Sale> totalSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                    private static final long serialVersionUID = 2012736852338064223L;
                    @Override
                    public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                        return new Tuple2<Integer, Sale>(t._2.getTotalSale(), t._2);
                    }
                });
                JavaPairRDD<Integer, Sale> totalSaleRankDescRDD = totalSaleRankRDD.sortByKey(false).cache();
                for (Tuple2<Integer, Sale> t : totalSaleRankDescRDD.collect()) {
                    Sale sale = t._2;
                    System.out.println("==== 1-4月份累计销量排名:" + sale.getBrand() + " = " + sale.getTotalSale());
                }
                // Previously this wrote fourSaleRankDescRDD here, so the cumulative ranking was never saved.
                saveRanking(totalSaleRankDescRDD, outputDir + "oneTofourSaleRank");
            }
        }

        /** Writes a (sales, Sale) ranking as text files under {@code path}. */
        private static void saveRanking(JavaPairRDD<Integer, Sale> ranking, String path) {
            // NOTE(review): the declared NullWritable/Text classes do not match the actual
            // Integer/Sale pairs. This is kept from the original; presumably TextOutputFormat
            // just writes key.toString() and value.toString() per line. Confirm before changing.
            ranking.saveAsNewAPIHadoopFile(path, NullWritable.class, Text.class, TextOutputFormat.class);
        }

        /** Returns {@code dir} ending in a path separator, so child names can be appended to it. */
        private static String withTrailingSeparator(String dir) {
            return (dir.endsWith("/") || dir.endsWith("\\")) ? dir : dir + "/";
        }

    }

    查看 HDP Spark 的 HistoryServer(http://<IP>:18081),出现如下图所示的作业记录即表示运行成功(原文截图未保留):

  • 相关阅读:
    mysqlbinlog基于某个偏移量进行数据的恢复(重做),--start-position,--stop-position的使用方法
    mysql数据库binary log中的事件到底是什么?
    mysqlbinlog工具的作用是什么呢,如何将binary log转换为文本格式?
    mysqldump对于DB进行逻辑备份的时候,是否会备份视图呢?
    mysqldump工具,通过--where选项,导出指定表中指定数据?
    mysql数据库中,通过mysqladmin工具,创建数据库
    mysql数据库中,查看某个数据库下的表的存储类型都有哪些
    Oracle体系结构之rac内存管理
    Oracle HA 之 Server Pool 实战
    Oracle HA 之 SERVICE和DRM实战
  • 原文地址:https://www.cnblogs.com/mengyao/p/9235500.html
Copyright © 2011-2022 走看看