zoukankan      html  css  js  c++  java
  • flink使用总结

    一、flink 并行度:

    在flink中,每个算子是一个线程,source、 filter、 flatMap 、map、 sink 这5算子都是独立的线程。在4个层面上可以设置并行度,算子层级的优先级最高。


    •Operator Level(算子层次)

    •Execution Environment Level(执行环境层次)

    •Client Level(客户端层次)

    •System Level(系统层次)

    Flink的每个TaskManager为集群提供solt。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。一般情况下你的slot数是你每个节点的cpu的核数。

    TaskManager为子节点,solt为子节点上独立的进程,类似storm的worker进程。

    启动一个job的话,如果并行度为1,那么所有算子线程都运行在一个节点的一个slot上, 如果单独给flatMap算子设置并行度为4,那么flatMap线程会并行运行在4个slot上。

    flink 同一个算子的 不同task不可以共享slot,即一个算子的并行度为2,它一定运行在两个slot上, slot是内存不共享的。


    flink 两个连续的算子并行度一样的话,flink会把两个算子 放入一个task中,同一个task 是一个线程,减少了线程切换带来的开销。



    storm的并行和flink的并行有一点不同,就是storm在worker进程级别设置并行度的时候,会将不同的线程 均分到不同的进程中,如果有两个子节点那么两个进程会优先分到两个节点上,如果只有一个子节点,那么两个进程会运行在同一个节点上。flink设置并行度的时候,先保证同一个slot里有全部的算子,同一个算子的其他并行算子才会分到其他slot中。如果有两个节点会优先分配在两个节点的slot上,如果有一个节点会在同一个节点下分配到两个slot上。

     二、flink环境启停及job任务提交与取消

    注意:每次替换了jar包 都要停了flink job和flink环境然后再重启:

    1)停止flink job:

    在bin目录执行:./flink list 查看正在运行的flink任务的jobid

    ------------------ Running/Restarting Jobs -------------------
    03.01.2020 16:24:30 : 4de78031c1c99110328d88fbdf2384c3 :   Flink Application (RUNNING)

    然后执行: ./flink cancel 4de78031c1c99110328d88fbdf2384c3


    2)停止和重启flink环境的命令(在bin目录下):
    ./stop-cluster.sh

    ./start-clush.sh

    3)启动job
    ./flink run --class com.najing.FlinkAppStart  xxx/xxx.jar


    三、flink kafka消费积压问题解决思路
    1)不要把所有topic都放到一个job中,这样会导致一个消费者消费所有主题数据,会导致积压,要根据topic进行job拆分。
    2)给kafka主题设置分区同时给flink的consumer设置并行度,使分区数=并行度,让3个消费者同时消费三个分区。
    3)如果上面两种方式都设置后,还是积压,一定是在source-> filter->flatMap->map->sink 算子处理流程中有个别算子处理速度非常慢,导致积压。
    因为不同的算子是不同的线程,当中间有线程处理慢以后,前面的线程也会被它阻塞掉,导致拉取环节的线程也变慢,最终导致积压。这就要找出处理慢的算子,进行针对性的优化。

    kafka的自动提交,会在拉取后不管有没有没成功处理,到时间就自动提交。

    四、集成springboot的flink,job提交后spring环境启动方式

    flink job提交后其实是把各个算子分配到不同节点运行的,所以要想在算子里使用spring的bean,必须在算子运行前拉起spring环境,在各个算子的open()方法里要拉起springboot 环境。

    一个节点上即使有多个flot,只需要拉起一个spring进程即可,各个flot和算子共用这一个环境,为了避免在同一节点上多次启动spring环境导致启动报错,须在启动spring环境的方法里做判断,如果已经启动了或则正在启动,则不再启动。

    理论上各个算子都要拉起下spring环境,避免出错,但如果环境只有两个节点,且算子的最大并行度也是2,只在并行度最大的算子里拉起spring环境也可以保证job正常运行,但这样有启动报错风险。

  • 相关阅读:
    Spring AOP Capabilities and Goal
    CDI Features
    Java Design Patterns
    Connector for python
    Spring reference
    a+1、&a+1、*(a+1)、*(&a+1)、*(*(&a+1))的区别
    int **p和int *p
    Hibernate注解
    功能测试
    零售商商品管理系统代码节选
  • 原文地址:https://www.cnblogs.com/luckyna/p/13086143.html
Copyright © 2011-2022 走看看