zoukankan      html  css  js  c++  java
  • Flink实例(三十七):状态管理(八)自定义操作符状态(三)广播状态(Broadcast state)(一) KeyedBroadcastProcessFunction

    什么是 Broadcast State

    Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。英语好的同学可以直接移步 Flink 官方介绍
    Broadcast State 区别于其他 operator state 的地方有:

    • Broadcast State 类似 Map 结构,可以 put get putAll remove 等
    • 必须有一条广播流和一条非广播流
    • 符合条件的 operator 可以有多个不同名字的 BroadcastState(疑惑:普通的 operator 也可以有多个不同名字的 state 吧,只是不是 BroadcastState。这么想也说得通了)

    Broadcast state 示例

    下面从一个示例来认识如何使用 Broadcast state. 我们对 wordcount 的例子都很熟悉,就简单改造下 wordcount吧。我们的改造目标是:实时控制输出结果中的单词长度。
    首先大体说一下思路,准备两个流,一个数据流(wordcount 需要统计的流) A,一个配置流(即广播流,后面有生成方法) B,这两个流的来源都可以自己定义,这里我们都用 kafka 作为输入源;然后用 A.keyBy(0).connect(B), 这里注意,一定是用数据流[.func()].connect(广播流),生成一个新的 BroadcastConnectedStream C;最后 C.process(new KeyedBroadcastProcessFunction<…>(…)) 进行逻辑处理。

    1.数据流消费者
    数据流就是 wordcount 程序统计的普通文本

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic-data",...);

    2.广播流消费者
    我们定义了广播流一条消息的格式为 {“length”:n} .其中 n 为数字,表示单词的最大长度。

    FlinkKafkaConsumer<String> consumerBroadcast = new FlinkKafkaConsumer<>("input-topic-config",...);

    3.生成数据流 A
    这里对数据流进行了 wordcount 中的分词操作,输出流为 <单词, 数量, 生成时间>

    DataStream<Tuple3<String, Integer, Long>> dataStream = env.addSource(consumer).flatMap(new LineSplitter());

    4.生成广播流 B 并广播
    我们知道,在 Flink 中,访问 state 前先要定义状态描述符(StateDescriptor). BroadcastState 的状态描述符是 MapStateDescriptor. MapStateDescriptor 的 value 类型即是广播流的元素类型,这个例子里是 Map<String,Object>。前面我们定义了广播的原始消息格式为 json 字符串,在这里我们通过 flatMap 函数转化成 Map<String,Object> 类型。转化完成后,调用 broadcast() 将这个流广播出去,这样,每个 task 都会收到广播流数据并在本地保存一份 BroadcastState,实际上,生成 BroadcastState 是在后续的 process() 中操作的。

    // 定义 MapStateDescriptor
    final MapStateDescriptor<String,Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig",BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
    // i.e. {"length":5}
    BroadcastStream<Map<String,Object>> broadcastStream = env.addSource(consumerBroadcast).
             flatMap(new FlatMapFunction<String, Map<String,Object>>() {
                        // 解析 json 数据
                        private final ObjectMapper mapper = new ObjectMapper();
                        @Override
                        public void flatMap(String value, Collector<Map<String,Object>> out) {
                            try {
                                 out.collect(mapper.readValue(value, Map.class));
                            } catch (IOException e) {
                                 e.printStackTrace();
                                 System.out.println(value);
                            }
                        }
                   }
            // 这里需要调用 broadcast 广播出去,并且只能是 MapStateDescriptor 类型。可以指定多个
            ).broadcast(broadCastConfigDescriptor); //这里可以指定多个descriptor

    5.连接两个流
    接下来是两个流的连接部分了。前面说过,必须是 数据流.connect(广播流). 这里又分成两种情况

    • noKeyedStream.connect(BroadcastStream).process(new BroadcastProcessFunction<>(…)) 非 KeyedStream 连接 BroadcastStream 的,只能使用 BroadcastProcessFunction 函数处理连接逻辑
    • KeyedStream.connect(BroadcastStream).process(new KeyedBroadcastProcessFunction<>(…)) KeyedStream 连接 BroadcastStream 的,只能使用 KeyedBroadcastProcessFunction 函数处理连接逻辑
      KeyedBroadcastProcessFunction 比 BroadcastProcessFunction 多了计时器服务和获取当前 key 接口,当然,这两个功能不一定能用到。

    我们这里使用的是 KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>:
      KS 是 KeyedStream 中 key 的类型;IN1 是数据流(即非广播流)的元素类型;IN2 是广播流的元素类型;OUT 是两个流连接完成后,输出流的元素类型。

    dataStream.keyBy(0).connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, Integer, Long>, Map<String, Object>, Tuple2<String,Integer>>(){...}

    我们单独把 KeyedBroadcastProcessFunction 摘出来,这个函数用于处理具体的连接逻辑和业务逻辑。主要需要实现以下两个函数:

    • public void processElement(Tuple3<String, Integer, Long> value, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out):

         这个函数处理数据流的数据,这里之只能获取到 ReadOnlyBroadcastState,因为 Flink 不允许在这里修改 BroadcastState 的状态。value 是数据流中的一个元素;ctx 是上下文,可以提供计时器服务、当前 key和只读的 BroadcastState;out 是输出流收集器。

    • public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<Tuple2<String,Integer>> out):

        这里处理广播流的数据,将广播流数据保存到 BroadcastState 中。value 是广播流中的一个元素;ctx 是上下文,提供 BroadcastState 和修改方法;out 是输出流收集器。

    下面是示例源码:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.state.*;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    
    public class BroadCastWordCountExample {
        public static void main (String[] args) throws Exception {
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
            if (parameterTool.getNumberOfParameters() < 5) {
                System.out.println("Missing parameters!
    " +
                        "Usage: Kafka --input-topic-data <topic> --input-topic-config <topic> --output-topic <topic> " +
                        "--bootstrap.servers <kafka brokers> " +
                        "--group.id <some id> --auto.offset.reset <latest, earliest, none>");
                return;
            }
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.enableCheckpointing(parameterTool.getInt("checkpoint.interval",60000)); // create a checkpoint every n mill seconds
    
            // set mode to exactly-once (this is the default)
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
            // make sure 500 ms of progress happen between checkpoints
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
            // checkpoints have to complete within one minute, or are discarded
            env.getCheckpointConfig().setCheckpointTimeout(60000);
    
            // allow only one checkpoint to be in progress at the same time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(parameterTool);
    
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                    parameterTool.getRequired("input-topic-data"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties());
            FlinkKafkaConsumer<String> consumerBroadcast = new FlinkKafkaConsumer<>(
                    parameterTool.getRequired("input-topic-config"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties());
    
            DataStream<Tuple3<String, Integer, Long>> dataStream = env.addSource(consumer).flatMap(new LineSplitter());
            final MapStateDescriptor<String,Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig",
                    BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
            // e.g. {"length":5}
            BroadcastStream<Map<String,Object>> broadcastStream = env.addSource(consumerBroadcast).
                    flatMap(new FlatMapFunction<String, Map<String, Object>>() {
                                // 解析 json 数据
                                private final ObjectMapper mapper = new ObjectMapper();
    
                                @Override
                                public void flatMap(String value, Collector<Map<String, Object>> out) {
                                    try {
                                        out.collect(mapper.readValue(value, Map.class));
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                        System.out.println(value);
                                    }
                                }
                            }
                    ).broadcast(broadCastConfigDescriptor); //这里可以指定多个descriptor
    
            dataStream.keyBy(0).connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, Integer, Long>, Map<String, Object>, Tuple2<String,Integer>>() {
                private final Logger logger = LoggerFactory.getLogger(BroadCastWordCountExample.class);
                private transient MapState<String, Integer> counterState;
                int length = 5;
                // 必须和上文的 broadCastConfigDescriptor 一致,否则报 java.lang.IllegalArgumentException: The requested state does not exist 的错误
                private final MapStateDescriptor<String, Map<String,Object>> broadCastConfigDescriptor = new MapStateDescriptor<>("broadCastConfig", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, Object.class));
                private final MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("counter",String.class, Integer.class);
                @Override
                public void open(Configuration parameters) throws Exception{
                    counterState = getRuntimeContext().getMapState(descriptor);
                    logger.info("get counter/globalConfig MapState from checkpoint");
                }
                /**
                 * 这里处理数据流的数据
                 * */
                @Override
                public void processElement(Tuple3<String, Integer, Long> value, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                    /**
                     * 这里之只能获取到 ReadOnlyBroadcastState,因为 Flink 不允许在这里修改 BroadcastState 的状态
                     * */
                    // 从广播状态中获取规则
                    ReadOnlyBroadcastState<String, Map<String,Object>> broadcastState = ctx.getBroadcastState(broadCastConfigDescriptor);
                    if (broadcastState.contains("broadcastStateKey")) {
                        length = (Integer) broadcastState.get("broadcastStateKey").get("length");
                    }
                    if (value.f0.length() > length) {
                        logger.warn("length of str {} > {}, ignored", value.f0, length);
                        return;
                    }
                    if (counterState.contains(value.f0)) {
                        counterState.put(value.f0, counterState.get(value.f0) + value.f1);
                    } else {
                        counterState.put(value.f0, value.f1);
                    }
                    out.collect(new Tuple2<>(value.f0, counterState.get(value.f0)));
                }
                /**
                 * 这里处理广播流的数据
                 * */
                @Override
                public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<Tuple2<String,Integer>> out) throws Exception {
                    if (!value.containsKey("length")) {
                        logger.error("stream element {} do not contents "length"", value);
                        return;
                    }
    
                    /*ctx.applyToKeyedState(broadCastConfigDescriptor, (key, state) -> {
                         // 这里可以修改所有 broadCastConfigDescriptor 描述的 state
                    });*/
                    /** 这里获取 BroadcastState,BroadcastState 包含 Map 结构,可以修改、添加、删除、迭代等
                     * */
                    BroadcastState<String, Map<String,Object>> broadcastState = ctx.getBroadcastState(broadCastConfigDescriptor);
                    // 前面说过,BroadcastState 类似于 MapState.这里的 broadcastStateKey 是随意指定的 key, 用于示例
                    // 更新广播流的规则到广播状态: BroadcastState
                    if (broadcastState.contains("broadcastStateKey")) {
                        Map<String, Object> oldMap = broadcastState.get("broadcastStateKey");
                        logger.info("get State {}, replaced with State {}",oldMap,value);
                    } else {
                        logger.info("do not find old State, put first counterState {}",value);
                    }
                    broadcastState.put("broadcastStateKey",value);
                }
            }).print();
    
            env.execute("BroadCastWordCountExample");
        }
    }

    示例测试:

    1.启动 flink 集群,并行度为2,运行该 job;
    2.数据流输入:

    No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

    task 打印输出:

    2> (no,1)
    1> (for,1)
    2> (path,1)
    1> (jar,1)
    2> (the,1)
    2> (flink,1)
    2> (using,1)
    2> (the,2)
    1> (jar,2)
    2> (of,1)
    2> (class,1)
    2> (to,1)
    2> (the,3)

    3.广播流输入:

    {"length":6}

    数据流输入相同数据:

    No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

    task 打印输出:

    2> (no,2)
    2> (path,2)
    1> (for,2)
    2> (the,4)
    1> (jar,3)
    2> (flink,2)
    2> (using,2)
    2> (the,5)
    1> (jar,4)
    2> (of,2)
    2> (class,2)
    2> (to,2)
    2> (locate,1)
    2> (the,6)

    使用 broadcast state 时需要注意的事项

      1.同一个 operator 的各个 task 之间没有通信:这也是为何只有 广播流侧(processBroadcastElement) 才能修改 broadcast state,而数据流侧(processElement) 只能读 broadcast state. 此外,开发者需要保证所有 operator task 对 broadcast state 的修改逻辑是相同的(一般都是相同的吧),否则会导致非预期的结果。
      2.operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游 tasks,但是元素到达的顺序可能不同。所以更新 state 时不能依赖元素到达的顺序。
      3.每个 task 对各自的 broadcast state 做快照:虽然每个 task 收到的广播流元素和做快照时的 broadcast state 是一样的,但是每个 task 快照到本地。这样做是为了防止失败恢复时,所有的 tasks 同时读一个文件导致的热点问题(hotspots)。当恢复后并行度不变或变小时,task 读取各自的 state;当恢复后并行度变大,之前的 tasks 读取各自的 state,新增的 task(p_new-p_old) 以 round-robin 方式读取前一个 task 的 state。
      4.目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,开发者应该为其预留合适的内存。

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13795243.html

  • 相关阅读:
    浅谈 iOS 之 Crash log 符号化
    聊聊 Statsd 和 Collectd 那点事!
    如何使用 Zend Expressive 建立 NASA 图片库?
    Nagios 邮箱告警的方式太OUT了!
    如何从软硬件层面提升 Android 动画性能?
    这样查看告警邮件要慢一点……
    Android 共享文件的 Runtime 权限
    第38节:hashCode()与toString()与equals()函数的作用,内部类和匿名内部类
    第37节:多线程安全问题
    第37节:多线程安全问题
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13795243.html
Copyright © 2011-2022 走看看