zoukankan      html  css  js  c++  java
  • flink使用总结

    一、flink 并行度:

    在flink中,每个算子是一个线程,source、 filter、 flatMap 、map、 sink 这5算子都是独立的线程。在4个层面上可以设置并行度,算子层级的优先级最高。


    •Operator Level(算子层次)

    •Execution Environment Level(执行环境层次)

    •Client Level(客户端层次)

    •System Level(系统层次)

    Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。一般情况下你的slot数是你每个节点的cpu的核数。

    TaskManager为子节点,solt为子节点上独立的进程,类似storm的worker进程。

    启动一个job的话,如果并行度为1,那么所有算子线程都运行在一个节点的一个slot上, 如果单独给flatMap算子设置并行度为4,那么flatMap线程会并行运行在4个slot上。

    flink 同一个算子的 不同task不可以共享slot,即一个算子的并行度为2,它一定运行在两个slot上, slot是内存不共享的。


    flink 两个连续的算子并行度一样的话,flink会把两个算子 放入一个task中,同一个task 是一个线程,减少了线程切换带来的开销。



    storm的并行和flink的并行有一点不同,就是storm在worker进程级别设置并行度的时候,会将不同的线程 均分到不同的进程中,如果有两个子节点那么两个进程会优先分到两个节点上,如果只有一个子节点,那么两个进程会运行在同一个节点上。flink设置并行度的时候,先保证同一个slot里有全部的算子,同一个算子的其他并行算子才会分到其他slot中。如果有两个节点会优先分配在两个节点的slot上,如果有一个节点会在同一个节点下分配到两个slot上。

     二、flink环境启停及job任务提交与取消

    注意:每次替换了jar包 都要停了flink job和flink环境然后再重启:

    1)停止flink job:

    在bin目录执行:./flink list 查看正在运行的flink任务的jobid

    ------------------ Running/Restarting Jobs -------------------
    03.01.2020 16:24:30 : 4de78031c1c99110328d88fbdf2384c3 :   Flink Application (RUNNING)

    然后执行: ./flink cancel 4de78031c1c99110328d88fbdf2384c3


    2)停止和重启flink环境的命令(在bin目录下):
    ./stop-cluster.sh

    ./start-clush.sh

    3)启动job
    ./flink run --class com.najing.FlinkAppStart  xxx/xxx.jar


    三、flink kafka消费积压问题解决思路
    1)不要把所有topic都放到一个job中,这样会导致一个消费者消费所有主题数据,会导致积压,要根据topic进行job拆分。
    2)给kafka主题设置分区同时给flink的consumer设置并行度,使分区数=并行度,让3个消费者同时消费三个分区。
    3)如果上面两种方式都设置后,还是积压,一定是在source-> filter->flatMap->map->sink 算子处理流程中有个别算子处理速度非常慢,导致积压。
    因为不同的算子是不同的线程,当中间有线程处理慢以后,前面的线程也会被它阻塞掉,导致拉取环节的线程也变慢,最终导致积压。这就要找出处理慢的算子,进行针对性的优化。

    kafka的自动提交,会在拉取后不管有没有没成功处理,到时间就自动提交。

    四、集成springboot的flink,job提交后spring环境启动方式

    flink job提交后其实是把各个算子分配到不同节点运行的,所以要想在算子里使用spring的bean,必须在算子运行前拉起spring环境,在各个算子的open()方法里要拉起springboot 环境。

    一个节点上即使有多个flot,只需要拉起一个spring进程即可,各个flot和算子共用这一个环境,为了避免在同一节点上多次启动spring环境导致启动报错,须在启动spring环境的方法里做判断,如果已经启动了或则正在启动,则不再启动。

    理论上各个算子都要拉起下spring环境,避免出错,但如果环境只有两个节点,且算子的最大并行度也是2,只在并行度最大的算子里拉起spring环境也可以保证job正常运行,但这样有启动报错风险。

  • 相关阅读:
    io学习
    asp.net文件上传进度条研究
    asp.net页面中的Console.WriteLine结果如何查看
    谨慎跟随初始目的不被关联问题带偏
    android 按钮特效 波纹 Android button effects ripple
    安卓工作室 日志设置
    安卓工作室 文件浏览器 android studio File browser
    一个新的Android Studio 2.3.3可以在稳定的频道中使用。A new Android Studio 2.3.3 is available in the stable channel.
    新巴巴运动网上商城 项目 快速搭建 教程 The new babar sports online mall project quickly builds a tutorial
    码云,git使用 教程-便签
  • 原文地址:https://www.cnblogs.com/luckyna/p/13086143.html
Copyright © 2011-2022 走看看