package com.swust.action;
import com.alibaba.fastjson.JSONObject;
import com.swust.constant.Constants;
import com.swust.skynet.SelfDefineAccumulator;
import com.swust.utils.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
*
* @author 雪瞳
* @Slogan 时钟尚且前行,人怎能就此止步!
* @Function 检测路口状态
*
*/
public class CheckMonitorState {
public static JavaPairRDD<Integer,String> checkMonitorState(JavaSparkContext jsc,
SparkSession session,
JavaPairRDD<String,String> startMonitorInfos,
final long taskId,
JSONObject taskParamsJsonObject,
SelfDefineAccumulator accumulator){
//从monitor_camera_info标准表中查询出来每一个卡口对应的camera的数量
String sqlText = "select * from monitor_camera_info";
Dataset<Row> standDataFrame = session.sql(sqlText);
JavaRDD<Row> standRdd = standDataFrame.toJavaRDD();
//转换成k-v格式RDD
JavaPairRDD<String, String> monitorRdd = standRdd.mapToPair(new PairFunction<Row, String, String>() {
@Override
public Tuple2<String, String> call(Row row) throws Exception {
String key = row.getString(0);
String value = row.getString(1);
Tuple2<String, String> tp = new Tuple2<>(key, value);
return tp;
}
});
/**
* 对每一个卡扣下面的信息进行统计,统计出来camera_count(这个卡扣下一共有多少个摄像头),camera_ids(这个卡扣下,所有的摄像头编号拼接成的字符串)
* 返回:
* ("monitorId","cameraIds=xxx|cameraCount=xxx")
* 例如:
* ("0008","cameraIds=02322,01213,03442|cameraCount=3")
* 如何来统计?
* 1、按照monitor_id分组
* 2、使用mapToPair遍历,遍历的过程可以统计
*/
JavaPairRDD<String, Iterable<String>> monitorGroupRdd = monitorRdd.groupByKey();
JavaPairRDD<String, String> monitorGroupInfosRdd = monitorGroupRdd.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, String>() {
@Override
public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
String monitorId = tuple._1();
Iterator<String> cameraIterator = tuple._2().iterator();
int count = 0;
StringBuilder cameraIds = new StringBuilder();
while (cameraIterator.hasNext()) {
String next = cameraIterator.next();
count++;
cameraIds.append(next);
}
String cameraInfos = Constants.FIELD_CAMERA_IDS + "=" + cameraIds.toString().substring(1) + "|"
+ Constants.FIELD_CAR_COUNT + "=" + count;
return new Tuple2<>(monitorId, cameraInfos);
}
});
//左外连接两个RDD
JavaPairRDD<String, Tuple2<String, Optional<String>>> resultRdd = monitorGroupInfosRdd.leftOuterJoin(startMonitorInfos);
JavaPairRDD<Integer, String> res = resultRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<String, Tuple2<String, Optional<String>>>>, Integer, String>() {
@Override
public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> iterator) throws Exception {
List<Tuple2<Integer, String>> list = new ArrayList<>();
while (iterator.hasNext()) {
Tuple2<String, Tuple2<String, Optional<String>>> tuple2 = iterator.next();
String monitorId = tuple2._1();
String standCameraInfos = tuple2._2()._1();
Optional<String> factCameraInfosOptional = tuple2._2()._2();
String factCameraInfo = "";
if (factCameraInfosOptional.isPresent()) {
factCameraInfo = factCameraInfosOptional.get();
} else {
String standCameraIds = StringUtils.
getFieldFromConcatString(
standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);
String abnoramlCameraCount = StringUtils.
getFieldFromConcatString(
standCameraInfos,"\|", Constants.FIELD_CAMERA_COUNT);
accumulator.add(
Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
+ Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnoramlCameraCount + "|"
+ Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + standCameraIds
);
continue;
}
//实际摄像头个数
int factCameraCount = Integer.parseInt(StringUtils.
getFieldFromConcatString(
factCameraInfo,"\|", Constants.FIELD_CAMERA_COUNT));
//标准摄像头个数
int standCameraCount = Integer.parseInt(StringUtils.
getFieldFromConcatString(
standCameraInfos, "\|", Constants.FIELD_CAMERA_COUNT));
if (standCameraCount == factCameraCount) {
/**
* 1 正常路口的数量
* 2 异常路口的数量
* 3 正常通道个数
* 4 摄像头异常的个数
*/
accumulator.add(Constants.FIELD_NORMAL_MONITOR_COUNT + "=1|" + Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + factCameraCount);
} else {
//获取实际摄像头编号
String factCameraIds = StringUtils.getFieldFromConcatString(
factCameraInfo, "\|", Constants.FIELD_CAMERA_IDS);
//获取标准摄像头编号
String standCameraIds = StringUtils.getFieldFromConcatString(
standCameraInfos, "\|", Constants.FIELD_CAMERA_IDS);
List<String> factCameraIdList = Arrays.asList(factCameraIds.split(","));
List<String> standCameraIdList = Arrays.asList(standCameraIds.split(","));
StringBuilder abnormalCameraInfos = new StringBuilder();
int abnormalCameraCount = 0;
int normalCameraCount = 0;
for (String cameraId : standCameraIdList) {
if (!factCameraIdList.contains(cameraId)) {
abnormalCameraCount++;
abnormalCameraInfos.append("," + cameraId);
}
}
normalCameraCount = standCameraIdList.size() - abnormalCameraCount;
//往累加器中更新状态
accumulator.add(
Constants.FIELD_ABNORMAL_MONITOR_COUNT + "=1|"
+ Constants.FIELD_NORMAL_CAMERA_COUNT + "=" + normalCameraCount + "|"
+ Constants.FIELD_ABNORMAL_CAMERA_COUNT + "=" + abnormalCameraCount + "|"
+ Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS + "=" + monitorId + ":" + abnormalCameraInfos.toString().substring(1));
}
//从实际数据拼接到字符串中获取车流量
int carCount = Integer.parseInt(
StringUtils.getFieldFromConcatString(
factCameraInfo, "\|", Constants.FIELD_CAMERA_COUNT));
list.add(new Tuple2<>(carCount, monitorId));
}
return list.iterator();
}
});
return res;
}
}