
    Flink Function Implementations

    Implementations of Flink's filter, map, and flatMap user-defined functions, with a KeySelector and two Spark FlatMapFunction examples for comparison.
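
    For orientation, here is a minimal pipeline sketch showing where such functions plug in, using the flatMap tokenizer and key selector defined below. The socket source, port, and job name are assumptions, and the input is assumed to be JSON tweet strings:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.socketTextStream("localhost", 9999)             // hypothetical source of JSON tweet strings
       .flatMap(new SelectEnglishAndTokenizeFlatMap())  // String -> (word, 1)
       .keyBy(new NameKeySelector())                    // group by word
       .sum(1)                                          // running count per word
       .print();
    env.execute("function-demo");                       // hypothetical job name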
    

    filter

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Keeps only tuples whose key (f0) and value (f1) are both non-empty
    public class BlanksFilterFunction implements FilterFunction<Tuple2<String, String>> {
        @Override
        public boolean filter(Tuple2<String, String> tupleValue) throws Exception {
            if (tupleValue == null) {
                return false;
            }
            return !"".equals(tupleValue.f0) && !"".equals(tupleValue.f1);
        }
    }
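
    Applied to a stream of tuples, the filter drops any pair with an empty key or value. A usage sketch, where tupleStream is a hypothetical DataStream<Tuple2<String, String>>:

    DataStream<Tuple2<String, String>> cleaned = tupleStream.filter(new BlanksFilterFunction());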
    

    map

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.MapFunction;

    // Parses a JSON string into a TesteBean; returns null for blank or malformed input
    public class IdeSubscribeParseFunction implements MapFunction<String, TesteBean> {
        @Override
        public TesteBean map(String value) throws Exception {
            if (StringUtils.isBlank(value)) {
                return null;
            }
            JSONObject js;
            try {
                js = JSON.parseObject(value);
            } catch (Exception e) {
                return null;
            }
            if (js != null && !js.isEmpty()) {
                // extract the jobName field
                String jobName = js.getString("jobName");
                TesteBean ideRecord = new TesteBean();
                ideRecord.setJobName(jobName);
                return ideRecord;
            }
            return null;
        }
    }
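
    Returning null from a MapFunction is fragile in Flink, because downstream serializers generally cannot handle null records. A common alternative is to do the same parsing in a FlatMapFunction that simply emits nothing for bad input. A minimal sketch (IdeSubscribeParseFlatMap is a hypothetical name; TesteBean as above):

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class IdeSubscribeParseFlatMap implements FlatMapFunction<String, TesteBean> {
        @Override
        public void flatMap(String value, Collector<TesteBean> out) {
            if (StringUtils.isBlank(value)) {
                return; // drop blank input instead of emitting null
            }
            try {
                JSONObject js = JSON.parseObject(value);
                if (js != null && !js.isEmpty()) {
                    TesteBean record = new TesteBean();
                    record.setJobName(js.getString("jobName"));
                    out.collect(record);
                }
            } catch (Exception e) {
                // malformed JSON: emit nothing
            }
        }
    }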
    

    flatMap

    /**
     * Implements a string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple
     * pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static class SelectEnglishAndTokenizeFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private transient ObjectMapper jsonParser;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            if (jsonParser == null) {
                jsonParser = new ObjectMapper();
            }
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            boolean isEnglish = jsonNode.has("user")
                    && jsonNode.get("user").has("lang")
                    && jsonNode.get("user").get("lang").asText().equals("en");
            boolean hasText = jsonNode.has("text");
            if (isEnglish && hasText) {
                // message of tweet
                StringTokenizer tokenizer = new StringTokenizer(jsonNode.get("text").asText());

                // split the message into lower-case words, stripping whitespace
                while (tokenizer.hasMoreTokens()) {
                    String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
                    if (!result.equals("")) {
                        out.collect(new Tuple2<>(result, 1));
                    }
                }
            }
        }
    }
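
    The same tokenization can be written as a lambda, but Java erases the Tuple2 generic types at runtime, so Flink then needs an explicit type hint via returns(). A sketch (this simplified version splits on whitespace without the JSON language check; lines is a hypothetical DataStream<String>):

    import org.apache.flink.api.common.typeinfo.Types;

    DataStream<Tuple2<String, Integer>> counts = lines
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT)); // required: lambdas lose generic type info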
    

    getKey

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Keys each (word, count) tuple by the word
    public class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }
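
    For a key that is just a tuple field, an inline lambda is equivalent to the NameKeySelector class:

    wordCounts.keyBy(value -> value.f0).sum(1); // wordCounts: hypothetical DataStream<Tuple2<String, Integer>>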
    

    Spark's FlatMapFunction

    // Convert each line of byte[] to String, and split into words
    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
        @Override
        public Iterator<String> call(byte[] line) {
            String s = new String(line, StandardCharsets.UTF_8);
            return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
        }
    });
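
    To finish the word count on the Spark Streaming side, a sketch that pairs each word with 1 and reduces by key (scala.Tuple2 is the tuple type Spark's Java API uses):

    import scala.Tuple2;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);
    wordCounts.print();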
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    public class TestMatchFlatFunction implements FlatMapFunction<Row, Row> {
        private Map<String, TestBean> sysMap;

        /**
         * Used to pass the broadcast variable into the function
         * @param useBroad broadcast of the system -> TestBean lookup map
         */
        public TestMatchFlatFunction(Broadcast<Map<String, TestBean>> useBroad) {
            sysMap = useBroad.getValue();
        }

        @Override
        public Iterator<Row> call(Row monitorRow) throws Exception {
            String systemCd = monitorRow.getAs("system").toString().toLowerCase();
            String queryCommand = monitorRow.getAs("sql");
            String crowdId = monitorRow.getAs("crowdid"); // read but not used below
            TestBean sysTestBean = sysMap.get(systemCd);
            Set<TestBean> testSet = null;
            if (sysTestBean != null) {
                testSet = sysTestBean.parseCrowd(queryCommand);
            }
            List<Row> resultLs = new ArrayList<>();
            if (testSet != null && !testSet.isEmpty()) {
                for (TestBean parsedTest : testSet) {
                    String testCd = parsedTest.getTest_cd();
                    Row row = RowFactory.create(systemCd, testCd);
                    resultLs.add(row);
                }
            }
            return resultLs.iterator();
        }
    }
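
    A sketch of how this function might be driven (spark, df, and the sysMap contents are assumptions):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    Broadcast<Map<String, TestBean>> useBroad = jsc.broadcast(sysMap); // sysMap built on the driver
    JavaRDD<Row> matched = df.javaRDD().flatMap(new TestMatchFlatFunction(useBroad));

    One caveat with this pattern: the constructor runs on the driver, so calling useBroad.getValue() there copies the map into the serialized function closure rather than using the broadcast at all. Keeping a reference to the Broadcast and calling getValue() inside call() preserves the one-copy-per-executor behavior.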
    

    References:

    Flink source code
    Spark source code