1:local环境
本地环境通过该方法实例化ExecutionEnvironment.createLocalEnvironment()。默认情况下,它将使用尽可能多的本地线程执行,因为您的机器具有CPU核心(硬件上下文)。您也可以指定所需的并行性。本地环境可以配置为使用enableLogging()/ 登录到控制台disableLogging()。
在大多数情况下,ExecutionEnvironment.getExecutionEnvironment()
是更好的方式。LocalEnvironment
当程序在本地启动时(命令行界面外),该方法会返回一个程序,并且当程序由命令行界面调用时,它会返回一个预配置的群集执行环境。
注意:本地执行环境不启动任何Web前端来监视执行。
object LocalEven { def main(args: Array[String]): Unit = { //TODO 初始化本地执行环境 val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment() val path = "data2.csv" val data = env.readCsvFile[(String, String, String, String,String,Int,Int,Int)]( filePath = path, lineDelimiter = " ", fieldDelimiter = ",", ignoreFirstLine = true ) data.groupBy(0,1).first(100).print() } }
2:集合环境
使用集合的执行CollectionEnvironment是执行Flink程序的低开销方法。这种模式的典型用例是自动化测试,调试和代码重用。
用户也可以使用为批处理实施的算法,以便更具交互性的案例
请注意,基于集合的Flink程序的执行仅适用于适合JVM堆的小数据。集合上的执行不是多线程的,只使用一个线程
//TODO createCollectionsEnvironment val collectionENV = ExecutionEnvironment.createCollectionsEnvironment val path = "data2.csv" val data = collectionENV.readCsvFile[(String, String, String, String,String,Int,Int,Int)]( filePath = path, lineDelimiter = " ", fieldDelimiter = ",", ignoreFirstLine = true ) data.groupBy(0,1).first(50).print()
Flink程序可以在许多机器的集群上分布运行。有两种方法可将程序发送到群集以供执行:
1:命令行界面:
./bin/flink run ./examples/batch/WordCount.jar --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
2:使用代码中的远程环境提交
远程环境允许您直接在群集上执行Flink Java程序。远程环境指向要在其上执行程序的群集
Maven打包:
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.6</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.flink.DataStream.RemoteEven</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>2.10</version> <executions> <execution> <id>copy-dependencies</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build>
val env: ExecutionEnvironment = ExecutionEnvironment.createRemoteEnvironment("hadoop01", 8081, "target/learning-flink-1.0-SNAPSHOT.jar") val data: DataSet[String] = env.readTextFile("hdfs://hadoop01:9000/README.txt") val flatMap_data: DataSet[String] = data.flatMap(line => line.toLowerCase().split("\W+")) val mapdata: DataSet[(String, Int)] = flatMap_data.map(line => (line , 1)) val groupData: GroupedDataSet[(String, Int)] = mapdata.groupBy(line => line._1) val result = groupData.reduce((x , y) => (x._1 , x._2+y._2)) result.writeAsText("hdfs://hadoop01:9000/remote") env.execute()