  Learning Flink: writing WordCount with IDEA + Scala

     

    Contents

      1 Prerequisites

        1.1 System environment

        1.2 Verify the environment

      2 Create the project

      3 Configure the project

      4 Write the code

      5 Run the project

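    1 Prerequisites

    1.1 System environment

      Maven, Java and Scala must be installed.

    1.2 Verify the environment

      Check the Java and Scala versions. The Scala version matters for the project configuration later on: it decides the scala.version and scala.binary.version (e.g. 2.12) set in pom.xml.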

      1) Check the Java version (java -version)

      2) Check the Scala version (scala -version)
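As a cross-check from inside the IDE, the small program below prints the Java and Scala versions of whatever runtime it is launched with, plus the Scala binary version (major.minor) that the `_2.12`-style artifact suffixes in pom.xml have to match. It is only a sketch: the object name VersionCheck is hypothetical, and the result reflects the Scala SDK the program is run with, so run it with the SDK you intend to use for the project.

```scala
// Hypothetical helper object: prints the runtime Java/Scala versions so they can
// be compared with <scala.version> and <scala.binary.version> in pom.xml.
object VersionCheck {

  // Scala binary version = "major.minor", e.g. "2.12.11" -> "2.12".
  // This is the suffix used in artifact IDs such as flink-scala_2.12.
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    val javaVersion  = System.getProperty("java.version")
    val scalaVersion = scala.util.Properties.versionNumberString
    println(s"Java version:         $javaVersion")
    println(s"Scala version:        $scalaVersion")
    println(s"Scala binary version: ${binaryVersion(scalaVersion)}")
  }
}
```

With the pom used below (scala.version 2.12.11), the last line should read 2.12.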

        

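    2 Create the project

      Create it with Maven:

        Command:

            mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.9.0

        Note: -DarchetypeVersion=1.9.0 is the Flink version and should match the Flink version you intend to use. It can also be changed afterwards in the generated project (flink.version in pom.xml).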

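    3 Configure the project

      1) Review the pom.xml file

          Set the Flink and Scala versions (flink.version, scala.binary.version, scala.version) to match your environment.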

    <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
    
      http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>groupId</groupId>
        <artifactId>artifactId</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>Flink Quickstart Job</name>
        <url>http://www.myorganization.org</url>
    
        <repositories>
            <repository>
                <id>apache.snapshots</id>
                <name>Apache Development Snapshot Repository</name>
                <url>https://repository.apache.org/content/repositories/snapshots/</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <flink.version>1.9.2</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.11</scala.version>
        </properties>
    
        <dependencies>
            <!-- Apache Flink dependencies -->
            <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Scala Library, provided by Flink as well. -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Add connector dependencies here. They must be in the default scope (compile). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- Example:
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            -->
    
            <!-- Add logging framework, to produce console output when running in the IDE. -->
            <!-- These dependencies are excluded from the application JAR by default. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
                <scope>runtime</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
                <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                        <exclude>org.slf4j:*</exclude>
                                        <exclude>log4j:*</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>groupId.StreamingJob</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <!-- Scala Compiler -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- Eclipse Scala Integration -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.8</version>
                    <configuration>
                        <downloadSources>true</downloadSources>
                        <projectnatures>
                            <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                            <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                        </projectnatures>
                        <buildcommands>
                            <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                        </buildcommands>
                        <classpathContainers>
                            <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        </classpathContainers>
                        <excludes>
                            <exclude>org.scala-lang:scala-library</exclude>
                            <exclude>org.scala-lang:scala-compiler</exclude>
                        </excludes>
                        <sourceIncludes>
                            <sourceInclude>**/*.scala</sourceInclude>
                            <sourceInclude>**/*.java</sourceInclude>
                        </sourceIncludes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>1.7</version>
                    <executions>
                        <!-- Add src/main/scala to eclipse build path -->
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                        <!-- Add src/test/scala to eclipse build path -->
                        <execution>
                            <id>add-test-source</id>
                            <phase>generate-test-sources</phase>
                            <goals>
                                <goal>add-test-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/test/scala</source>
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
        <!-- This profile helps to make things run out of the box in IntelliJ -->
        <!-- It adds Flink's core classes to the runtime class path. -->
        <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
        <profiles>
            <profile>
                <id>add-dependencies-for-IDEA</id>
    
                <activation>
                    <property>
                        <name>idea.version</name>
                    </property>
                </activation>
    
                <dependencies>
                    <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-scala_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>compile</scope>
                    </dependency>
                    <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>compile</scope>
                    </dependency>
                    <dependency>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
                        <version>${scala.version}</version>
                        <scope>compile</scope>
                    </dependency>
    
                </dependencies>
            </profile>
        </profiles>
    
    </project>
    pom.xml configuration file
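Every Flink artifact ID that carries a Scala suffix (flink-scala_2.12, flink-streaming-scala_2.12, the Kafka connector) must agree with scala.binary.version, and its version must agree with flink.version. A connector pinned to `_2.10` cannot work in a 2.12 project; Flink 1.9 is built for Scala 2.11 and 2.12 only. The sketch below expresses that rule as a plain Scala check. It is a hypothetical helper (PomConsistencyCheck and Dep are made-up names), not a Maven plugin, and the dependency list is typed in by hand from the pom above.

```scala
// Hypothetical consistency check for Flink dependencies declared in pom.xml.
object PomConsistencyCheck {

  // One dependency as declared in pom.xml (artifactId + version).
  final case class Dep(artifactId: String, version: String)

  // Returns one message per problem: wrong Scala suffix or wrong Flink version.
  def problems(deps: Seq[Dep], flinkVersion: String, scalaBinary: String): Seq[String] = {
    // Matches artifact IDs ending in a Scala suffix such as "_2.12" and captures "2.12".
    val Suffix = """.*_(\d+\.\d+)""".r
    deps.flatMap { d =>
      val suffixProblem = d.artifactId match {
        case Suffix(sv) if sv != scalaBinary =>
          Some(s"${d.artifactId}: Scala suffix _$sv, expected _$scalaBinary")
        case _ => None
      }
      val versionProblem =
        if (d.version != flinkVersion) Some(s"${d.artifactId}: version ${d.version}, expected $flinkVersion")
        else None
      suffixProblem.toList ++ versionProblem.toList
    }
  }

  def main(args: Array[String]): Unit = {
    val deps = Seq(
      Dep("flink-scala_2.12", "1.9.2"),
      Dep("flink-streaming-scala_2.12", "1.9.2"),
      Dep("flink-connector-kafka-0.10_2.10", "1.9.2") // deliberately mismatched Scala suffix
    )
    problems(deps, flinkVersion = "1.9.2", scalaBinary = "2.12").foreach(println)
  }
}
```

Run as-is, it prints a single line, flink-connector-kafka-0.10_2.10: Scala suffix _2.10, expected _2.12, and reports nothing for the two core Flink artifacts.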

          

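      2) Project Structure

        In Project Structure, add the Scala SDK to the project.

        Note: IDEA does not have the Scala plugin installed by default, so install it the first time (opening any file with a .scala extension makes IDEA prompt you to install it).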

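    4 Write the code

      1) Create a new Scala file

        Note: by convention the name is in UpperCamelCase (first letter capitalized), e.g. WordCount.

      2) Write the code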

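    import org.apache.flink.api.scala._

    object WordCount {
      def main(args: Array[String]): Unit = {

        // set up the batch (DataSet) execution environment
        val env = ExecutionEnvironment.getExecutionEnvironment

        // get input data
        val text = env.fromElements(
          "hello, world!",
          "hello, world!",
          "hello, world!")

        val counts = text
          .flatMap { _.toLowerCase.split("\\W+") } // split on non-word characters, which also strips punctuation
          .filter { _.nonEmpty }                   // drop empty tokens
          .map { (_, 1) }                          // emit (word, 1) for every word
          .groupBy(0)                              // group by tuple field 0, the word
          .sum(1)                                  // sum tuple field 1, the count

        // for DataSet programs print() triggers execution itself, so no env.execute() call is needed
        counts.print()
      }
    }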
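To see what the job computes without starting Flink, the same flatMap / map / groupBy / sum steps can be replayed on an ordinary Scala collection. This is a sketch for checking the logic only (LocalWordCount is a made-up name, and this is not how Flink executes the job). It splits on non-word characters (`\W+`) and drops empty tokens, so "hello," and "world!" are counted as hello and world; splitting on a single space would keep the punctuation attached to the words.

```scala
// Plain-Scala replay of the WordCount pipeline on a local collection (no Flink needed).
object LocalWordCount {

  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+"))                      // like .flatMap in the Flink job
      .filter(_.nonEmpty)                                        // drop empty tokens
      .map((_, 1))                                               // like .map { (_, 1) }
      .groupBy(_._1)                                             // like .groupBy(0): group by the word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // like .sum(1)

  def main(args: Array[String]): Unit = {
    val text = Seq("hello, world!", "hello, world!", "hello, world!")
    // Sorted only to get a stable order; Flink's print() makes no ordering guarantee.
    wordCount(text).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For the three input lines it prints (hello,3) and then (world,3). With the same tokenization, Flink's counts.print() should show the same tuples, though not necessarily in this order.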

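    5 Run the project

      1) Output: run the WordCount object from IDEA; counts.print() writes the (word, count) tuples to the console.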

  Original post: https://www.cnblogs.com/qingkongwuyun/p/12659599.html