  • Flink 1.11.2 study notes (5): pitfalls of using lambda expressions

    Flink's API offers a fluent, chainable style that makes a pipeline pleasant to write. For example:

    SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
            // parallelism 1, so the output is easier to follow
            .setParallelism(1)
            // add the Kafka source
            .addSource(
                    new FlinkKafkaConsumer011<>(
                            SOURCE_TOPIC,
                            new SimpleStringSchema(),
                            props))
            // convert each incoming string into a POJO
            .map((MapFunction<String, WordCountPojo>) value -> {
                WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
                return pojo;
            })
            // set up watermarks and event-time extraction
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                        @Override
                        public long extractTimestamp(WordCountPojo element) {
                            return element.eventTimestamp;
                        }
                    })
            // count the occurrences of each word
            .flatMap(new FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>() {
                @Override
                public void flatMap(WordCountPojo value, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                    String word = value.word;
                    // start time of the window this event falls into (for display only)
                    String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
                    if (word != null && word.trim().length() > 0) {
                        // emit (word, 1, windowTime), map-reduce style
                        out.collect(new Tuple3<>(word.trim(), 1, windowTime));
                    }
                }
            })
            .keyBy(v -> v.f0)
            // 1-minute tumbling windows
            .timeWindow(Time.minutes(1))
            // allow data to arrive up to 10 seconds late
            .allowedLateness(Time.seconds(10))
            // sum the counts per word
            .sum(1);
    

      

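    If you work in IDEA with JDK 1.8, the IDE may suggest converting the anonymous FlatMapFunction passed to flatMap (line 24 of the snippet above) into a lambda expression, which looks cleaner: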

    SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
        .setParallelism(1)
        .addSource(
                new FlinkKafkaConsumer011<>(
                        SOURCE_TOPIC,
                        new SimpleStringSchema(),
                        props))
        .map((MapFunction<String, WordCountPojo>) value -> {
            WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
            return pojo;
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(WordCountPojo element) {
                        return element.eventTimestamp;
                    }
                })
        .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
            // rewritten as a lambda
            String word = value.word;
            String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
            if (word != null && word.trim().length() > 0) {
                out.collect(new Tuple3<>(word.trim(), 1, windowTime));
            }
        })
        .keyBy(v -> v.f0)
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.seconds(10))
        .sum(1);
    

    The logic is exactly the same, but running it fails with this error:

    Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

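    In short, the lambda does not carry enough type information: the generic parameters of the Collector argument are missing, so Flink cannot infer the output type. The message suggests either going back to an anonymous class or supplying the exact type explicitly through type information.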
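
    The difference between the two forms can be observed with plain JDK reflection, without Flink on the classpath. The sketch below is only an illustration of the underlying erasure, not Flink's actual type-extraction code; the class LambdaErasureDemo and its methods are hypothetical names, and java.util.function.Function stands in for FlatMapFunction. On a typical HotSpot JDK the anonymous class should still expose its type arguments, while the lambda should not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical demo class; Function stands in for Flink's FlatMapFunction.
class LambdaErasureDemo {

    /** Returns true if reflection can still see type arguments on the object's first interface. */
    static boolean keepsTypeArguments(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    /** Anonymous class: the compiler records Function<String, List<Integer>> in the class file. */
    static Function<String, List<Integer>> anonymousClass() {
        return new Function<String, List<Integer>>() {
            @Override
            public List<Integer> apply(String s) {
                return Collections.singletonList(s.length());
            }
        };
    }

    /** Lambda: the runtime-generated class only implements the raw Function interface. */
    static Function<String, List<Integer>> lambda() {
        return s -> Collections.singletonList(s.length());
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymousClass()));
        System.out.println("lambda keeps type arguments:          " + keepsTypeArguments(lambda()));
    }
}
```

    Both objects behave identically when called; only the metadata visible to a framework that inspects them differs, which is why the error above appears at job-construction time rather than at compile time.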

    The fix:

    SingleOutputStreamOperator<Tuple3<String, Integer, String>> counts = env
        .setParallelism(1)
        .addSource(
                new FlinkKafkaConsumer011<>(
                        SOURCE_TOPIC,
                        new SimpleStringSchema(),
                        props))
        .map((MapFunction<String, WordCountPojo>) value -> {
            WordCountPojo pojo = gson.fromJson(value, WordCountPojo.class);
            return pojo;
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<WordCountPojo>(Time.milliseconds(200)) {
                    @Override
                    public long extractTimestamp(WordCountPojo element) {
                        return element.eventTimestamp;
                    }
                })
        .flatMap((FlatMapFunction<WordCountPojo, Tuple3<String, Integer, String>>) (value, out) -> {
            String word = value.word;
            String windowTime = sdf.format(new Date(TimeWindow.getWindowStartWithOffset(value.eventTimestamp, 0, 60 * 1000)));
            if (word != null && word.trim().length() > 0) {
                out.collect(new Tuple3<>(word.trim(), 1, windowTime));
            }
        })
        // explicitly specify the return type
        .returns(((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, String.class)))
        .keyBy(0)
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.seconds(10))
        .sum(1);
    

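    The .returns(...) call (line 27 of the snippet above) specifies the return type explicitly, and the keyBy call is adjusted slightly, from keyBy(v -> v.f0) to the positional keyBy(0), most likely because the raw TypeInformation cast leaves the stream without its element type, so v.f0 would no longer compile. With these two changes the job runs normally.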
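
    Why an explicitly supplied type survives where the lambda's does not can also be sketched in plain Java. The example below uses the well-known "type token" idiom: an anonymous subclass records its generic superclass, so the full parameterized type can be read back at runtime. TypeToken and TypeTokenDemo are hypothetical names written for this illustration and are not Flink classes; Flink's own TypeHint appears to rely on a similar idea, but that is an assumption here, not something this sketch demonstrates.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the type-token idiom; not part of Flink.
abstract class TypeToken<T> {
    private final Type type;

    protected TypeToken() {
        // An anonymous subclass such as `new TypeToken<Map<String, List<Integer>>>() {}`
        // keeps its generic superclass in the class file, so T can be recovered here.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeTokenDemo {

    /** Captures a nested generic type that would otherwise be erased at runtime. */
    static Type capturedType() {
        return new TypeToken<Map<String, List<Integer>>>() {}.getType();
    }

    public static void main(String[] args) {
        // Prints the full parameterized type, including the nested type arguments.
        System.out.println(capturedType().getTypeName());
    }
}
```

    The trailing {} is what matters: without it there is no subclass and therefore nothing for reflection to read the type arguments from.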

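    Author: 菩提树下的杨过
    Source: http://yjmyzz.cnblogs.com
    Copyright of this article is shared by the author and 博客园 (cnblogs). Reposting is welcome, but unless the author agrees otherwise, this notice must be kept and a link to the original article must be shown prominently on the page; otherwise the author reserves the right to pursue legal liability.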
  • Original article: https://www.cnblogs.com/yjmyzz/p/using-lambda-in-flink.html