zoukankan      html  css  js  c++  java
  • Flink使用SideOutPut替换Split实现分流

    以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流.
    新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depracted(后续可能会被移除).
    搜索相关文档,发现新版本Flink中推荐使用带外数据进行分流.

    预先建立OutputTag实例(LogEntity是从kafka读取的日志实例类).

    private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
    private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));
    

    kafka读取的原始数据,通过process接口,打上相应标记.

        private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
            return rawLogStream
                    .process(new ProcessFunction<LogEntity, LogEntity>() {
                        @Override
                        public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
                            // 根据日志等级,给对象打上不同的标记
                            if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
                                ctx.output(ANALYZE_METRIC_TAG, entity);
                            } else {
                                ctx.output(APP_LOG_TAG, entity);
                            }
                        }
                    })
                    .name("RawLogEntitySplitStream");
        }
    
        // 调用函数,对原始数据流中的对象进行标记
        SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);
        // 根据标记,获取不同的数据流,以便后续进行进一步分析
        DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
        DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);
    

    通过以上步骤,就实现了数据流的切分.

    PS:
    如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢!
    程序员打怪之路

  • 相关阅读:
    使用Visual Studio 2012 开发 Html5 应用
    模块化与MVC
    跨站脚本攻击(Cross‐Site Scripting (XSS))
    C#程序开发中经常遇到的10条实用的代码
    运用DebugDiag诊断ASP.Net异常
    前端MVVM框架avalon
    TOGAF架构开发方法(ADM)之需求管理阶段
    C#4.0中var和dynamic的区别
    hive 不同用户 权限设置 出错处理
    Delphi中类的运行期TypeInfo信息结构说明
  • 原文地址:https://www.cnblogs.com/jason1990/p/11610130.html
Copyright © 2011-2022 走看看