  • Common Patterns for Beam Pipelines

    Copy-pattern Pipeline

    PCollection<Video> videoDataCollection = ...;
     
    // Generate the high-resolution video
    PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateHighResolution(c.element()));
      }
    }));
     
    // Generate the low-resolution video
    PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateLowResolution(c.element()));
      }
    }));
     
    // Generate the GIF animation
    PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform", ParDo.of(new DoFn<Video, Image>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateGIF(c.element()));
      }
    }));
     
    // Generate the video captions
    PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform", ParDo.of(new DoFn<Video, Caption>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateCaption(c.element()));
      }
    }));
     
    // Analyze the video
    PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform", ParDo.of(new DoFn<Video, Report>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(analyzeVideo(c.element()));
      }
    }));

    Filter-pattern Pipeline

    PCollection<User> userCollection = ...;
     
    PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (isDiamondUser(c.element())) {
          c.output(c.element());
        }
      }
    }));
     
    PCollection<User> notifiedUserCollection = userCollection.apply("notifyUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (notifyUser(c.element())) {
          c.output(c.element());
        }
      }
    }));

    Split-pattern Pipeline

    // First, define a tag for each output
    final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};
     
    PCollection<User> userCollection = ...;
     
    PCollectionTuple mixedCollection =
        userCollection.apply(ParDo
            .of(new DoFn<User, User>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                    if (isFiveStarMember(c.element())) {
                      // An untagged output goes to the main tag, fiveStarMembershipTag
                      c.output(c.element());
                    } else if (isGoldenMember(c.element())) {
                      c.output(goldenMembershipTag, c.element());
                    } else if (isDiamondMember(c.element())) {
                      c.output(diamondMembershipTag, c.element());
                    }
                  }
            })
            .withOutputTags(fiveStarMembershipTag,
                            TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));
     
    // Pull out each user group by its tag
    mixedCollection.get(fiveStarMembershipTag).apply(...);
     
    mixedCollection.get(goldenMembershipTag).apply(...);
     
    mixedCollection.get(diamondMembershipTag).apply(...);
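
    The routing in the split pattern above can be modeled without Beam. The sketch below is plain Java, not Beam code: the User class, the tier strings, and the split method are all hypothetical stand-ins for the TupleTag outputs. It only illustrates that each element lands in at most one output, and that an element matching no branch is dropped, just as in the if / else-if chain above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitPatternSketch {

    // Hypothetical stand-in for the User type used in the Beam snippet.
    public static final class User {
        final String name;
        final String tier;

        public User(String name, String tier) {
            this.name = name;
            this.tier = tier;
        }
    }

    // Routes each user to at most one named output, like the tagged outputs above.
    // Users whose tier matches none of the three outputs are silently dropped.
    public static Map<String, List<User>> split(List<User> users) {
        Map<String, List<User>> outputs = new LinkedHashMap<>();
        outputs.put("fiveStar", new ArrayList<>());
        outputs.put("golden", new ArrayList<>());
        outputs.put("diamond", new ArrayList<>());
        for (User user : users) {
            List<User> target = outputs.get(user.tier);
            if (target != null) {
                target.add(user);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User("alice", "fiveStar"),
                new User("bob", "golden"),
                new User("carol", "diamond"),
                new User("dave", "basic"));
        Map<String, List<User>> outputs = split(users);
        for (Map.Entry<String, List<User>> entry : outputs.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue().size());
        }
    }
}
```

    If unmatched users need to be kept, one option is to add a fourth output for them, both in this model and in the Beam version.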

    Merge-pattern Pipeline

    PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);
    PCollection<Image> mergedCollectionWithFlatten = collectionList
        .apply(Flatten.<Image>pCollections());
     
    mergedCollectionWithFlatten.apply(...);
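
    As a rough model of what the merge pattern does, the plain-Java sketch below concatenates several lists into one. It is not Beam code: the flatten helper and the sample file names are hypothetical. It shows that merging is a union that keeps duplicates, which matches how Flatten behaves. Unlike a List, a PCollection guarantees no element order, so the ordering here is only an artifact of the model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergePatternSketch {

    // Concatenates all inputs into one list; duplicates are kept, nothing is filtered.
    @SafeVarargs
    public static <T> List<T> flatten(List<T>... inputs) {
        List<T> merged = new ArrayList<>();
        for (List<T> input : inputs) {
            merged.addAll(input);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for internalImages and thirdPartyImages.
        List<String> internalImages = Arrays.asList("a.png", "b.png");
        List<String> thirdPartyImages = Arrays.asList("b.png", "c.png");
        List<String> merged = flatten(internalImages, thirdPartyImages);
        System.out.println(merged.size() + " images after merging");
    }
}
```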

    References:

    https://time.geekbang.org/column/article/103301?utm_source=pinpaizhuanqu&utm_medium=geektime&utm_campaign=guanwang&utm_term=guanwang&utm_content=0511

    https://www.cnblogs.com/AlanWilliamWalker/p/10370747.html

    https://blog.csdn.net/leehom__/article/details/96576668

  • Original article: https://www.cnblogs.com/libin2015/p/13932318.html