zoukankan      html  css  js  c++  java
  • Flink--本地执行和集群执行

    本地执行

    1:local环境

    LocalEnvironment是Flink程序本地执行的句柄。用它在本地JVM中运行程序 - 独立运行或嵌入其他程序中。

    本地环境通过该方法实例化ExecutionEnvironment.createLocalEnvironment()。默认情况下,它将使用尽可能多的本地线程执行,因为您的机器具有CPU核心(硬件上下文)。您也可以指定所需的并行性。本地环境可以配置为使用enableLogging()/ 登录到控制台disableLogging()。

    在大多数情况下,ExecutionEnvironment.getExecutionEnvironment()是更好的方式。LocalEnvironment当程序在本地启动时(命令行界面外),该方法会返回一个程序,并且当程序由命令行界面调用时,它会返回一个预配置的群集执行环境。

    注意:本地执行环境不启动任何Web前端来监视执行。

    object LocalEven {
      def main(args: Array[String]): Unit = {
        //TODO 初始化本地执行环境
        val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment()
        val path = "data2.csv"
        val data = env.readCsvFile[(String, String, String, String,String,Int,Int,Int)](
            filePath = path,
            lineDelimiter = "
    ",
            fieldDelimiter = ",",
            ignoreFirstLine = true
        )
        data.groupBy(0,1).first(100).print()
      }
    }
    2:集合环境

    使用集合的执行CollectionEnvironment是执行Flink程序的低开销方法。这种模式的典型用例是自动化测试,调试和代码重用。

    用户也可以使用为批处理实施的算法,以便更具交互性的案例

    请注意,基于集合的Flink程序的执行仅适用于适合JVM堆的小数据。集合上的执行不是多线程的,只使用一个线程

    //TODO createCollectionsEnvironment
    val collectionENV = ExecutionEnvironment.createCollectionsEnvironment
    val path = "data2.csv"
    val data = collectionENV.readCsvFile[(String, String, String, String,String,Int,Int,Int)](
        filePath = path,
        lineDelimiter = "
    ",
        fieldDelimiter = ",",
        ignoreFirstLine = true
    )
    data.groupBy(0,1).first(50).print()

    集群执行:

    Flink程序可以在许多机器的集群上分布运行。有两种方法可将程序发送到群集以供执行:

    1:命令行界面:
    ./bin/flink run ./examples/batch/WordCount.jar 
                             --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    2:使用代码中的远程环境提交

    远程环境允许您直接在群集上执行Flink Java程序。远程环境指向要在其上执行程序的群集

    Maven打包:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.flink.DataStream.RemoteEven</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    View Code
    val env: ExecutionEnvironment = ExecutionEnvironment.createRemoteEnvironment("hadoop01", 8081, "target/learning-flink-1.0-SNAPSHOT.jar")
    val data: DataSet[String] = env.readTextFile("hdfs://hadoop01:9000/README.txt")
    val flatMap_data: DataSet[String] = data.flatMap(line => line.toLowerCase().split("\W+"))
    val mapdata: DataSet[(String, Int)] = flatMap_data.map(line => (line , 1))
    val groupData: GroupedDataSet[(String, Int)] = mapdata.groupBy(line => line._1)
    val result = groupData.reduce((x , y) => (x._1 , x._2+y._2))
    result.writeAsText("hdfs://hadoop01:9000/remote")
    env.execute()
  • 相关阅读:
    把SVN和jenkins连起来--有人提代码就能自动build!
    Redis设置认证密码 Redis使用认证密码登录 在Redis集群中使用认证密码
    JwtAuthenticationTokenFilter 实现shiro 利用 token 信息完成令牌登录
    Jenkins war deploy Shell
    jenkins 不执行部署 tomcat
    CentOS 7 设置 svn 开机启动
    502 Bad Gateway nginx/1.12.2 tomcat
    shiro 集成 JWT 自动获取token对应的用户信息
    org.apache.shiro.session.UnknownSessionException: There is no session with
    多角色分库情况下shiro开发
  • 原文地址:https://www.cnblogs.com/niutao/p/10548478.html
Copyright © 2011-2022 走看看