1. Keywords: InvalidTypesException, 'Collector' are missing, hints by using the returns(...) method
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
Otherwise the type has to be specified explicitly using type information
Code that triggers the error (a lambda passed to flatMap):

```java
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
            for (String word : value.split("\\s")) {
                out.collect(Tuple2.of(word, 1));
            }
        });
```
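Fix: either supply the type explicitly with the returns(...) method (clumsy to use), or skip the lambda altogether. An anonymous inner class works fine and is easy to read.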
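The returns(...) overload that takes explicit type information (there are also overloads taking a Class<T> or a TypeHint<T>):
SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)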
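For the lambda above, the explicit type would be something like .returns(Types.TUPLE(Types.STRING, Types.INT)) or .returns(new TypeHint<Tuple2<String, Integer>>(){}). The TypeHint form works because an anonymous subclass records its generic superclass in the class file, so the type argument survives erasure. The stdlib-only sketch below shows that "super type token" trick with a hypothetical TypeToken class; it illustrates the idea and is not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeTokenDemo {

    // Hypothetical "super type token", similar in spirit to Flink's TypeHint but NOT its code.
    // It must be instantiated as an anonymous subclass: new TypeToken<...>() {}
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // The anonymous subclass records "extends TypeToken<...>" in its class file,
            // so the actual type argument can be read back via reflection.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        TypeToken<Map<String, List<Integer>>> token =
                new TypeToken<Map<String, List<Integer>>>() {};
        // Prints the full parameterized type, including String and List<Integer>
        System.out.println(token.getType().getTypeName());
    }
}
```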
Rewritten with an anonymous inner class (note the fully typed Collector parameter):

```java
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : in.split("\\s+")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
```
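One plausible part of why the anonymous class helps: its class file keeps the generic interface it implements, while the class the JVM generates for a lambda implements only the raw interface. The stdlib-only sketch below checks this with reflection on java.util.function.Function (used here as a stand-in for FlatMapFunction). Flink's TypeExtractor uses additional tricks for lambdas, so treat this as an illustration of the erasure problem rather than a description of Flink's internals.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class LambdaErasureDemo {

    // First interface recorded for the runtime class of obj, with generics if available.
    static Type firstInterface(Object obj) {
        return obj.getClass().getGenericInterfaces()[0];
    }

    // True if the runtime class still knows the interface's type arguments.
    static boolean keepsTypeArguments(Object obj) {
        return firstInterface(obj) instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
        Function<String, Integer> lambda = s -> s.length();

        // Anonymous class: Function<String, Integer> with type arguments
        System.out.println(firstInterface(anonymous));
        // Lambda: the raw interface only, the type arguments are gone
        System.out.println(firstInterface(lambda));
    }
}
```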
2. Keywords: no timestamp marker, ProcessingTime, DataStream.assignTimestampsAndWatermarks(...)
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker).
Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
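Code that triggers the error:

```java
DataStream<String> text = env.socketTextStream("", 9000, "\n");
// First parse each line into words and counts, represented as Tuple2<String, Integer>:
// field 0 is the word, field 1 is the count, initialised to 1
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : in.split("\\s")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        })
        .keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum(1);
```

Cause: TumblingEventTimeWindows assigns records to windows by their event timestamp, but records read from socketTextStream carry no timestamp and assignTimestampsAndWatermarks(...) is never called. Fix: either switch to processing-time windows (TumblingProcessingTimeWindows.of(Time.seconds(5))), or assign timestamps and watermarks before keyBy.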
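To see why a missing timestamp is fatal here: an event-time tumbling window is chosen purely from the record's timestamp, so a record without one cannot be placed anywhere. Going by the error text, Flink uses Long.MIN_VALUE as the "no timestamp" marker. The sketch below is a simplified, hypothetical model of tumbling window assignment (window start = timestamp rounded down to a multiple of the window size, no offset); it is not Flink's code.

```java
public class TumblingWindowSketch {

    // Marker for "this record carries no timestamp" (Long.MIN_VALUE, as in the error message).
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Returns {start, end} of the tumbling window containing the given event timestamp. */
    static long[] assignWindow(long timestamp, long windowSizeMs) {
        if (timestamp == NO_TIMESTAMP) {
            // An event-time window cannot place a record that has no event time.
            throw new IllegalStateException(
                    "Record has no timestamp: assign timestamps/watermarks or use processing time");
        }
        // Round down to the nearest multiple of the window size.
        long start = timestamp - Math.floorMod(timestamp, windowSizeMs);
        return new long[] {start, start + windowSizeMs};
    }

    public static void main(String[] args) {
        long[] w = assignWindow(12_345L, 5_000L);
        System.out.println(w[0] + " - " + w[1]); // 10000 - 15000
    }
}
```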
3. Keywords: Cannot resolve method 'aggregate(CountAgg, WindowResultFunction)'
The call the IDE cannot resolve:

```java
DataStream<ItemViewCount> windowedData = pvData
        .keyBy(new KeySelector<UserBehavior, Long>() {
            @Override
            public Long getKey(UserBehavior userBehavior) throws Exception {
                return userBehavior.getItemId();
            }
        })
        .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(5)))
        .aggregate(new CountAgg(), new WindowResultFunction());
```
```java
import org.apache.flink.api.common.functions.AggregateFunction;

public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}
```
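Original WindowResultFunction (key type Tuple1<Long>):

```java
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple1<Long>, TimeWindow> {
    /**
     * @param key             the window key, i.e. itemId
     * @param window          the window
     * @param aggregateResult result of the aggregate function, i.e. the count
     * @param collector       emits ItemViewCount records
     * @throws Exception
     */
    @Override
    public void apply(Tuple1<Long> key, TimeWindow window, Iterable<Long> aggregateResult,
                      Collector<ItemViewCount> collector) throws Exception {
        Long itemId = key.f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
    }
}
```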
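Fix: the third type parameter of WindowFunction is the key type and must match the key type produced by keyBy. The KeySelector above returns Long, so the key must be Long rather than Tuple1<Long>:

```java
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Long, TimeWindow> {
    /**
     * @param key             the window key, i.e. itemId
     * @param window          the window
     * @param aggregateResult result of the aggregate function, i.e. the count
     * @param collector       emits ItemViewCount records
     * @throws Exception
     */
    @Override
    public void apply(Long key, TimeWindow window, Iterable<Long> aggregateResult,
                      Collector<ItemViewCount> collector) throws Exception {
        Long itemId = key;
        Long count = aggregateResult.iterator().next();
        collector.collect(new ItemViewCount(itemId, window.getEnd(), count));
    }
}
```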
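Why the compiler reports "cannot resolve" instead of a clearer type error: in Flink, WindowedStream<T, K, W>.aggregate(AggregateFunction<T, ACC, V>, WindowFunction<V, R, K, W>) reuses the stream's own K, so a window function with a different key type matches no overload at all. The stdlib-only sketch below models that signature with hypothetical, heavily simplified stand-ins (Agg, WinFn, Windowed); none of them are Flink classes.

```java
import java.util.Arrays;
import java.util.List;

public class KeyTypeMismatchSketch {

    // Hypothetical, simplified stand-ins for Flink's AggregateFunction and WindowFunction.
    public interface Agg<IN, ACC, OUT> {
        ACC create();
        ACC add(IN value, ACC acc);
        OUT result(ACC acc);
    }

    public interface WinFn<IN, OUT, KEY> {
        OUT apply(KEY key, IN aggregated);
    }

    // Stand-in for one window of a stream keyed by KEY.
    public static final class Windowed<T, KEY> {
        private final KEY key;
        private final List<T> elements;

        public Windowed(KEY key, List<T> elements) {
            this.key = key;
            this.elements = elements;
        }

        // Same shape as Flink's aggregate(AggregateFunction<T, ACC, V>, WindowFunction<V, R, K, W>):
        // the window function's KEY is fixed to this stream's KEY.
        public <ACC, V, R> R aggregate(Agg<T, ACC, V> agg, WinFn<V, R, KEY> fn) {
            ACC acc = agg.create();
            for (T element : elements) {
                acc = agg.add(element, acc);
            }
            return fn.apply(key, agg.result(acc));
        }
    }

    public static String countPerItem(long itemId, List<String> events) {
        Windowed<String, Long> window = new Windowed<>(itemId, events);
        Agg<String, Long, Long> count = new Agg<String, Long, Long>() {
            @Override public Long create() { return 0L; }
            @Override public Long add(String value, Long acc) { return acc + 1; }
            @Override public Long result(Long acc) { return acc; }
        };
        // KEY = Long matches Windowed<String, Long>. A WinFn with a different KEY
        // (e.g. a Tuple1-like wrapper) would make this call fail to compile.
        WinFn<Long, String, Long> format = (key, c) -> "item=" + key + " count=" + c;
        return window.aggregate(count, format);
    }

    public static void main(String[] args) {
        System.out.println(countPerItem(42L, Arrays.asList("click", "click", "click")));
        // prints: item=42 count=3
    }
}
```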
4. Keywords: TypeExtractor, TimestampedFileInputSplit, POJO fields
20:48:25,426 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
20:48:25,431 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
The logs that you share do not show an error. The logs are on INFO level and no exception is thrown (at least not in the provided logs). The log entry just says that the class TimestampedFileInputSplit cannot be treated as a POJO. In general this message indicates that the performance is not optimal but in this particular case it is not a problem.
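For reference, the Flink docs on "Data Types & Serialization" treat a class as a POJO when it is public, has a public no-argument constructor, and every field is either public or reachable through a getter and setter (getFoo()/setFoo() for a field foo). The log above fires because TimestampedFileInputSplit has no setter for modificationTime. The sketch below is a rough, hypothetical approximation of those rules using plain reflection; it is not Flink's TypeExtractor and skips details such as inherited fields and serializer checks.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoCheckSketch {

    /**
     * Simplified approximation of Flink's POJO rules (an assumption, not Flink's code).
     * Only declared fields are checked. An empty result means "looks like a POJO".
     */
    public static List<String> problems(Class<?> clazz) {
        List<String> reasons = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            reasons.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            reasons.add("no public no-arg constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod)
                    || Modifier.isPublic(mod)) {
                continue; // public fields need no accessors; static/transient fields are ignored
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix) && !hasPublicMethod(clazz, "is" + suffix)) {
                reasons.add("no getter for field " + name);
            }
            if (!hasPublicMethod(clazz, "set" + suffix, field.getType())) {
                reasons.add("no setter for field " + name);
            }
        }
        return reasons;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // getMethod only returns public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Example: passes the check.
    public static class GoodPojo {
        public String name;
        private long count;

        public GoodPojo() {}

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    // Example mirroring the log: a getter exists but the setter is missing.
    public static class NoSetter {
        private long modificationTime;

        public NoSetter() {}

        public long getModificationTime() { return modificationTime; }
    }
}
```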
5. Keywords: can't resolve method process(...)
process(new TopNHotItems(3)); // compute the top 3 items by click count
The IDE underlines the call in red with "can't resolve method process(xxx)". The source of TopNHotItems is:
```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

    private final int topSize;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    /**
     * State holding items and their click counts; the TopN computation is triggered
     * once all data belonging to the same window has arrived.
     */
    private ListState<ItemViewCount> itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Register the state
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(ItemViewCount input, Context context, Collector<String> collector)
            throws Exception {
        // Save every record into state
        itemState.add(input);
        // Register an event-time timer at windowEnd + 1; when it fires,
        // all items belonging to the windowEnd window have arrived
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Collect the click counts of all received items
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // Clear the state early to free space
        itemState.clear();
        // Sort by click count, descending (Long.compare avoids overflow from casting a difference to int)
        allItems.sort((o1, o2) -> Long.compare(o2.viewCount, o1.viewCount));
        // Format the ranking as a String for printing
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n");
        // A window may contain fewer than topSize items
        for (int i = 0; i < Math.min(topSize, allItems.size()); i++) {
            ItemViewCount currentItem = allItems.get(i);
            // No1: ItemID=12224 Views=2413
            result.append("No").append(i + 1).append(":")
                    .append(" ItemID=").append(currentItem.itemId)
                    .append(" Views=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");
        out.collect(result.toString());
    }
}
```
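Fix: the first type parameter of KeyedProcessFunction<Tuple, ItemViewCount, String> is the key type, and it must match the key type of the KeyedStream that process(...) is called on. Here the key is a Long, so change it to KeyedProcessFunction<Long, ItemViewCount, String>. (Tuple is the key type only when keying by field position or name, e.g. keyBy(0) or keyBy("fieldName").)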
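The sort-and-format part of onTimer(...) does not depend on Flink, so it can be checked on its own. The sketch below extracts that logic into a plain method, with a hypothetical minimal ItemViewCount stand-in (the real class is not shown in this post). It keeps the two guards used above: Long.compare for the descending sort, and Math.min so a window with fewer than topSize items does not throw.

```java
import java.util.ArrayList;
import java.util.List;

public class TopNSketch {

    // Hypothetical minimal stand-in for the ItemViewCount class used above.
    public static final class ItemViewCount {
        public final long itemId;
        public final long windowEnd;
        public final long viewCount;

        public ItemViewCount(long itemId, long windowEnd, long viewCount) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.viewCount = viewCount;
        }
    }

    /** Formats the top topSize items by view count, one line per item. */
    public static String formatTopN(List<ItemViewCount> items, int topSize) {
        List<ItemViewCount> sorted = new ArrayList<>(items);
        // Descending by view count without int-overflow risk
        sorted.sort((a, b) -> Long.compare(b.viewCount, a.viewCount));
        StringBuilder result = new StringBuilder();
        int n = Math.min(topSize, sorted.size());
        for (int i = 0; i < n; i++) {
            ItemViewCount item = sorted.get(i);
            result.append("No").append(i + 1).append(":")
                  .append(" ItemID=").append(item.itemId)
                  .append(" Views=").append(item.viewCount)
                  .append("\n");
        }
        return result.toString();
    }
}
```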
6. Keywords: kafka offset checkpoint
16:49:45,148 WARN org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] -
Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available.
This does not compromise Flink's checkpoint integrity.
// TODO: still unresolved; probably a Kafka configuration issue
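My reading of this WARN (an assumption, not checked against the connector source for a specific version): when a checkpoint completes, the consumer hands that checkpoint's offsets to a background thread that commits them to Kafka asynchronously. If the previous commit has not finished when the next checkpoint completes, the older offsets are simply replaced by the newer ones. That is harmless for correctness because, with checkpointing enabled, Flink restores from its own checkpointed offsets, not from the offsets committed to Kafka. The hypothetical OffsetCommitSketch below models that "latest wins" hand-off with an AtomicReference.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class OffsetCommitSketch {

    // Offsets from the most recent completed checkpoint that are still waiting to be committed.
    private final AtomicReference<Map<String, Long>> pending = new AtomicReference<>();

    /** Called when a checkpoint completes; returns true if an older, uncommitted set was dropped. */
    public boolean onCheckpointComplete(Map<String, Long> offsets) {
        // Replace whatever is pending: only the newest offsets are worth committing.
        Map<String, Long> previous = pending.getAndSet(offsets);
        if (previous != null) {
            System.out.println("WARN: previous commit still pending, skipping it");
            return true;
        }
        return false;
    }

    /** Called by the (slower) committing thread; takes the newest offsets, or null if none. */
    public Map<String, Long> takeForCommit() {
        return pending.getAndSet(null);
    }
}
```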
7. Keywords: json classpath
Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath
Fix: add the flink-json dependency to pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```
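As far as I know, Flink's table layer discovers formats such as 'json' through Java's ServiceLoader: each format jar (flink-json included) lists its factory classes in a META-INF/services file, and if no jar on the classpath registers a factory with the requested identifier, the lookup fails with the error above. The sketch below models that lookup with a hypothetical FormatFactory interface (not Flink's Factory interface); since nothing registers an implementation, find("json") fails the same way. The same symptom can also show up in a shaded fat jar when META-INF/services files from different dependencies overwrite each other instead of being merged (the maven-shade-plugin's ServicesResourceTransformer merges them).

```java
import java.util.ServiceLoader;

public class FactoryDiscoverySketch {

    // Hypothetical stand-in for a Flink format factory interface.
    public interface FormatFactory {
        String factoryIdentifier();
    }

    /** Returns the registered factory with the given identifier, or throws if none is on the classpath. */
    public static FormatFactory find(String identifier) {
        // ServiceLoader reads META-INF/services/<interface binary name> from every jar on the classpath.
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (factory.factoryIdentifier().equals(identifier)) {
                return factory;
            }
        }
        throw new IllegalStateException(
                "Could not find any factory for identifier '" + identifier + "' in the classpath");
    }
}
```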