  • Common Beam Pipeline Patterns

    Copy-pattern Pipeline

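    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<Video> videoDataCollection = ...;
     
    // Generate the high-resolution video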
    PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateHighResolution(c.element()));
      }
    }));
     
    // Generate the low-resolution video
    PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateLowResolution(c.element()));
      }
    }));
     
    // Generate the GIF animation
    PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform", ParDo.of(new DoFn<Video, Image>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateGIF(c.element()));
      }
    }));
     
    // Generate the video captions
    PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform", ParDo.of(new DoFn<Video, Caption>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateCaption(c.element()));
      }
    }));
     
    // Analyze the video
    PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform", ParDo.of(new DoFn<Video, Report>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(analyzeVideo(c.element()));
      }
    }));

    Filter-pattern Pipeline

    PCollection<User> userCollection = ...;
     
    PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (isDiamondUser(c.element())) {
          c.output(c.element());
        }
      }
    }));
     
    PCollection<User> notifiedUserCollection = userCollection.apply("notifyUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (notifyUser(c.element())) {
          c.output(c.element());
        }
      }
    }));

    Split-pattern Pipeline

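    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // First, define a tag for each output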
    final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};
     
    PCollection<User> userCollection = ...;
     
    PCollectionTuple mixedCollection =
        userCollection.apply(ParDo
            .of(new DoFn<User, User>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                if (isFiveStarMember(c.element())) {
                  // The untagged output goes to the main output (fiveStarMembershipTag)
                  c.output(c.element());
                } else if (isGoldenMember(c.element())) {
                  c.output(goldenMembershipTag, c.element());
                } else if (isDiamondMember(c.element())) {
                  c.output(diamondMembershipTag, c.element());
                }
              }
            })
            .withOutputTags(fiveStarMembershipTag,
                            TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));
     
    // Pull out each user group by its tag
    mixedCollection.get(fiveStarMembershipTag).apply(...);
     
    mixedCollection.get(goldenMembershipTag).apply(...);
     
    mixedCollection.get(diamondMembershipTag).apply(...);
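
The routing logic of the split pattern above can be modeled without Beam. The following is only a plain-Java sketch, not Beam code: the class `SplitSketch`, the `User` type with a `tier` field, and the string keys are all hypothetical stand-ins for the article's `User`, `isFiveStarMember`-style helpers, and `TupleTag`s. It illustrates two properties of the if / else-if chain: each element lands in at most one output, and an element that matches no branch is not emitted anywhere.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitSketch {
    // Hypothetical stand-in for the article's User type.
    public static final class User {
        public final String name;
        public final String tier;

        public User(String name, String tier) {
            this.name = name;
            this.tier = tier;
        }
    }

    // Routes each user to at most one named output, mirroring the
    // if / else-if chain inside the DoFn of the split pattern.
    public static Map<String, List<User>> split(List<User> users) {
        List<User> fiveStar = new ArrayList<>();
        List<User> golden = new ArrayList<>();
        List<User> diamond = new ArrayList<>();

        for (User u : users) {
            if ("fiveStar".equals(u.tier)) {
                fiveStar.add(u);   // plays the role of the main output
            } else if ("golden".equals(u.tier)) {
                golden.add(u);     // plays the role of an additional tagged output
            } else if ("diamond".equals(u.tier)) {
                diamond.add(u);    // plays the role of an additional tagged output
            }
            // No else branch: users in any other tier are dropped.
        }

        Map<String, List<User>> outputs = new LinkedHashMap<>();
        outputs.put("fiveStar", fiveStar);
        outputs.put("golden", golden);
        outputs.put("diamond", diamond);
        return outputs;
    }

    public static void main(String[] args) {
        List<User> users = List.of(
            new User("a", "fiveStar"),
            new User("b", "golden"),
            new User("c", "diamond"),
            new User("d", "basic"));

        // Print the size of each output group.
        split(users).forEach((tag, group) ->
            System.out.println(tag + "=" + group.size()));
    }
}
```

If unmatched elements need to be kept, one option is to add a further tag and a final `else` branch that outputs to it.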

    Merge-pattern Pipeline

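    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    // internalImages and thirdPartyImages are existing PCollection<Image> inputs
    PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);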
    PCollection<Image> mergedCollectionWithFlatten = collectionList
        .apply(Flatten.<Image>pCollections());
     
    mergedCollectionWithFlatten.apply(...);

    References:

    https://time.geekbang.org/column/article/103301?utm_source=pinpaizhuanqu&utm_medium=geektime&utm_campaign=guanwang&utm_term=guanwang&utm_content=0511

    https://www.cnblogs.com/AlanWilliamWalker/p/10370747.html

    https://blog.csdn.net/leehom__/article/details/96576668

  • Original article: https://www.cnblogs.com/libin2015/p/13932318.html