zoukankan      html  css  js  c++  java
  • 每日一题 为了工作 2020 0510 第六十八题

    package com.swust.action;
    
    import com.alibaba.fastjson.JSONObject;
    import com.swust.constant.Constants;
    import com.swust.skynet.SelfDefineAccumulator;
    import com.swust.utils.StringUtils;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     *
     * @author 雪瞳
     * @Slogan 时钟尚且前行,人怎能就此止步!
     * @Function 检测路口状态
     *
     */
    public class CheckMonitorState {
        public static JavaPairRDD<Integer,String> checkMonitorState(JavaSparkContext jsc,
                                                                    SparkSession session,
                                                                    JavaPairRDD<String,String> startMonitorInfos,
                                                                    final long taskId,
                                                                    JSONObject taskParamsJsonObject,
                                                                    SelfDefineAccumulator  accumulator){
            //从monitor_camera_info标准表中查询出来每一个卡口对应的camera的数量
            String sqlText = "select * from monitor_camera_info";
            Dataset<Row> standDataFrame = session.sql(sqlText);
            JavaRDD<Row> standRdd = standDataFrame.toJavaRDD();
    
            //转换成k-v格式RDD
            JavaPairRDD<String, String> monitorRdd = standRdd.mapToPair(new PairFunction<Row, String, String>() {
                @Override
                public Tuple2<String, String> call(Row row) throws Exception {
    
                    String key = row.getString(0);
                    String value = row.getString(1);
                    Tuple2<String, String> tp = new Tuple2<>(key, value);
                    return tp;
                }
            });
            /**
             * 对每一个卡扣下面的信息进行统计,统计出来camera_count(这个卡扣下一共有多少个摄像头),camera_ids(这个卡扣下,所有的摄像头编号拼接成的字符串)
             * 返回:
             * 	("monitorId","cameraIds=xxx|cameraCount=xxx")
             * 例如:
             * 	("0008","cameraIds=02322,01213,03442|cameraCount=3")
             * 如何来统计?
             * 	1、按照monitor_id分组
             * 	2、使用mapToPair遍历,遍历的过程可以统计
             */
            JavaPairRDD<String, Iterable<String>> monitorGroupRdd = monitorRdd.groupByKey();
            JavaPairRDD<String, String> monitorGroupInfosRdd = monitorGroupRdd.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
                @Override
                public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
                    String monitorId = tuple._1();
                    Iterator<String> cameraIterator = tuple._2().iterator();
                    int count = 0;
                    StringBuilder cameraIds = new StringBuilder();
    
                    while (cameraIterator.hasNext()) {
                        String next = cameraIterator.next();
                        count++;
                        cameraIds.append(next);
                    }
                    String cameraInfos = Constants.FIELD_CAMERA_IDS + "=" + cameraIds.toString().substring(1) + "|"
                            + Constants.FIELD_CAR_COUNT + "=" + count;
    
                    return new Tuple2<>(monitorId, cameraInfos);
                }
            });
            //左外连接两个RDD
            JavaPairRDD<String, Tuple2<String, Optional<String>>> resultRdd = monitorGroupInfosRdd.leftOuterJoin(startMonitorInfos);
            JavaPairRDD<Integer, String> res = resultRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Tuple2<String, Optional<String>>>>, Integer, String>() {
                @Override
                public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> iterator) throws Exception {
                    List<Tuple2<Integer, String>> list = new ArrayList<>();
                    while (iterator.hasNext()) {
                        Tuple2<String, Tuple2<String, Optional<String>>> tuple2 = iterator.next();
                        String monitorId = tuple2._1();
                        String standCameraInfos = tuple2._2()._1();
                        Optional<String> factCameraInfosOptional = tuple2._2()._2();
                        String factCameraInfo = "";
                        if (factCameraInfosOptional.isPresent()) {
                            factCameraInfo = factCameraInfosOptional.get();
                        } else {
                            String standCameraIds = StringUtils.
                                    getFieldFromConcatString(
                                            standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);
                            String abnoramlCameraCount = StringUtils.
                                    getFieldFromConcatString(
                                            standCameraInfos,"\|", Constants.FIELD_CAMERA_COUNT);
                            accumulator.add(
                                    Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
                                            + Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnoramlCameraCount + "|"
                                            + Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + standCameraIds
                            );
                            continue;
                        }
                        //实际摄像头个数
                        int factCameraCount = Integer.parseInt(StringUtils.
                                getFieldFromConcatString(
                                        factCameraInfo,"\|", Constants.FIELD_CAMERA_COUNT));
                        //标准摄像头个数
                        int standCameraCount = Integer.parseInt(StringUtils.
                                getFieldFromConcatString(
                                        standCameraInfos, "\|", Constants.FIELD_CAMERA_COUNT));
                        if (standCameraCount == factCameraCount) {
                            /**
                             * 1 正常路口的数量
                             * 2 异常路口的数量
                             * 3 正常通道个数
                             * 4 摄像头异常的个数
                             */
                            accumulator.add(Constants.FIELD_NORMAL_MONITOR_COUNT + "=1|" + Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + factCameraCount);
                        } else {
                            //获取实际摄像头编号
                            String factCameraIds = StringUtils.getFieldFromConcatString(
                                    factCameraInfo, "\|", Constants.FIELD_CAMERA_IDS);
                            //获取标准摄像头编号
                            String standCameraIds = StringUtils.getFieldFromConcatString(
                                    standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);
    
                            List<String> factCameraIdList = Arrays.asList(factCameraIds.split(","));
                            List<String> standCameraIdList = Arrays.asList(standCameraIds.split(","));
                            StringBuilder abnormalCameraInfos = new StringBuilder();
                            int abnormalCameraCount = 0;
                            int normalCameraCount = 0;
                            for (String cameraId : standCameraIdList) {
                                if (!factCameraIdList.contains(cameraId)) {
                                    abnormalCameraCount++;
                                    abnormalCameraInfos.append("," + cameraId);
                                }
                            }
                            normalCameraCount = standCameraIdList.size() - abnormalCameraCount;
                            //往累加器中更新状态
                            accumulator.add(
                                    Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
                                            + Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + normalCameraCount + "|"
                                            + Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnormalCameraCount + "|"
                                            + Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + abnormalCameraInfos.toString().substring(1));
                        }
                        //从实际数据拼接到字符串中获取车流量
                        int carCount = Integer.parseInt(
                                StringUtils.getFieldFromConcatString(
                                        factCameraInfo, "\|", Constants.FIELD_CAMERA_COUNT));
                        list.add(new Tuple2<>(carCount, monitorId));
                    }
                    return list.iterator();
                }
            });
        return res;
        }
    }
    

      

  • 相关阅读:
    Python学习笔记:pip使用技巧
    机器学习笔记:训练集、验证集和测试集区别
    MySQL学习笔记:3道面试题小测
    Python学习笔记:精确的四舍五入
    Hive学习笔记:列转行之collect_list/collect_set/concat_ws
    Python学习笔记:6个代码性能坏习惯
    爬虫学习笔记:打造自己的代理池
    Mysql学习笔记:5.5升级至8.0版本
    机器学习笔记:sklearn.model_selection.train_test_split切分训练、测试集
    HashSet其实就那么一回事儿之源码浅析
  • 原文地址:https://www.cnblogs.com/walxt/p/12864844.html
Copyright © 2011-2022 走看看