  • Flink: DataSet development, a simple introduction

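    Development flow

    1.    Obtain an execution environment,
    2.    Load/create the initial data,
    3.    Specify the transformations on this data,
    4.    Specify where to put the results of the computation,
    5.    Trigger the program execution

    Example: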

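    // Brings in ExecutionEnvironment and the implicit TypeInformation
    // that the Scala DataSet API needs for flatMap/map.
    import org.apache.flink.api.scala._

    object DataSet_WordCount {
      def main(args: Array[String]): Unit = {
        // 1. Initialize the environment
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 2. Load/create the initial data
        val text = env.fromElements(
          "Who's there?",
          "I think I hear them. Stand, ho! Who's there?")
        // 3. Specify the transformations on this data
        // Lowercase each line and split it on runs of non-word characters
        val split_words = text.flatMap(line => line.toLowerCase().split("\\W+"))
        // Drop the empty strings that split can produce
        val filter_words = split_words.filter(x => x.nonEmpty)
        // Pair each word with an initial count of 1
        val map_words = filter_words.map(x => (x, 1))
        // Group by the word (tuple field 0) and sum the counts (tuple field 1)
        val groupBy_words = map_words.groupBy(0)
        val sum_words = groupBy_words.sum(1)
        // 4. Specify where to put the results of the computation
        // sum_words.setParallelism(1) // gather the results into one output
        sum_words.writeAsText(args(0)) // e.g. "/Users/niutao/Desktop/flink.txt"
        // 5. Trigger the program execution
        env.execute("DataSet wordCount")
      }
    }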
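
    To check what the transformations compute without a Flink cluster, the same steps can be sketched with plain Scala collections. This is only an illustration under stated assumptions: the object name WordCountSketch and the helper wordCount are hypothetical and not part of the original program, and collections stand in for the DataSet API here.

```scala
object WordCountSketch {
  // Plain-collections stand-in for the DataSet pipeline (no Flink required).
  // Hypothetical helper, not part of the original job.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+"))   // split on runs of non-word characters
      .filter(_.nonEmpty)                     // drop empty tokens
      .map(word => (word, 1))                 // pair each word with a count of 1
      .groupBy(_._1)                          // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum counts per word

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?"))
    // Print the words in alphabetical order, one "word<TAB>count" per line
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }
  }
}
```

    Note that splitting on non-word characters breaks "who's" into "who" and "s", so both appear as separate words in the result.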

    Package the program and submit it to YARN

    Add the Maven packaging plugins:

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
    
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
    
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                        -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.nt.DEMO.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
    
                </executions>
            </plugin>
        </plugins>
    </build>

    Upload the jar with the rz command, then run the program:

    bin/flink run -m yarn-cluster -yn 2 /home/elasticsearch/flinkjar/itcast_learn_flink-1.0-SNAPSHOT.jar com.nt.DEMO.WordCount

    The submitted program can be seen on YARN's web page on port 8088.

    The output of the run can be found in the /opt/cdh/flink-1.3.2/flinkJAR folder.

  • Original article: https://www.cnblogs.com/niutao/p/10548371.html