zoukankan      html  css  js  c++  java
  • 【Flink学习笔记】01、flink快速入门

    学习链接(尚硅谷):https://www.bilibili.com/video/BV1Qp4y1Y7YN

    一、flink简介

    flink的特点

    • 低延迟
    • 高吞吐
    • 能正确地处理数据和容错机制

    什么是事件驱动?

    就像tomcat一样,从启动之后就一直处于运行状态,只要有请求事件过来,就会进行处理

    二、flink快速体验

    当前scala版本为2.11.8
    在这里插入图片描述
    如果在这个版本下,使用scala_2.12和1.10.1版本的话会报错的哦:

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
    

    在这里插入图片描述
    上面的是报错的截图,即使我导入了隐式转换,这是我遇到的问题做个记录,请忽略,直接进行下面的操作

    1、创建maven工程flinklearning

    2、添加pom依赖:

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.8.0</version>
            </dependency>
        </dependencies>
    
        <build>
        <plugins>
        <!-- 该插件用于将Scala代码编译成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- 声明绑定到maven的compile阶段 -->
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin></plugins>
        </build>
    

    3、数据文件:

    hello world
    nice to meet you
    how are you
    im fine
    

    4、单词统计类:

    package com.learning.flink
    
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.api.scala._
    
    object WordCount {
      case class WordWithCount(world :String, count: Int)
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        environment.setParallelism(1)
        var inputPath = "D:\IDEA_WORKSPACE\BIGDATA-LEARNING\flinklearning\src\main\resources\wc.txt"
        val inputData: DataSet[String] = environment.readTextFile(inputPath)
        val result: AggregateDataSet[(String, Int)] = inputData.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
        result.print()
      }
    }
    
    

    5、输出结果:

    (are,1)
    (fine,1)
    (hello,1)
    (how,1)
    (im,1)
    (meet,1)
    (nice,1)
    (to,1)
    (world,1)
    (you,2)
    

    三、flink流式处理

    1、单词统计类:

    package com.learning.flink
    
    import org.apache.flink.streaming.api.scala._
    
    object WordCount {
      case class WordWithCount(world :String, count: Int)
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val textDstream = env.socketTextStream("192.168.66.11", 7777)
        val result = textDstream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
        result.print()
    
        env.execute("启动")
      }
    }
    
    

    有坑,一定要注意导包,不用的包就删掉,不要留着,我之前就是因为留着一些没用的包,导致一直报红

    2、在linux监听一个端口,并且发送数据

    [root@cdh01 ~]# nc -lk 7777
    hello word
    hello you
    

    3、输出结果:

    4> (hello,1)
    9> (word,1)
    7> (you,1)
    4> (hello,2)
    

    4、可以使用工具类获取参数

        val params: ParameterTool = ParameterTool.fromArgs(args)
        val host = params.get("host")
        val port = params.get("port")
    
  • 相关阅读:
    oracle11g数据库安装
    WIN7+QT5.2.0 连接oracle11g问题及解决方法
    WIN7+Qt5.2.0连接oracle数据库的oci驱动的编译
    Qt编译Oracle OCI驱动
    Qt 中 Oracle 数据库 QOCI 驱动问题及解决
    qt QThread
    Qt之模型/视图(自定义风格)
    Qt之模型/视图(实时更新数据)
    Qt之模型/视图(委托)
    AE地图查询
  • 原文地址:https://www.cnblogs.com/tangliping/p/14318290.html
Copyright © 2011-2022 走看看