Pipeline for the copier pattern
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<Video> videoDataCollection = ...;

// Every transform below is applied to the same videoDataCollection,
// so each branch receives a full copy of the input.

// Generate high-resolution videos
PCollection<Video> highResolutionVideoCollection = videoDataCollection.apply("highResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateHighResolution(c.element()));
  }
}));

// Generate low-resolution videos
PCollection<Video> lowResolutionVideoCollection = videoDataCollection.apply("lowResolutionTransform", ParDo.of(new DoFn<Video, Video>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateLowResolution(c.element()));
  }
}));

// Generate GIF animations
PCollection<Image> gifCollection = videoDataCollection.apply("gifTransform", ParDo.of(new DoFn<Video, Image>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateGIF(c.element()));
  }
}));

// Generate video captions
PCollection<Caption> captionCollection = videoDataCollection.apply("captionTransform", ParDo.of(new DoFn<Video, Caption>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(generateCaption(c.element()));
  }
}));

// Analyze videos
PCollection<Report> videoAnalysisCollection = videoDataCollection.apply("videoAnalysisTransform", ParDo.of(new DoFn<Video, Report>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(analyzeVideo(c.element()));
  }
}));
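The data flow of the copier pattern can be sketched in plain Java without Beam: every branch reads the complete, unchanged input and applies its own transform. This is only an analogy under stated assumptions. The class `CopierPatternSketch`, the method `fanOut`, and the string "videos" are hypothetical stand-ins for `Video` and the `generate*`/`analyzeVideo` helpers. Also, the branches here run one after another, whereas a Beam runner is free to execute them in parallel.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy of the copier pattern (not Beam code).
class CopierPatternSketch {

    // Applies every named branch to the same input list and collects
    // one output list per branch, keyed by branch name.
    static Map<String, List<String>> fanOut(List<String> videos,
                                            Map<String, Function<String, String>> branches) {
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (Map.Entry<String, Function<String, String>> branch : branches.entrySet()) {
            // Each branch reads the full, unmodified input collection.
            List<String> result = videos.stream()
                    .map(branch.getValue())
                    .collect(Collectors.toList());
            outputs.put(branch.getKey(), result);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> videos = List.of("v1", "v2");
        // Hypothetical lambdas standing in for generateHighResolution,
        // generateLowResolution and generateGIF.
        Map<String, Function<String, String>> branches = new LinkedHashMap<>();
        branches.put("highResolution", v -> v + "-1080p");
        branches.put("lowResolution", v -> v + "-360p");
        branches.put("gif", v -> v + ".gif");
        System.out.println(fanOut(videos, branches));
    }
}
```

Because no branch modifies the shared input, adding another output (for example a new thumbnail format) only means adding one more branch.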
Pipeline for the filter pattern
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<User> userCollection = ...;

// Keep only diamond users
PCollection<User> diamondUserCollection = userCollection.apply("filterDiamondUserTransform", ParDo.of(new DoFn<User, User>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (isDiamondUser(c.element())) {
      c.output(c.element());
    }
  }
}));

// Notify the filtered diamond users; emit only the users for whom notifyUser(...) returns true
PCollection<User> notifiedUserCollection = diamondUserCollection.apply("notifyUserTransform", ParDo.of(new DoFn<User, User>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if (notifyUser(c.element())) {
      c.output(c.element());
    }
  }
}));
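A plain-Java sketch of the filter pattern: a predicate keeps a subset of the input, and only that subset flows into the next step. The nested `User` class, its `tier` field, and `notifyUsers` are hypothetical; the sketch models the data flow, not Beam's API.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java analogy of the filter pattern (not Beam code).
class FilterPatternSketch {

    // Hypothetical user model; the article's User type is not shown.
    static final class User {
        final String name;
        final String tier;
        User(String name, String tier) { this.name = name; this.tier = tier; }
    }

    // Keeps only the elements that satisfy the predicate, like a DoFn that
    // calls c.output(...) only when its condition holds.
    static List<User> filter(List<User> users, Predicate<User> keep) {
        return users.stream().filter(keep).collect(Collectors.toList());
    }

    // Hypothetical downstream step: "notifies" every user it receives and
    // returns their names.
    static List<String> notifyUsers(List<User> users) {
        return users.stream().map(u -> u.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<User> users = List.of(
                new User("alice", "diamond"),
                new User("bob", "golden"),
                new User("carol", "diamond"));
        List<User> diamondUsers = filter(users, u -> u.tier.equals("diamond"));
        // Only the filtered collection reaches the notification step.
        System.out.println(notifyUsers(diamondUsers)); // [alice, carol]
    }
}
```

In Beam itself, a simple predicate like this can also be expressed with the built-in `Filter.by(...)` transform instead of a hand-written `DoFn`.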
Pipeline for the splitter pattern
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// First, define a tag for each output
final TupleTag<User> fiveStarMembershipTag = new TupleTag<User>(){};
final TupleTag<User> goldenMembershipTag = new TupleTag<User>(){};
final TupleTag<User> diamondMembershipTag = new TupleTag<User>(){};

PCollection<User> userCollection = ...;

PCollectionTuple mixedCollection = userCollection.apply(ParDo
    .of(new DoFn<User, User>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (isFiveStarMember(c.element())) {
          // Untagged output goes to the main output, i.e. fiveStarMembershipTag
          c.output(c.element());
        } else if (isGoldenMember(c.element())) {
          c.output(goldenMembershipTag, c.element());
        } else if (isDiamondMember(c.element())) {
          c.output(diamondMembershipTag, c.element());
        }
      }
    })
    .withOutputTags(fiveStarMembershipTag, TupleTagList.of(goldenMembershipTag).and(diamondMembershipTag)));

// Split out the different user groups
mixedCollection.get(fiveStarMembershipTag).apply(...);
mixedCollection.get(goldenMembershipTag).apply(...);
mixedCollection.get(diamondMembershipTag).apply(...);
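The splitter pattern routes each element to at most one of several tagged outputs in a single pass. The plain-Java sketch below is an analogy, not Beam code: a hypothetical `Tag` enum stands in for the three `TupleTag`s and an `EnumMap` stands in for `PCollectionTuple`. Elements that the routing function maps to `null` are dropped, just as a user matching none of the `if`/`else if` branches above produces no output.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java analogy of the splitter pattern (not Beam code).
class SplitterPatternSketch {

    // Hypothetical stand-ins for the three TupleTags in the Beam example.
    enum Tag { FIVE_STAR, GOLDEN, DIAMOND }

    // One pass over the input; each element lands in at most one output list.
    static <T> Map<Tag, List<T>> split(List<T> input, Function<T, Tag> route) {
        Map<Tag, List<T>> outputs = new EnumMap<>(Tag.class);
        for (Tag tag : Tag.values()) {
            outputs.put(tag, new ArrayList<>());
        }
        for (T element : input) {
            Tag tag = route.apply(element);
            // A null tag means "matches no branch": the element is dropped.
            if (tag != null) {
                outputs.get(tag).add(element);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Hypothetical routing rule keyed on a membership prefix.
        Map<Tag, List<String>> out = split(
                List.of("five-star:ann", "golden:ben", "diamond:cid", "basic:dee"),
                s -> s.startsWith("five-star") ? Tag.FIVE_STAR
                   : s.startsWith("golden") ? Tag.GOLDEN
                   : s.startsWith("diamond") ? Tag.DIAMOND
                   : null);
        System.out.println(out);
    }
}
```

When the outputs are simply numbered buckets rather than named groups, Beam's `Partition` transform is an alternative to multiple output tags.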
Pipeline for the merge pattern
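import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// internalImages and thirdPartyImages are two PCollection<Image> inputs of the same element type
PCollectionList<Image> collectionList = PCollectionList.of(internalImages).and(thirdPartyImages);
PCollection<Image> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<Image>pCollections());

mergedCollectionWithFlatten.apply(...);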
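`Flatten` combines several collections of the same element type into one. The plain-Java sketch below mimics that with list concatenation; `MergePatternSketch`, `flatten`, and the string image IDs are hypothetical. One difference to keep in mind: the sketch preserves input order, while a Beam `PCollection` is unordered, so downstream steps should not rely on any ordering after `Flatten`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy of the merge pattern (not Beam code).
class MergePatternSketch {

    // Concatenates any number of same-typed lists into one list,
    // roughly what Flatten.pCollections() does to a PCollectionList.
    @SafeVarargs
    static <T> List<T> flatten(List<? extends T>... sources) {
        List<T> merged = new ArrayList<>();
        for (List<? extends T> source : sources) {
            merged.addAll(source);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical image IDs from the two sources named in the article.
        List<String> internalImages = List.of("int-1", "int-2");
        List<String> thirdPartyImages = List.of("3p-1");
        System.out.println(flatten(internalImages, thirdPartyImages)); // [int-1, int-2, 3p-1]
    }
}
```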