Flink 中的 filter、map、flatMap
public class BlanksFilterFunction<T> implements FilterFunction<Tuple2<String,String>> {
public boolean filter(Tuple2<String, String> tupleValue) throws Exception {
if (tupleValue != null) {
String colKey = tupleValue.f0;
String colValue = tupleValue.f1;
if (!"".equals(colKey) && !"".equals(colValue)) {
return true;
} else {
return false;
} else { return false; }
public class IdeSubscribeParseFunction implements MapFunction<String, TesteBean> {
public TesteBean map(String value) throws Exception {
if(StringUtils.isBlank(value)) {
return null;
JSONObject js = null;
try {
js = JSON.parseObject(value);
} catch (Exception e) {
return null;
if(js != null && !js.isEmpty()) {
String jobName = js.getString("jobName");
TesteBean ideRecord = new TesteBean();
return ideRecord;
return null;
* Implements a string tokenizer that splits sentences into words as a user-defined FlatMapFunction.
*The function takes a line (String) and splits it into multiple pairs in the form of "(word,1)" ({ Tuple2<String,Integer>}).
public static class SelectEnglishAndTokenizeFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
private transient ObjectMapper jsonParser;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
if (jsonParser == null) {
jsonParser = new ObjectMapper();
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
boolean isEnglish = jsonNode.has("user") && jsonNode.get("user").has("lang") && jsonNode.get("user").get("lang").asText().equals("en");
boolean hasText = jsonNode.has("text");
if (isEnglish && hasText) {
// message of tweet
StringTokenizer tokenizer = new StringTokenizer(jsonNode.get("text").asText());
// split the message
while (tokenizer.hasMoreTokens()) {
String result = tokenizer.nextToken().replaceAll("\s*", "").toLowerCase();
if (!result.equals("")) {
out.collect(new Tuple2<>(result, 1));
public class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
public String getKey(Tuple2<String, Integer> value) {
return value.f0;
// Convert each line of Array[Byte] to String, and split into words
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
public Iterator<String> call(byte[] line) {
String s = new String(line, StandardCharsets.UTF_8);
return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
public class TestMatchFlatFuntion implements FlatMapFunction<Row, Row> {
Map<String, TestBean> sysMap = null;
* 用于传递变量
* @param useBroad
public TestMatchFlatFuntion(Broadcast<Map<String, >> useBroad) {
sysMap = useBroad.getValue();
public Iterator<Row> call(Row montiorRow) throws Exception {
String system_cd = montiorRow.getAs("system").toString().toLowerCase();
String query_commond = montiorRow.getAs("sql");
String crowdid = montiorRow.getAs("crowdid");
TestBean syTestBean = sysMap.get(system_cd);
Set<TestBean> TestSet = null;
if( syTestBean !=null){ TestSet = syTestBean.parseCrowd(query_commond);}
List<Row> resultLs = new ArrayList<Row>();
if (TestSet != null && !TestSet.isEmpty()) {
for (TestBean parsedTest : TestSet) {
String Test_cd = parsedTest.getTest_cd();
Row row = RowFactory.create( system_cd, Test_cd );
return resultLs.iterator();
Flink 源码