zoukankan      html  css  js  c++  java
  • Apache Beam WordCount编程实战及源码解读

    概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

    Apache Beam WordCount编程实战及源码解读

    负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

    1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。

    Apache Beam 于2017年1月10日成为Apache新的顶级项目。

    1.1.Apache Beam 特点:

    • 统一:对于批处理和流媒体用例使用单个编程模型。
    • 方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
    • 可扩展:编写和分享新的SDKs,IO连接器和transformation库
      部分翻译摘自官网:Apacher Beam 官网

    1.2.Apache Beam关键概念:

    1.2.1.Apache Beam SDKs

    主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

    1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。

    1.2.3. 他们的对如下的支持情况详见

    Apache Beam WordCount编程实战及源码解读

    2.Apache Beam编程实战–Apache Beam源码解读

    基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

    2.1.源码解析-Apache Beam 数据流处理原理解析:

    关键步骤:

    • 创建Pipeline
    • 将转换应用于Pipeline
    • 读取输入文件
    • 应用ParDo转换
    • 应用SDK提供的转换(例如:Count)
    • 写出输出
    • 运行Pipeline

    Apache Beam WordCount编程实战及源码解读

    2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等

    /**
     * MIT.
     * Author: wangxiaolei(王小雷).
     * Date:17-2-20.
     * Project:ApacheBeamWordCount.
     */
    
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation.Required;
    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    
    
    public class WordCount {
    
        /**
         *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
         */
        static class ExtractWordsFn extends DoFn<String, String> {
            private final Aggregator<Long, Long> emptyLines =
                    createAggregator("emptyLines", Sum.ofLongs());
    
            @ProcessElement
            public void processElement(ProcessContext c) {
                if (c.element().trim().isEmpty()) {
                    emptyLines.addValue(1L);
                }
    
                // 将文本行划分为单词
                String[] words = c.element().split("[^a-zA-Z']+");
                // 输出PCollection中的单词
                for (String word : words) {
                    if (!word.isEmpty()) {
                        c.output(word);
                    }
                }
            }
        }
    
        /**
         *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
         */
        public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
            @Override
            public String apply(KV<String, Long> input) {
                return input.getKey() + ": " + input.getValue();
            }
        }
        /**
         *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
         */
        public static class CountWords extends PTransform<PCollection<String>,
                PCollection<KV<String, Long>>> {
            @Override
            public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    
                // 将文本行转换成单个单词
                PCollection<String> words = lines.apply(
                        ParDo.of(new ExtractWordsFn()));
    
                // 计算每个单词次数
                PCollection<KV<String, Long>> wordCounts =
                        words.apply(Count.<String>perElement());
    
                return wordCounts;
            }
        }
    
        /**
         *4.可以自定义一些选项(Options),比如文件输入输出路径
         */
        public interface WordCountOptions extends PipelineOptions {
    
            /**
             * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
             */
            @Description("Path of the file to read from")
            @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
            String getInputFile();
            void setInputFile(String value);
    
            /**
             * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
             */
            @Description("Path of the file to write to")
            @Required
            String getOutput();
            void setOutput(String value);
        }
        /**
         * 5.运行程序
         */
        public static void main(String[] args) {
            WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                    .as(WordCountOptions.class);
            Pipeline p = Pipeline.create(options);
    
            p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                    .apply(new CountWords())
                    .apply(MapElements.via(new FormatAsTextFn()))
                    .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
    
            p.run().waitUntilFinish();
        }
    }

    3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)

    3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序

    • Spark运行

      • 设置VM options

        -DPapex-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts

    Apache Beam WordCount编程实战及源码解读

    3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码

    • Apex运行

      • 设置VM options

        -DPapex-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts
    • Flink运行等等

      • 设置VM options

        -DPflink-runner
      • 设置Programe arguments

        --inputFile=pom.xml --output=counts

    4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)

    4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。

    mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false
    

    Apache Beam WordCount编程实战及源码解读

    4.2.打包并运行

    mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

    Apache Beam WordCount编程实战及源码解读

    4.3.成功运行结果

    4.3.1.显示运行成功

    Apache Beam WordCount编程实战及源码解读

    4.3.2.WordCount输出计算结果

    这里写图片描述

  • 相关阅读:
    关于EasyUI datagrid 无法在dialog中显示的问题分析及解决方案!
    WPF 矩形框8个控制点伸缩及拖拽
    Socket异步通信及心跳包同时响应逻辑分析(最后附Demo)。
    C#断点续传下载。
    C# 全屏坐标及区域坐标获取。自定义光标及系统光标描边捕捉显示。
    解决项目无法添加VBIDE问题
    python爬虫-入门-了解爬虫
    字符串输入数字
    面试题3--数组中的重复数字(new数组的新写法)
    等号操作符重载为什么不能用友元函数大揭秘,以及函数没有等到重载的时候赋值会出现什么现象(盲点)
  • 原文地址:https://www.cnblogs.com/lanzhi/p/6467639.html
Copyright © 2011-2022 走看看