Flink error troubleshooting notes

    1. Keywords: InvalidTypesException, 'Collector' are missing, hints by using the returns(...) method

    org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information

     This error is caused by type erasure of generics: the type information the lambda expression provides is not enough for Java to infer the return type automatically. It can be fixed with the returns(...) method.

    The code that triggers the error:

     DataStream<Tuple2<String, Integer>> wordCounts = text
                    .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    });

    Fix: either use the returns(...) method (a bit clunky), or skip the lambda expression entirely; an anonymous inner class works just as well and is easy to read.

    SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)
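
    For reference, a minimal sketch of the returns(...) route (assuming text is a DataStream<String> and that org.apache.flink.api.common.typeinfo.Types is imported): keep the lambda, but hand Flink the result type explicitly.

    DataStream<Tuple2<String, Integer>> wordCounts = text
            .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                // split each line into words and emit (word, 1) pairs
                for (String word : value.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            // supplies the Tuple2<String, Integer> type information that erasure removed
            .returns(Types.TUPLE(Types.STRING, Types.INT));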

    After the fix:

    DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
                    for (String word : in.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });

    2. Keywords: no timestamp marker, ProcessingTime, DataStream.assignTimestampsAndWatermarks(...)

    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
    Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

    This means the source data has no timestamp field, yet event time is being used for time handling. Either switch to processing time, or call assignTimestampsAndWatermarks to supply timestamps and watermarks.

    The code that triggers the error:

    DataStream text = env.socketTextStream("10.192.78.17", 9000, "\n");
            // First parse the string data into words and counts (represented as Tuple2<String, Integer>); the first field is the word, the second is the count, initialized to 1
            DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String in, Collector out) throws Exception {
                    for (String word : in.split("\\s")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            }).keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
              .window(TumblingEventTimeWindows.of(Time.seconds(5)))
              .sum(1);

    Fix: change TumblingEventTimeWindows to TumblingProcessingTimeWindows (what is appropriate still depends on the business logic).
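
    If event time really is required, the other option mentioned above is to assign timestamps and watermarks before the window. A minimal sketch, assuming the flatMapped word stream is called words and that the current wall-clock time stands in for the missing event timestamp (WatermarkStrategy and java.time.Duration imports assumed):

    DataStream<Tuple2<String, Integer>> stamped = words
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            // tolerate events arriving up to 2 seconds out of order
                            .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            // the socket lines carry no timestamp field, so use wall-clock time here
                            .withTimestampAssigner((tuple, previousTs) -> System.currentTimeMillis()));

    stamped.keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
           .window(TumblingEventTimeWindows.of(Time.seconds(5)))
           .sum(1);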

    3. Keywords: Cannot resolve method 'aggregate(CountAgg, WindowResultFunction)'

    The code that triggers the error (at the .aggregate call):

    DataStream<ItemViewCount> windowedData = pvData
                    .keyBy(new KeySelector<UserBehavior, Long>() {
                        @Override
                        public Long getKey(UserBehavior userBehavior) throws Exception {
                            return userBehavior.getItemId();
                        }
                    })
                    .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(5)))
                    .aggregate(new CountAgg(), new WindowResultFunction());

    public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple1<Long>, TimeWindow> {

        /**
         * @param key             the window key, i.e. the itemId
         * @param window          the window
         * @param aggregateResult the result of the aggregate function, i.e. the count
         * @param collector       emits results of type ItemViewCount
         * @throws Exception
         */
        @Override
        public void apply(Tuple1<Long> key, TimeWindow window, Iterable<Long> aggregateResult, Collector<ItemViewCount> collector) throws Exception {
            Long itemId = key.f0;
            Long count = aggregateResult.iterator().next();
            collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
        }

    }

     This error message is quite unhelpful, and with lambdas it can be hard to see what is actually wrong. The root cause turned out to be an incorrect type parameter: the third type parameter of WindowResultFunction must be changed from Tuple1<Long> to Long. Since keyBy(...) is called with a KeySelector that returns Long, the key type the window function sees is Long; Tuple1<Long> would only apply with the old field-expression style of keyBy.

    public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Long, TimeWindow> {
    
        /**
         * @param key             the window key, i.e. the itemId
         * @param window          the window
         * @param aggregateResult the result of the aggregate function, i.e. the count
         * @param collector       emits results of type ItemViewCount
         * @throws Exception
         */
        @Override
        public void apply(Long key, TimeWindow window, Iterable<Long> aggregateResult, Collector<ItemViewCount> collector) throws Exception {
            Long itemId = key;
            Long count = aggregateResult.iterator().next();
            collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
        }
    
    }

     4. Keywords: TypeExtractor, TimestampedFileInputSplit, POJO fields

    20:48:25,426 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
    20:48:25,431 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

    This is not an error. See a reply on Stack Overflow:

    The logs that you share do not show an error. The logs are on INFO level and no exception is thrown (at least not in the provided logs).
    
    The log entry just says that the class TimestampedFileInputSplit cannot be treated as a POJO. In general this message indicates that the performance is not optimal but in this particular case it is not a problem.

     5. Keywords: similar to issue 3, caused by a generics problem

    process(new TopNHotItems(3));  // compute the top 3 items by click count

    The IDE shows a red squiggle with can't resolve method process(xxx). The source of the TopNHotItems class is as follows:

    public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    
        private final int topSize;
    
        public TopNHotItems(int topSize) {
            this.topSize = topSize;
        }
    
        /**
         * State used to store items and their click counts; once all data for the same
         * window has been collected, the TopN computation is triggered.
         */
        private ListState<ItemViewCount> itemState;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // register the state
            ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                    "itemState-state",
                    ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }
    
        @Override
        public void processElement(
                ItemViewCount input,
                Context context,
                Collector<String> collector) throws Exception {
    
            // save every record into the state
            itemState.add(input);
            // register an event-time timer for windowEnd + 1; when it fires, all item data belonging to the windowEnd window has been collected
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }
    
        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // collect all item click counts received so far
            List<ItemViewCount> allItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            // clear the state early to free up space
            itemState.clear();
            // sort by click count in descending order
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) (o2.viewCount - o1.viewCount);
                }
            });
            // format the ranking info as a String for easy printing
            StringBuilder result = new StringBuilder();
            result.append("====================================\n");
            result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < topSize; i++) {
                ItemViewCount currentItem = allItems.get(i);
                // No1:  商品ID=12224  浏览量=2413
                result.append("No").append(i).append(":")
                        .append("  商品ID=").append(currentItem.itemId)
                        .append("  浏览量=").append(currentItem.viewCount)
                        .append("\n");
            }
            result.append("====================================\n\n");
    
            out.collect(result.toString());
        }
    }

    Fix: the first type parameter of KeyedProcessFunction<Tuple, ItemViewCount, String> must be changed to Long, so that it matches the key type produced by the upstream keyBy.
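
    A minimal sketch of the consistent version (assuming the upstream keyBy uses a KeySelector that returns the Long itemId, as in issue 3), so that the key type of keyBy and the first type parameter of KeyedProcessFunction line up:

    // key by the Long itemId, so the key type seen by the process function is Long
    windowedData
            .keyBy((KeySelector<ItemViewCount, Long>) itemViewCount -> itemViewCount.itemId)
            .process(new TopNHotItems(3));

    // corrected declaration: the key type is Long, not Tuple
    public class TopNHotItems extends KeyedProcessFunction<Long, ItemViewCount, String> {
        // processElement and onTimer bodies stay the same as above
    }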


    6. Keywords: kafka, offset, checkpoint

    16:49:45,148 WARN  org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - 
    Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available.
    This does not compromise Flink's checkpoint integrity.

    // TODO: still unresolved; probably a Kafka parameter configuration issue
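
    One possible direction (an assumption, not a verified fix): the warning means an asynchronous offset commit was still in flight when a newer checkpoint completed, so Flink skipped the stale commit. Lengthening the checkpoint interval, or enforcing a pause between checkpoints, makes this less likely. A minimal sketch with assumed values:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // checkpoint every 60 s instead of very frequently
    env.enableCheckpointing(60_000);
    // leave at least 30 s between the end of one checkpoint and the start of the next
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);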

    7. Keywords: json, classpath

    Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath
    Cause: the matching Flink JSON format package is missing from the classpath; add this dependency:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
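
    For context, the factory lookup is triggered by declaring 'format' = 'json' in a connector definition, so flink-json must be on the runtime classpath. A minimal sketch (table name, topic, and schema are made up for illustration):

    // a Table API job that declares the JSON format; without flink-json this fails at runtime
    tableEnv.executeSql(
            "CREATE TABLE user_behavior (" +
            "  itemId BIGINT," +
            "  ts TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'user_behavior'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +      // this line requires a DeserializationFormatFactory for 'json'
            ")");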