zoukankan      html  css  js  c++  java
  • 关于30大洋看的一篇帖子(为什么我的Flink任务正常运行,UI上却不显示接收和发送的数据条数呢?)

    最近发现有好几个同学问我这个问题,为什么我的Flink任务正常运行,数据也可以打印,而且都保存到数据库了,但是UI上面却不显示数据接收和发送的条数,我都快被问疯了,今天就给大家详细说一下这个小问题.

    首先先来复现一下这个问题,我们先看下面的代码(只是一部分代码)

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.setParallelism(1)
      val ds = CommonUtils.getDataStream(env = env)
        .name("kafka-source")
        .filter(_.nonEmpty)
        .print()
      env.execute()
    }

    代码非常的简单,没有任何的逻辑,从kafka读取数据,只是做了一个filter,然后直接print,我这里就不写sink,但是效果和直接sink是一样的,把这个任务提交到集群运行,效果如下图所示:

     

     我已经向kafka写入数据了,这个地方怎么不显示呢?然后我们来看下tm的stdout,因为我代码里面直接print了,看下面的图

    很明显数据打印了,说明我们程序是没有问题的,那问题在哪呢?其实并不是你的程序有问题,也不是Flink的UI有bug,是因为默认情况下Flink开启了operator chain,所以source filter print chain在了起就是在一个DAG里面,没有向下游发送数据,所以显示都为0,关于operator chain前面的文章已经说过了,还不了解的可以去查一下.那怎么能让他显示呢?
    第一种方法,就是从metric里面看,可以自己添加两个metric,如下图所示

     我添加了两个metric,一个filter的输入和filter的输出,可以看到都是100条数据.那如果想不添加metric,在ui上就能显示呢?这个时候就需要打断operator chain,具体有三种写法,如下代码所示

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.setParallelism(1)
      val ds = CommonUtils.getDataStream(env = env)
        .name("kafka-source")
        .filter(_.nonEmpty)
        .startNewChain()
        .disableChaining()
        .setParallelism(2)
        .print()
      env.execute()
    }

    这三种写法都可以达到打断chain的目的,有什么区别呢?startNewChain和disableChaining没有实质性的区别,他俩都会打断chain,但是不会改变算子的并发度,setParallelism和前面的算子并发度,设置的不一致自然就打断chain了.我们就演示一下第一个

    可以看到上面的DAG图显示了两个,并且下面可以看到接收和发送的数据了,剩下的2种方法同样可以达到这样的效果,大家可以尝试一下.

    Flink的operator chain是有利于提高程序的性能的,建议使用,如果想要打断operator chain又不想改变并发,就用前两种方法,如果想要改变并发就用第三张方法

    30大洋 我尽力了

  • 相关阅读:
    Windows下Yarn安装与使用
    论文阅读Graph Convolutional Matrix Completion
    如何快速查询中科院JCR分区和汤森路透JCR分区
    Implement GAN from scratch
    PyCharm将main.py解析成text文件的解决方法
    理解PyTorch的自动微分机制
    mcast_set_if函数
    mcast_get_if函数
    mcast_unblock_source函数
    mcast_block_source函数
  • 原文地址:https://www.cnblogs.com/gxgd/p/12661753.html
Copyright © 2011-2022 走看看