Flink函数实现
Flink 中的 filter、map、flatMap
filter
/**
 * Keeps only tuples that are non-null and whose key (f0) and value (f1) are both non-empty.
 *
 * <p>Note: the type parameter {@code T} is unused — the function is fixed to
 * {@code Tuple2<String, String>}. It is kept so existing callers that name a type
 * argument still compile.
 *
 * <p>NOTE(review): despite the name "Blanks", whitespace-only strings such as
 * {@code " "} are NOT filtered out — only exactly-empty strings. Confirm intent.
 */
public class BlanksFilterFunction<T> implements FilterFunction<Tuple2<String,String>> {
    @Override
    public boolean filter(Tuple2<String, String> tupleValue) throws Exception {
        // Retain the record only when it exists and both fields are non-empty.
        return tupleValue != null
                && !"".equals(tupleValue.f0)
                && !"".equals(tupleValue.f1);
    }
}
map
/**
 * Parses a raw JSON message into a {@link TesteBean}.
 *
 * <p>Returns {@code null} (best-effort, never fails the job) when the input is
 * blank, is not valid JSON, or parses to a null/empty JSON object.
 */
public class IdeSubscribeParseFunction implements MapFunction<String, TesteBean> {
    @Override
    public TesteBean map(String value) throws Exception {
        // Guard: skip blank messages entirely.
        if (StringUtils.isBlank(value)) {
            return null;
        }
        JSONObject parsed;
        try {
            parsed = JSON.parseObject(value);
        } catch (Exception ignored) {
            // Malformed JSON is dropped rather than propagated.
            return null;
        }
        if (parsed == null || parsed.isEmpty()) {
            return null;
        }
        // Extract the job name and wrap it in the output bean.
        TesteBean bean = new TesteBean();
        bean.setjobName(parsed.getString("jobName"));
        return bean;
    }
}
flatMap
/**
 * Implements a string tokenizer that splits English-language tweets into words,
 * emitting one {@code (word, 1)} pair ({@code Tuple2<String, Integer>}) per token.
 *
 * <p>Expects each input line to be a JSON document with a {@code user.lang}
 * field and a {@code text} field; non-English or text-less records emit nothing.
 */
public static class SelectEnglishAndTokenizeFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    // transient + lazy init: ObjectMapper is not serialized with the function,
    // it is created once per task instance on first use.
    private transient ObjectMapper jsonParser;

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        if (jsonParser == null) {
            jsonParser = new ObjectMapper();
        }
        JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
        boolean isEnglish = jsonNode.has("user")
                && jsonNode.get("user").has("lang")
                && jsonNode.get("user").get("lang").asText().equals("en");
        boolean hasText = jsonNode.has("text");
        if (isEnglish && hasText) {
            // Tokenize the message body on whitespace.
            StringTokenizer tokenizer = new StringTokenizer(jsonNode.get("text").asText());
            while (tokenizer.hasMoreTokens()) {
                // BUGFIX: "\s*" is not a valid Java string escape before Java 15
                // (compile error), and on 15+ it denotes a literal space, not the
                // regex whitespace class. "\\s*" strips any embedded whitespace.
                String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
                if (!result.isEmpty()) {
                    out.collect(new Tuple2<>(result, 1));
                }
            }
        }
    }
}
getKey
/**
 * Keys each {@code (word, count)} pair by its first field (the word).
 */
public class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
    @Override
    public String getKey(Tuple2<String, Integer> value) {
        final String word = value.f0;
        return word;
    }
}
Spark的FlatMapFunction
// Convert each line of Array[Byte] to String, and split into words
// Decode each byte[] record as UTF-8 text, then split it into individual words.
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
    @Override
    public Iterator<String> call(byte[] line) {
        String decoded = new String(line, StandardCharsets.UTF_8);
        String[] tokens = WORD_SEPARATOR.split(decoded);
        return Arrays.asList(tokens).iterator();
    }
});
public class TestMatchFlatFuntion implements FlatMapFunction<Row, Row> {
Map<String, TestBean> sysMap = null;
/**
* 用于传递变量
* @param useBroad
*/
public TestMatchFlatFuntion(Broadcast<Map<String, >> useBroad) {
sysMap = useBroad.getValue();
}
@Override
public Iterator<Row> call(Row montiorRow) throws Exception {
String system_cd = montiorRow.getAs("system").toString().toLowerCase();
String query_commond = montiorRow.getAs("sql");
String crowdid = montiorRow.getAs("crowdid");
TestBean syTestBean = sysMap.get(system_cd);
Set<TestBean> TestSet = null;
if( syTestBean !=null){ TestSet = syTestBean.parseCrowd(query_commond);}
List<Row> resultLs = new ArrayList<Row>();
if (TestSet != null && !TestSet.isEmpty()) {
for (TestBean parsedTest : TestSet) {
String Test_cd = parsedTest.getTest_cd();
Row row = RowFactory.create( system_cd, Test_cd );
resultLs.add(row);
}
}
return resultLs.iterator();
}
}
参考:
Flink 源码
Spark源码