zoukankan      html  css  js  c++  java
  • Apache Beam WordCount编程实战及源码解读

    概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

    Apache Beam WordCount编程实战及源码解读

    负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

    1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。

    Apache Beam 于2017年1月10日成为Apache新的顶级项目。

    1.1.Apache Beam 特点:

    • 统一:对于批处理和流媒体用例使用单个编程模型。
    • 方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
    • 可扩展:编写和分享新的SDKs,IO连接器和transformation库
      部分翻译摘自官网:Apacher Beam 官网

    1.2.Apache Beam关键概念:

    1.2.1.Apache Beam SDKs

    主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

    1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。

    1.2.3. 他们的对如下的支持情况详见

    Apache Beam WordCount编程实战及源码解读

    2.Apache Beam编程实战–Apache Beam源码解读

    基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

    2.1.源码解析-Apache Beam 数据流处理原理解析:

    关键步骤:

    • 创建Pipeline
    • 将转换应用于Pipeline
    • 读取输入文件
    • 应用ParDo转换
    • 应用SDK提供的转换(例如:Count)
    • 写出输出
    • 运行Pipeline

    Apache Beam WordCount编程实战及源码解读

    2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等

    /**
     * MIT.
     * Author: wangxiaolei(王小雷).
     * Date:17-2-20.
     * Project:ApacheBeamWordCount.
     */
    
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    
    
    public class WordCount {
    
        /**
         *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
         */
        static class ExtractWordsFn extends DoFn<String, String> {
            private final Aggregator<Long, Long> emptyLines =
                    createAggregator("emptyLines", Sum.ofLongs());
    
            @ProcessElement
            public void processElement(ProcessContext c) {
                if (c.element().trim().isEmpty()) {
                    emptyLines.addValue(1L);
                }
    
                // 将文本行划分为单词
                String[] words = c.element().split("[^a-zA-Z']+");
                // 输出PCollection中的单词
                for (String word : words) {
                    if (!word.isEmpty()) {
                        c.output(word);
                    }
                }
            }
        }
    
        /**
         *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
         */
        public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
            @Override
            public String apply(KV<String, Long> input) {
                return input.getKey() + ": " + input.getValue();
            }
        }
        /**
         *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
         */
        public static class CountWords extends PTransform<PCollection<String>,
                PCollection<KV<String, Long>>> {
            @Override
            public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    
                // 将文本行转换成单个单词
                PCollection<String> words = lines.apply(
                        ParDo.of(new ExtractWordsFn()));
    
                // 计算每个单词次数
                PCollection<KV<String, Long>> wordCounts =
                        words.apply(Count.<String>perElement());
    
                return wordCounts;
            }
        }
    
        /**
         *4.可以自定义一些选项(Options),比如文件输入输出路径
         */
        public interface WordCountOptions extends PipelineOptions {
    
            /**
             * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
             */
            @Description("Path of the file to read from")
            @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
            String getInputFile();
            void setInputFile(String value);
    
            /**
             * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
             */
            @Description("Path of the file to write to")
            @Required
            String getOutput();
            void setOutput(String value);
        }
        /**
         * 5.运行程序
         */
        public static void main(String[] args) {
            WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(WordCountOptions.class);
            Pipeline p = Pipeline.create(options);
    
            p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                    .apply(new CountWords())
                    .apply(MapElements.via(new FormatAsTextFn()))
                    .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
    
            p.run().waitUntilFinish();
        }
    }

    3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)

    3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序

    • Spark运行

      • 设置VM options

        -DPapex-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts

    Apache Beam WordCount编程实战及源码解读

    3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码

    • Apex运行

      • 设置VM options

        -DPapex-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts
    • Flink运行等等

      • 设置VM options

        -DPflink-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts

    4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)

    4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。

    mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false
    

    Apache Beam WordCount编程实战及源码解读

    4.2.打包并运行

    mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

    Apache Beam WordCount编程实战及源码解读

    4.3.成功运行结果

    4.3.1.显示运行成功

    Apache Beam WordCount编程实战及源码解读

    4.3.2.WordCount输出计算结果

    这里写图片描述

  • 相关阅读:
    hdu 1455 N个短木棒 拼成长度相等的几根长木棒 (DFS)
    hdu 1181 以b开头m结尾的咒语 (DFS)
    hdu 1258 从n个数中找和为t的组合 (DFS)
    hdu 4707 仓鼠 记录深度 (BFS)
    LightOJ 1140 How Many Zeroes? (数位DP)
    HDU 3709 Balanced Number (数位DP)
    HDU 3652 B-number (数位DP)
    HDU 5900 QSC and Master (区间DP)
    HDU 5901 Count primes (模板题)
    CodeForces 712C Memory and De-Evolution (贪心+暴力)
  • 原文地址:https://www.cnblogs.com/lanzhi/p/6467639.html
Copyright © 2011-2022 走看看