zoukankan      html  css  js  c++  java
  • CoProcessFunction实战三部曲之一:基本功能

    欢迎访问我的GitHub

    https://github.com/zq2599/blog_demos

    内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

    关于《CoProcessFunction实战三部曲》系列

    • 《CoProcessFunction实战三部曲》旨在通过三次实战,由浅入深的学习和掌握Flink低阶处理函数CoProcessFunction的用法;
    • 整个系列的开篇先介绍CoProcessFunction,然后迅速进入实战,了解CoProcessFunction的基本功能;
    • 下一篇会结合状态,让双流元素的处理彼此保持关系;
    • 终篇的实战会加入定时器功能,确保同一个key的数据在双流场景下能够及时处理;

    版本信息

    1. 开发环境操作系统:MacBook Pro 13寸, macOS Catalina 10.15.3
    2. 开发工具:IDEA ULTIMATE 2018.3
    3. JDK:1.8.0_211
    4. Maven:3.6.0
    5. Flink:1.9.2

    系列文章链接

    1. 基本功能
    2. 状态处理
    3. 定时器和侧输出

    关于CoProcessFunction

    • CoProcessFunction的作用是同时处理两个数据源的数据;
    • 试想在面对两个输入流时,如果这两个流的数据之间有业务关系,该如何编码实现呢,例如下图中的操作,同时监听99989999端口,将收到的输出分别处理后,再由同一个sink处理(打印):
      在这里插入图片描述
    • Flink支持的方式是扩展CoProcessFunction来处理,为了更清楚认识,我们把KeyedProcessFunctionCoProcessFunction的类图摆在一起看,如下所示:
      在这里插入图片描述
    • 从上图可见,CoProcessFunction和KeyedProcessFunction的继承关系一样,另外CoProcessFunction自身也很简单,在processElement1processElement2中分别处理两个上游流入的数据即可,并且也支持定时器设置;

    本篇实战功能简介

    本篇咱们要开发的应用,其功能非常简单,描述如下:

    1. 建两个数据源,数据分别来自本地99989999端口;
    2. 每个端口收到类似aaa,123这样的数据,转成Tuple2实例,f0是aaa,f1是123
    3. 在CoProcessFunction的实现类中,对每个数据源的数据都打日志,然后全部传到下游算子;
    4. 下游操作是打印,因此99989999端口收到的所有数据都会在控制台打印出来;
    5. 整个demo的功能如下图所示:
      在这里插入图片描述
    • 接下来开始编码;

    源码下载

    如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

    这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:
    在这里插入图片描述

    代码简介

    1. 开发一个Map算子,将字符串转成Tuple2;
    2. 再开发抽象类AbstractCoProcessFunctionExecutor,功能包括:flink启动、监听端口、调用算子处理数据、双流连接、将双流处理结果打印出来;
    3. 从上面的描述可见,AbstractCoProcessFunctionExecutor做了很多事情,唯独没有实现双流连接后的具体业务逻辑,这些没有做的是留给子类来实现的,整个三部曲系列的重点都集中在AbstractCoProcessFunctionExecutor的子类上,把双流连接后的业务逻辑做好,如下图所示,红色为CoProcessFunction的业务代码,其他的都在抽象类中完成:
      在这里插入图片描述

    Map算子

    1. 做一个map算子,用来将字符串aaa,123转成Tuple2实例,f0是aaa,f1是123
    2. 算子名为WordCountMap.java
    package com.bolingcavalry.coprocessfunction;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.StringUtils;
    
    public class WordCountMap implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String s) throws Exception {
    
            if(StringUtils.isNullOrWhitespaceOnly(s)) {
                System.out.println("invalid line");
                return null;
            }
    
            String[] array = s.split(",");
    
            if(null==array || array.length<2) {
                System.out.println("invalid line for array");
                return null;
            }
    
            return new Tuple2<>(array[0], Integer.valueOf(array[1]));
        }
    }
    

    抽象类

    • 抽象类AbstractCoProcessFunctionExecutor.java,源码如下,稍后会说明几个关键点:
    package com.bolingcavalry.coprocessfunction;
    
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    
    /**
     * @author will
     * @email zq2599@gmail.com
     * @date 2020-11-09 17:33
     * @description 串起整个逻辑的执行类,用于体验CoProcessFunction
     */
    public abstract class AbstractCoProcessFunctionExecutor {
    
        /**
         * 返回CoProcessFunction的实例,这个方法留给子类实现
         * @return
         */
        protected abstract CoProcessFunction<
                Tuple2<String, Integer>,
                Tuple2<String, Integer>,
                Tuple2<String, Integer>> getCoProcessFunctionInstance();
    
        /**
         * 监听根据指定的端口,
         * 得到的数据先通过map转为Tuple2实例,
         * 给元素加入时间戳,
         * 再按f0字段分区,
         * 将分区后的KeyedStream返回
         * @param port
         * @return
         */
        protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
            return env
                    // 监听端口
                    .socketTextStream("localhost", port)
                    // 得到的字符串"aaa,3"转成Tuple2实例,f0="aaa",f1=3
                    .map(new WordCountMap())
                    // 将单词作为key分区
                    .keyBy(0);
        }
    
        /**
         * 如果子类有侧输出需要处理,请重写此方法,会在主流程执行完毕后被调用
         */
        protected void doSideOutput(SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream) {
        }
    
        /**
         * 执行业务的方法
         * @throws Exception
         */
        public void execute() throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 并行度1
            env.setParallelism(1);
    
            // 监听9998端口的输入
            KeyedStream<Tuple2<String, Integer>, Tuple> stream1 = buildStreamFromSocket(env, 9998);
    
            // 监听9999端口的输入
            KeyedStream<Tuple2<String, Integer>, Tuple> stream2 = buildStreamFromSocket(env, 9999);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = stream1
                    // 两个流连接
                    .connect(stream2)
                    // 执行低阶处理函数,具体处理逻辑在子类中实现
                    .process(getCoProcessFunctionInstance());
    
            // 将低阶处理函数输出的元素全部打印出来
            mainDataStream.print();
    
            // 侧输出相关逻辑,子类有侧输出需求时重写此方法
            doSideOutput(mainDataStream);
    
            // 执行
            env.execute("ProcessFunction demo : CoProcessFunction");
        }
    }
    
    • 关键点之一:一共有两个数据源,每个源的处理逻辑都封装到buildStreamFromSocket方法中;
    • 关键点之二:stream1.connect(stream2)将两个流连接起来;
    • 关键点之三:process接收CoProcessFunction实例,合并后的流的处理逻辑就在这里面;
    • 关键点之四:getCoProcessFunctionInstance是抽象方法,返回CoProcessFunction实例,交给子类实现,所以CoProcessFunction中做什么事情完全由子类决定;
    • 关键点之五:doSideOutput方法中啥也没做,但是在主流程代码的末尾会被调用,如果子类有侧输出(SideOutput)的需求,重写此方法即可,此方法的入参是处理过的数据集,可以从这里取得侧输出;

    子类,对连接后的双流进行操作

    1. 本篇子类CollectEveryOne.java如下所示,逻辑很简单,将每个源的上游数据直接输出到下游算子:
    package com.bolingcavalry.coprocessfunction;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class CollectEveryOne extends AbstractCoProcessFunctionExecutor {
    
        private static final Logger logger = LoggerFactory.getLogger(CollectEveryOne.class);
    
        @Override
        protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() {
            return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    
                @Override
                public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                    logger.info("处理1号流的元素:{},", value);
                    out.collect(value);
                }
    
                @Override
                public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                    logger.info("处理2号流的元素:{}", value);
                    out.collect(value);
                }
            };
        }
    
        public static void main(String[] args) throws Exception {
            new CollectEveryOne().execute();
        }
    }
    
    1. 上述代码中,CoProcessFunction后面的泛型定义很长:<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> ,一共三个Tuple2,分别代表一号数据源输入、二号数据源输入、下游输出的类型;
    2. 编码完成,运行起来试试;

    验证

    1. 分别开启本机的99989999端口,我这里是MacBook,执行nc -l 9998nc -l 9999
    2. 启动Flink应用,如果您和我一样是Mac电脑,直接运行CollectEveryOne.main方法即可(如果是windows电脑,我这没试过,不过做成jar在线部署也是可以的);
    3. 在监听9998和9999端口的控制台分别输入aaa,111bbb,222
    4. 以下是flink控制台输出的内容,可见processElement1和processElement2方法的日志代码已经执行,并且print方法作为最下游,将两个数据源的数据都打印出来了,符合预期:
    12:45:38,774 INFO CollectEveryOne - 处理1号流的元素:(aaa,111),
    (aaa,111)
    12:45:43,816 INFO CollectEveryOne - 处理2号流的元素:(bbb,222)
    (bbb,222)
    
    • 至此,咱们的第一个双流处理低阶函数就完成了,对CoProcessFunction也有了最基本的认识,当然CoProcessFunction的作用远不及此,下一篇咱们借助状态让processElement1processElement2分别对方处理过的状态,让每个元素的处理都和另一个流关联,不再孤立;

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
    https://github.com/zq2599/blog_demos

  • 相关阅读:
    BZOJ 1191 HNOI2006 超级英雄hero
    BZOJ 2442 Usaco2011 Open 修建草坪
    BZOJ 1812 IOI 2005 riv
    OJ 1159 holiday
    BZOJ 1491 NOI 2007 社交网络
    NOIP2014 D1 T3
    BZOJ 2423 HAOI 2010 最长公共子序列
    LCA模板
    NOIP 2015 D1T2信息传递
    数据结构
  • 原文地址:https://www.cnblogs.com/bolingcavalry/p/15009202.html
Copyright © 2011-2022 走看看