zoukankan      html  css  js  c++  java
  • Flink广播流Demo

    广播状态
    从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态。

    三种应用场景

    1. 动态配置更新
    2. 规则改变
    3. 类似开关的功能
      假设场景,
      有两条流,一条是普通的流,另一条是控制流,如果需要动态调整代码逻辑时,可以使用广播状态
    package com.haoziqi.chapter_09;
    
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.nio.file.attribute.UserPrincipalLookupService;
    
    /**
     * description
     * created by A on 2021/3/16
     */
    public class State_BroadcastState {
        public static void main(String[] args) {
            //控制流发送到普通流后,普通流会收到一个广播状态
            //1.创建环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> inputDS = env.socketTextStream("localhost", 8888);
            DataStreamSource<String> controlDS = env.socketTextStream("localhost", 9999);
            //TODO 1.把其中一条流(控制流) 广播出去
            //定义一个Map状态描述器,控制流会把这个状态广播出去
            MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
            BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);
    
            //TODO 2.把另一条流和广播流关联起来
            BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);
            
            //TODO 3.调用Process
    
            inputBCS.process(
                    new BroadcastProcessFunction<String, String, String>() {
                        /*
                            获取广播状态,获取数据进行处理
                         */
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                            //TODO 5.通过上下文获取广播状态,取出里面的值
                            ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                            String aSwitch = broadcastState.get("switch");
                            if("1".equals(aSwitch)){
                                out.collect("切换到1的逻辑");
                            }else if("2".equals(aSwitch)){
                                out.collect("切换到2的逻辑");
                            }
    
    
                        }
    
                        /**
                         * 处理广播流的数据:这里主要定义,什么数据往广播状态存
                         * @param value
                         * @param ctx
                         * @param out
                         * @throws Exception
                         */
                        @Override
                        public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                            // TODO 4.通过上下文获取广播状态,并往广播状态里存数据
                            BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                            broadcastState.put("switch",value);
                        }
                    }
            ).print();
            //提交任务
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    
    
  • 相关阅读:
    MySQL 基本字段类型
    《将博客搬至CSDN》
    【转载·收藏】 html5手机网站自适应需要加的meta标签
    SQL LIKE操作符 Thinkphp
    Thinkphp判断值是否为空
    Thinkphp重复字段过滤
    Thinkphp框架删除确认对话框
    PHP微信公众平台开发高级篇——群发接口(慕课网学习笔记)
    通过当前cateid来判断切换tab
    js获取当前页面的url中id
  • 原文地址:https://www.cnblogs.com/traveller-hzq/p/14545543.html
Copyright © 2011-2022 走看看