  • Common Beam Pipeline patterns

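    Pipeline with the replication pattern

    The same input PCollection is read by several independent transforms: each apply(...) produces its own output from videoDataCollection, and the input itself is left unchanged.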

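    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Video, Image, Caption, Report and the generate*/analyzeVideo helpers are application-defined
    PCollection<Video> videoDataCollection = ...;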
     
    // Generate the high-resolution video
    PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateHighResolution(c.element()));
      }
    }));
     
    // Generate the low-resolution video
    PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateLowResolution(c.element()));
      }
    }));
     
    // Generate a GIF animation
    PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform", ParDo.of(new DoFn<Video, Image>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateGIF(c.element()));
      }
    }));
     
    // Generate the video captions
    PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform", ParDo.of(new DoFn<Video, Caption>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(generateCaption(c.element()));
      }
    }));
     
    // Analyze the video and produce a report
    PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform", ParDo.of(new DoFn<Video, Report>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(analyzeVideo(c.element()));
      }
    }));
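    To make the dataflow concrete without a Beam runner, here is a minimal plain-Java analogue built on java.util.stream. It is not Beam code. It assumes videos can be modelled as simple values, and `branch` is a hypothetical helper standing in for one ParDo whose DoFn outputs exactly one element per input. Each call re-reads the same unmodified input, just as each apply(...) above reads videoDataCollection.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogue of the replication pattern; this is NOT the Beam API.
class ReplicationSketch {
    // Hypothetical helper: one independent "branch" over the shared input.
    // Like a DoFn that calls c.output(...) once per element, it maps every
    // element and returns a new list; the input list itself is never modified.
    static <T, R> List<R> branch(List<T> input, Function<T, R> fn) {
        return input.stream().map(fn).collect(Collectors.toList());
    }
}
```

    For example, branch(videos, v -> v + "-high") and branch(videos, String::length) can both run on the same list and yield outputs of different types while videos stays as it was. Because each branch only reads the input, the branches do not depend on one another, which matches the five independent transforms above.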

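    Pipeline with the filter pattern

    A ParDo emits an element only when it satisfies a condition, so its output is a subset of the input.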

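    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<User> userCollection = ...;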
     
    PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (isDiamondUser(c.element())) {
          c.output(c.element());
        }
      }
    }));
     
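    // Notify only the users that passed the filter above;
    // notifyUser(...) is assumed to return true once the user has been notified
    PCollection<User> notifiedUserCollection = diamondUserCollection.apply("notifyUserTransform", ParDo.of(new DoFn<User, User>(){
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (notifyUser(c.element())) {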
          c.output(c.element());
        }
      }
    }));
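    A minimal plain-Java analogue of the filter pattern, again using java.util.stream rather than Beam. The User class and its tier field are hypothetical. `filter` keeps only matching elements, mirroring a DoFn that calls c.output(...) inside an if-block. Beam also provides a ready-made Filter.by transform for this case.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogue of the filter pattern; this is NOT the Beam API.
class FilterSketch {
    // Hypothetical user model with a membership tier such as "diamond".
    static final class User {
        final String name;
        final String tier;

        User(String name, String tier) {
            this.name = name;
            this.tier = tier;
        }
    }

    // Returns only the elements that satisfy the predicate, in input order;
    // elements that fail it are simply not emitted, as in the DoFn's if-block.
    static <T> List<T> filter(List<T> input, Predicate<T> keep) {
        return input.stream().filter(keep).collect(Collectors.toList());
    }
}
```

    A follow-up step such as the notification would then run on the result of filter, so only diamond users ever reach it.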

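    Pipeline with the separation (split) pattern

    A single ParDo with multiple outputs routes each user to one of several tagged outputs. withOutputTags declares the main output tag plus the additional tags, and each group is then retrieved from the resulting PCollectionTuple with get(tag).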

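    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // First, define a tag for each output (the anonymous subclass {} preserves the element type)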
    final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
    final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};
     
    PCollection<User> userCollection = ...;
     
    PCollectionTuple mixedCollection =
        userCollection.apply(ParDo
            .of(new DoFn<User, User>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                if (isFiveStarMember(c.element())) {
                  // Main output, i.e. fiveStarMembershipTag
                  c.output(c.element());
                } else if (isGoldenMember(c.element())) {
                  c.output(goldenMembershipTag, c.element());
                } else if (isDiamondMember(c.element())) {
                  c.output(diamondMembershipTag, c.element());
                }
                // Users matching none of the three conditions are not emitted
              }
            })
            .withOutputTags(fiveStarMembershipTag,
                            TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));
     
    // Retrieve each user group by its tag
    mixedCollection.get(fiveStarMembershipTag).apply(...);
     
    mixedCollection.get(goldenMembershipTag).apply(...);
     
    mixedCollection.get(diamondMembershipTag).apply(...);
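    The routing logic can be modelled in plain Java (not Beam) with an EnumMap of lists. The Tag enum and the `separate` helper are hypothetical stand-ins for the three TupleTags and the multi-output DoFn. The classifier returns one tag per element, or null to drop it, matching the if/else-if chain that emits nothing for a user in none of the groups.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java analogue of the separation pattern; this is NOT the Beam API.
class SeparationSketch {
    // Hypothetical output tags, mirroring the three TupleTags above.
    enum Tag { FIVE_STAR, GOLDEN, DIAMOND }

    // Sends every element to the single tag the classifier returns,
    // or drops it when the classifier returns null.
    static <T> Map<Tag, List<T>> separate(List<T> input, Function<T, Tag> classify) {
        Map<Tag, List<T>> outputs = new EnumMap<>(Tag.class);
        for (Tag tag : Tag.values()) {
            // Every declared tag gets an output, even if it stays empty
            outputs.put(tag, new ArrayList<>());
        }
        for (T element : input) {
            Tag tag = classify.apply(element);
            if (tag != null) {
                outputs.get(tag).add(element);
            }
        }
        return outputs;
    }
}
```

    Looking up a group with outputs.get(tag) plays the role of mixedCollection.get(tag) above. Each element lands in at most one group, so the group sizes add up to at most the input size.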

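    Pipeline with the merge pattern

    Flatten combines several PCollections of the same element type into one PCollection. Every element from every input is kept, with no deduplication.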

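    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    PCollection<Image> internalImages = ...;
    PCollection<Image> thirdPartyImages = ...;

    PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);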
    PCollection<Image> mergedCollectionWithFlatten = collectionList
        .apply(Flatten.<Image>pCollections());
     
    mergedCollectionWithFlatten.apply(...);
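    A minimal plain-Java analogue of Flatten (not Beam). `flatten` is a hypothetical helper that concatenates same-typed lists. Like Flatten, it keeps duplicates. A Beam PCollection, unlike this list, has no defined element order, so the ordering here is incidental.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue of the merge pattern; this is NOT the Beam API.
class FlattenSketch {
    // Concatenates the inputs into one new list: nothing is deduplicated
    // or joined, and every element of every input appears in the result.
    @SafeVarargs
    static <T> List<T> flatten(List<? extends T>... inputs) {
        List<T> merged = new ArrayList<>();
        for (List<? extends T> input : inputs) {
            merged.addAll(input);
        }
        return merged;
    }
}
```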

  • Original article: https://www.cnblogs.com/libin2015/p/13932318.html