zoukankan      html  css  js  c++  java
  • 使用flink SQL对接kafka 时,日志不报错,也没有数据输出,原因有哪几个方面

    本地调试么?确认一下下面的问题

    一般先调试sink表的 connector换print 打印一下:

    1. 是earliest还是latest

    2. auto.commit 是true还是false

    3. source改成 socket输入试试逻辑有没问题

    示例代码参考:

    CREATE TABLE t_stock_match_p_1(
      id VARCHAR, 
      stkcode INT,
      volume INT,
      matchtime BIGINT,
      ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
      WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
     ) WITH (
      'connector' = 'kafka-0.10',
      'topic' = 'stock_match_p_zyh',
      'scan.startup.mode' = 'latest-offset',
      'properties.group.id' = 'stock_match_p_zyh',
      'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
      'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
      'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
      'format' = 'csv',
      'csv.field-delimiter' = ','
    );
    
    CREATE TABLE t_stock_match_1 (
      stkcode int,
      pd TIMESTAMP,
      volume  INT 
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;

    解决:

    当时kafka只有一个分区,但是并行度设置大于了分区数,这样有的任务中没有数据,这样水印一直是最小值,

    在网上看到这样一个案例后,将我的任务的并行度改成和分区数一致,Flink WebUI上水印值出来了,数据也能正常写入目的地。

  • 相关阅读:
    怎么看到数据库以前做过的日志?
    感觉很好的网站
    uview 滑动切换
    Flyweight享元(结构型模式)
    悟空,真的是空?
    Interpreter解释器(行为型模式)
    Proxy代理(结构型模式)
    [转]有一种爱叫索取
    Command命令(行为型模式)
    Composite组合(结构型模式)
  • 原文地址:https://www.cnblogs.com/-courage/p/14467617.html
Copyright © 2011-2022 走看看