zoukankan      html  css  js  c++  java
  • 每日一题 为了工作 2020 0508 第六十六题

    package spark.action.factory;
    
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    
    import java.util.*;
    
    /**
     * Generates mock traffic-monitoring data and registers it as two Spark temp views.
     *
     * <p>{@code monitor_flow_action} holds one row per simulated camera capture with the
     * columns date, monitor_id, camera_id, car, action_time, speed, road_id and area_id.
     * {@code monitor_camera_info} holds the monitor-to-camera mapping derived from those
     * rows. Every column is a nullable string.
     *
     * <p>Slogan: the clock keeps moving forward, so how could a person stop here!
     *
     * @author 雪瞳
     */
    public class MockData {
        /** Number of simulated cars. */
        private static final int CAR_COUNT = 3000;
        /** Upper bound (inclusive) on the number of captures simulated per car. */
        private static final int MAX_CAPTURES_PER_CAR = 300;
        /** The simulated hour advances by one after this many captures. */
        private static final int CAPTURES_PER_HOUR_STEP = 30;
        /** Number of rows printed for each data set. */
        private static final int PREVIEW_ROWS = 50;

        /**
         * Builds both mock data sets, prints a preview of each to the console and
         * registers them as temp views on a local Spark session.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            String master = "local";
            String appname = "dataFrame";
            SparkSession session = SparkSession.builder().master(master).appName(appname).getOrCreate();
            try {
                JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());
                Random random = new Random();
                String date = DateUtils.getTodayDate();

                // Vehicle capture records.
                List<Row> flowRows = buildFlowRows(date, random);
                JavaRDD<Row> flowRdd = jsc.parallelize(flowRows);
                Dataset<Row> dataFrame = session.createDataFrame(flowRdd, flowSchema());

                System.err.println("车辆信息数据");
                dataFrame.show(PREVIEW_ROWS);
                // registerTempTable is deprecated since Spark 2.0.
                dataFrame.createOrReplaceTempView("monitor_flow_action");

                // Monitor-to-camera mapping. It is built into its own list: the list given
                // to parallelize above may still be referenced by that RDD, so it must not
                // be cleared and reused.
                List<Row> monitorCameraRows = buildMonitorCameraRows(flowRows, random);
                JavaRDD<Row> monitorCameraRdd = jsc.parallelize(monitorCameraRows);
                Dataset<Row> dataFrameTwo = session.createDataFrame(monitorCameraRdd, monitorCameraSchema());
                dataFrameTwo.createOrReplaceTempView("monitor_camera_info");

                System.err.println("路口与摄像头");
                dataFrameTwo.show(PREVIEW_ROWS);
            } finally {
                // Release the local Spark context even if data generation fails.
                session.stop();
            }
        }

        /**
         * Simulates the camera captures of {@link #CAR_COUNT} cars.
         *
         * @param date   date string placed in the date column and used as the prefix of
         *               every action time
         * @param random source of randomness
         * @return one row per capture, in the column order of {@link #flowSchema()}
         */
        private static List<Row> buildFlowRows(String date, Random random) {
            String[] locations = new String[]{"鲁","京","冀","鄂","粤","沪","京","深","蒙","川"};
            List<Row> rows = new ArrayList<>();

            for (int i = 0; i < CAR_COUNT; i++) {
                // Licence plate: region character + one letter A-Z + padded number.
                String car = locations[random.nextInt(locations.length)]
                        + (char) (65 + random.nextInt(26))
                        + StringUtils.fullFillNumBites(5, String.valueOf(random.nextInt(10000)));
                // Hour of day (0-23) at which this car is first captured.
                int hour = random.nextInt(24);
                // Draw the capture count once per car; re-rolling it in the loop condition
                // would end the loop early at random and skew the counts towards small values.
                int captureCount = random.nextInt(MAX_CAPTURES_PER_CAR) + 1;

                for (int j = 0; j < captureCount; j++) {
                    // Every 30 captures the hour advances by one, wrapping at midnight so
                    // the hour field always stays within 00-23.
                    if (j % CAPTURES_PER_HOUR_STEP == 0 && j != 0) {
                        hour = (hour + 1) % 24;
                    }
                    // Area id 1-8.
                    String areaId = StringUtils.fullFillNumBites(2,
                            String.valueOf(random.nextInt(8) + 1));
                    // Road id 1-50.
                    String roadId = String.valueOf(random.nextInt(50) + 1);
                    // Monitor (intersection) id 1-9.
                    String monitorId = StringUtils.fullFillNumBites(4,
                            String.valueOf(random.nextInt(9) + 1));
                    // Camera id 0-99999, so the value never exceeds the 5-character width.
                    // NOTE(review): fullFillNumBites is assumed to pad to the given width - confirm.
                    String cameraId = StringUtils.fullFillNumBites(5,
                            String.valueOf(random.nextInt(100000)));
                    // Capture time as "<date> HH:mm:ss", e.g. 2018-01-01 20:09:10.
                    String actionTime = date + " "
                            + StringUtils.fullFillTwoBites(String.valueOf(hour)) + ":"
                            + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60))) + ":"
                            + StringUtils.fullFillTwoBites(String.valueOf(random.nextInt(60)));
                    // Speed 1-260.
                    String speed = String.valueOf(random.nextInt(260) + 1);

                    rows.add(RowFactory.create(
                            date, monitorId, cameraId, car, actionTime, speed, roadId, areaId));
                }
            }
            return rows;
        }

        /**
         * Derives the monitor-to-camera mapping from the capture rows.
         *
         * <p>On every 1000th capture row one extra random camera id is added to that row's
         * monitor (presumably to simulate cameras that exist in the mapping but never
         * produced a capture - confirm against the consumers of the view).
         *
         * @param flowRows capture rows; monitor id is read from column 1, camera id from column 2
         * @param random   source of randomness for the extra camera ids
         * @return one (monitor_id, camera_id) row per distinct pair
         */
        private static List<Row> buildMonitorCameraRows(List<Row> flowRows, Random random) {
            Map<String, Set<String>> monitorAndCameras = new HashMap<>();

            int index = 0;
            for (Row flowRow : flowRows) {
                String monitorId = flowRow.getString(1);
                Set<String> cameras = monitorAndCameras.get(monitorId);
                if (cameras == null) {
                    cameras = new HashSet<>();
                    monitorAndCameras.put(monitorId, cameras);
                }
                index++;
                if (index % 1000 == 0) {
                    cameras.add(StringUtils.fullFillNumBites(5,
                            String.valueOf(random.nextInt(100000))));
                }
                cameras.add(flowRow.getString(2));
            }

            List<Row> rows = new ArrayList<>();
            for (Map.Entry<String, Set<String>> entry : monitorAndCameras.entrySet()) {
                String monitorId = entry.getKey();
                for (String cameraId : entry.getValue()) {
                    rows.add(RowFactory.create(monitorId, cameraId));
                }
            }
            return rows;
        }

        /** Returns the schema of the {@code monitor_flow_action} view. */
        private static StructType flowSchema() {
            return DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("date", DataTypes.StringType, true),
                    DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                    DataTypes.createStructField("camera_id", DataTypes.StringType, true),
                    DataTypes.createStructField("car", DataTypes.StringType, true),
                    DataTypes.createStructField("action_time", DataTypes.StringType, true),
                    DataTypes.createStructField("speed", DataTypes.StringType, true),
                    DataTypes.createStructField("road_id", DataTypes.StringType, true),
                    DataTypes.createStructField("area_id", DataTypes.StringType, true)
            ));
        }

        /** Returns the schema of the {@code monitor_camera_info} view. */
        private static StructType monitorCameraSchema() {
            return DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("monitor_id", DataTypes.StringType, true),
                    DataTypes.createStructField("camera_id", DataTypes.StringType, true)
            ));
        }
    }
    

      

  • 相关阅读:
    多线程的多核分配问题验证
    C++C#联合调试
    UNITY 手游(安卓)如何使用C/C++代码
    关于C#内存释放的BUG?
    日期转换
    深度剖析目标检测算法YOLOV4
    2. 使用Shell能做什么
    【Jmeter】之进行接口批量压力测试
    MongoDB-ChangeStream使用笔记
    Mongo-BI(bi-connector)配置使用笔记
  • 原文地址:https://www.cnblogs.com/walxt/p/12853054.html
Copyright © 2011-2022 走看看