zoukankan      html  css  js  c++  java
  • Spark实现销量统计

    package com.mengyao.examples.spark.core;
    
    
    import java.io.Serializable;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    
    
    import scala.Tuple2;
    
    /**
     * Sales statistics for passenger cars in the Chinese market: April sales and
     * cumulative January-April sales, aggregated per car maker and ranked in
     * descending order.
     *
     * @author mengyao
     */
    public class CarSaleStatistics {

        /** Separator between the columns of an input record (and of {@link Sale#toString()}). */
        private static final String FIELD_SEPARATOR = "\t";

        /** Number of columns expected in every input record. */
        private static final int FIELD_COUNT = 5;

        /** Name appended to the output prefix for the April ranking. */
        private static final String FOUR_SALE_RANK_DIR = "fourSaleRank";

        /** Name appended to the output prefix for the January-April ranking. */
        private static final String TOTAL_SALE_RANK_DIR = "oneTofourSaleRank";

        /**
         * One sales record. After aggregation the same type is reused to carry the
         * per-brand totals, with {@code model} holding a comma-separated list of models.
         */
        static class Sale implements Serializable {
            private static final long serialVersionUID = -5393067134730174480L;
            // Rank
            private int no;
            // Car model
            private String model;
            // Car maker
            private String brand;
            // April sales
            private int fourSale;
            // Cumulative January-April sales
            private int totalSale;

            /**
             * @param no        rank
             * @param model     car model
             * @param brand     car maker
             * @param fourSale  April sales
             * @param totalSale cumulative January-April sales
             */
            public Sale(int no, String model, String brand, int fourSale, int totalSale) {
                this.no = no;
                this.model = model;
                this.brand = brand;
                this.fourSale = fourSale;
                this.totalSale = totalSale;
            }

            public int getNo() {
                return no;
            }

            public void setNo(int no) {
                this.no = no;
            }

            public String getModel() {
                return model;
            }

            public void setModel(String model) {
                this.model = model;
            }

            public String getBrand() {
                return brand;
            }

            public void setBrand(String brand) {
                this.brand = brand;
            }

            public int getFourSale() {
                return fourSale;
            }

            public void setFourSale(int fourSale) {
                this.fourSale = fourSale;
            }

            public int getTotalSale() {
                return totalSale;
            }

            public void setTotalSale(int totalSale) {
                this.totalSale = totalSale;
            }

            /** Renders the record as tab-separated columns in the same order as the constructor. */
            @Override
            public String toString() {
                return no + FIELD_SEPARATOR + model + FIELD_SEPARATOR + brand
                        + FIELD_SEPARATOR + fourSale + FIELD_SEPARATOR + totalSale;
            }
        }

        /**
         * Parses one tab-separated input line into a {@link Sale}.
         *
         * @param line input record: rank, model, brand, April sales, January-April sales
         * @return the parsed record
         * @throws IllegalArgumentException if the line has fewer than five columns or a
         *         numeric column cannot be parsed ({@link NumberFormatException})
         */
        private static Sale parseSale(String line) {
            String[] fields = line.split(FIELD_SEPARATOR);
            if (fields.length < FIELD_COUNT) {
                // Fail with the offending record instead of a bare ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException("Expected " + FIELD_COUNT
                        + " tab-separated fields but found " + fields.length + " in line: " + line);
            }
            return new Sale(Integer.parseInt(fields[0]), fields[1], fields[2],
                    Integer.parseInt(fields[3]), Integer.parseInt(fields[4]));
        }

        /**
         * Combines two records of the same brand into a new record without modifying
         * either input: sales figures are summed and model names are joined with a comma.
         * Rank and brand are taken from {@code second}.
         *
         * @param first  left operand of the reduction
         * @param second right operand of the reduction
         * @return a new record holding the combined values
         */
        private static Sale mergeSales(Sale first, Sale second) {
            return new Sale(
                    second.getNo(),
                    first.getModel() + "," + second.getModel(),
                    second.getBrand(),
                    first.getFourSale() + second.getFourSale(),
                    first.getTotalSale() + second.getTotalSale());
        }

        /**
         * Cluster mode: spark-submit --class com.mengyao.examples.spark.core.CarSaleStatistics --master yarn --deploy-mode cluster --driver-memory 2048m --executor-memory 1024m --executor-cores 1 --queue default examples-0.0.1-SNAPSHOT.jar /data/carsales_data/2018.4-china-car-sales_volume.txt /data/carsales_data/statistics/
         * Local mode: Run As &gt; Java Application (no arguments; built-in defaults are used)
         *
         * @param args {@code args[0]} is the input file, {@code args[1]} is the output path
         *             prefix. The result directory names are appended to the prefix verbatim,
         *             so it should end with a path separator.
         * @throws IllegalArgumentException if exactly one argument is supplied
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName(CarSaleStatistics.class.getName());
            if (args == null || args.length == 0) {
                // No arguments: fall back to the local development setup.
                args = new String[]{"./src/main/resources/data/2018.4-china-car-sales_volume.txt", "D:/"};
                System.setProperty("hadoop.home.dir", "D:/softs/dev/apache/hadoop-2.7.5");
                conf.setMaster("local");
            }
            if (args.length < 2) {
                // Reject a missing output path up front rather than after the first job has run.
                throw new IllegalArgumentException(
                        "Usage: CarSaleStatistics <input-file> <output-path-prefix>");
            }
            final String inputPath = args[0];
            final String outputPrefix = args[1];

            JavaSparkContext sc = new JavaSparkContext(conf);
            try {
                // April sales data for joint-venture and domestic passenger cars in the Chinese market
                JavaRDD<String> linesRDD = sc.textFile(inputPath);

                // Key every record by its brand
                JavaPairRDD<String, Sale> brandSalesRDD = linesRDD.mapToPair(new PairFunction<String, String, Sale>() {
                    private static final long serialVersionUID = -3023653638555855696L;
                    @Override
                    public Tuple2<String, Sale> call(String line) throws Exception {
                        Sale sale = parseSale(line);
                        return new Tuple2<String, Sale>(sale.getBrand(), sale);
                    }
                });

                // Per brand: total April sales and total January-April sales.
                // Cached because both ranking pipelines below start from this RDD.
                JavaPairRDD<String, Sale> brandTotalSalesRDD = brandSalesRDD.reduceByKey(new Function2<Sale, Sale, Sale>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Sale call(Sale item1, Sale item2) throws Exception {
                        return mergeSales(item1, item2);
                    }
                }).cache();

                // April ranking: re-key by April sales
                JavaPairRDD<Integer, Sale> fourSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                    private static final long serialVersionUID = 2012736852338064223L;
                    @Override
                    public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                        return new Tuple2<Integer, Sale>(t._2.getFourSale(), t._2);
                    }
                });
                // April ranking, descending
                JavaPairRDD<Integer, Sale> fourSaleRankDescRDD = fourSaleRankRDD.sortByKey(false);
                fourSaleRankDescRDD.foreach(new VoidFunction<Tuple2<Integer, Sale>>() {
                    private static final long serialVersionUID = -8110929872210046547L;
                    @Override
                    public void call(Tuple2<Integer, Sale> t) throws Exception {
                        Sale sale = t._2;
                        System.out.println("==== 4月份销量排名:" + sale.getBrand() + " = " + sale.getFourSale());
                    }
                });
                // NOTE(review): the declared key/value classes are NullWritable/Text although the
                // records are (Integer, Sale); presumably TextOutputFormat falls back to toString()
                // for them - confirm before relying on the exact file layout.
                fourSaleRankDescRDD.saveAsNewAPIHadoopFile(outputPrefix + FOUR_SALE_RANK_DIR, NullWritable.class, Text.class, TextOutputFormat.class);

                // January-April ranking: re-key by cumulative sales
                JavaPairRDD<Integer, Sale> totalSaleRankRDD = brandTotalSalesRDD.mapToPair(new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                    private static final long serialVersionUID = 2012736852338064223L;
                    @Override
                    public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                        return new Tuple2<Integer, Sale>(t._2.getTotalSale(), t._2);
                    }
                });
                // January-April ranking, descending
                JavaPairRDD<Integer, Sale> totalSaleRankDescRDD = totalSaleRankRDD.sortByKey(false);
                totalSaleRankDescRDD.foreach(new VoidFunction<Tuple2<Integer, Sale>>() {
                    private static final long serialVersionUID = -8110929872210046547L;
                    @Override
                    public void call(Tuple2<Integer, Sale> t) throws Exception {
                        Sale sale = t._2;
                        System.out.println("==== 1-4月份累计销量排名:" + sale.getBrand() + " = " + sale.getTotalSale());
                    }
                });
                // Save the cumulative ranking itself (the April ranking used to be written here by mistake).
                totalSaleRankDescRDD.saveAsNewAPIHadoopFile(outputPrefix + TOTAL_SALE_RANK_DIR, NullWritable.class, Text.class, TextOutputFormat.class);
            } finally {
                // Always release the Spark context, even when a job fails.
                sc.close();
            }
        }

    }

    查看 HDP Spark 的 HistoryServer(http://IP:18081),出现如下图所示的应用记录即表示运行成功:

  • 相关阅读:
    php checkbox 复选框
    wp7 The remote connection to the device has been lost
    php json_decode null
    今入住博客园,希望笑傲职场!
    单例模式之见解设计模式
    简单工厂之见解设计模式
    infopath 序列化 在发布处有导出源文件.存放一地方后有myschema.xsd 文件
    超简单的天气预报webpart
    用户控件传值
    Proxy代理
  • 原文地址:https://www.cnblogs.com/mengyao/p/9235500.html
Copyright © 2011-2022 走看看