package com.swust.skynet;
import com.swust.constant.Constants;
import com.swust.utils.StringUtils;
import org.apache.spark.util.AccumulatorV2;
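/**
 * Custom accumulator whose input type (IN) and output type (OUT) are both String.
 *
 * @author 雪瞳
 * @Slogan Even the clock keeps moving forward; how could a person simply stop here!
 * @Function Custom accumulator
 */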
public class SelfDefineAccumulator extends AccumulatorV2<String,String>{
String returnResult = "";
@Override
public boolean isZero() {
return "normalMonitorCount=0|normalCameraCount=0|abnormalMonitorCount=0|abnormalCameraCount=0|abnormalMonitorCameraInfos= ".equals(returnResult);
}
@Override
public AccumulatorV2<String, String> copy() {
SelfDefineAccumulator accumulator = new SelfDefineAccumulator();
accumulator.returnResult = this.returnResult;
return accumulator;
}
/**
* 每个分区的初始值
*/
@Override
public void reset() {
this.returnResult = Constants.FIELD_NORMAL_MONITOR_COUNT +"=0|"
+Constants.FIELD_NORMAL_CAMERA_COUNT +"=0|"
+Constants.FIELD_ABNORMAL_MONITOR_COUNT +"=0|"
+Constants.FIELD_ABNORMAL_CAMERA_COUNT +"=0|"
+Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS+"= ";
}
/**
* 每个分区会拿着 reset 初始化的值 ,在各自的分区内相加
* @param v
*/
@Override
public void add(String v) {
returnResult = myAdd(returnResult,v);
}
/**
* 每个分区最终的结果和初始值 returnResult="" 做累加
* @param other
*/
@Override
public void merge(AccumulatorV2<String, String> other) {
SelfDefineAccumulator accumulator = (SelfDefineAccumulator) other;
returnResult = myAdd(returnResult,accumulator.returnResult);
}
@Override
public String value() {
return this.returnResult;
}
/**
* 自定义累加规则
* @param v1
* @param v2
* @return
*/
private String myAdd(String v1,String v2){
if (StringUtils.isEmpty(v1)){
return v2;
}
String[] valueArray = v2.split("\|");
for (String string:valueArray){
String regularExpression = "=";
String[] fieldAndValueArray = string.split(regularExpression);
String field = fieldAndValueArray[0];
String value = fieldAndValueArray[1];
String oldValue = StringUtils.getFieldFromConcatString(v1,"\|",field);
if (oldValue != null){
//将非String类型的数据单独取出
if (Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS.equals(field)){
if (value.startsWith(" ~")){
value = value.substring(2);
}
v1 = StringUtils.setFieldConcatString(v1,"\|",field,oldValue+" ~"+value);
}else {
//其余都是int类型 直接加减即可
int newValue = Integer.parseInt(oldValue)+Integer.parseInt(value);
v1 = StringUtils.setFieldConcatString(v1,"\|",field,String.valueOf(newValue));
}
}
}
return v1;
}
}