zoukankan      html  css  js  c++  java
  • Flink的流处理API(二)

    一、Environment

    1,getExecutionEnvironment

      getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

    2,createLocalEnvironment

      返回本地执行环境,需要在调用时指定默认的并行度。

    val env = StreamExecutionEnvironment.createLocalEnvironment(1) //parallelism
    env.execute() //stream流执行

    3,createRemoteEnvironment

      返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

    //hostname port jarFiles
    val env = ExecutionEnvironment.createRemoteEnvironment(host, port,"/flink/wc.jar")

    4,maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    二、Source

    1,基本读取方式

    //文件中读取
    val fileDs = env.readTextFile("in/tbStock.txt")
    //端口读取
    val socketDs = env.socketTextStream("localhost",777)
    //集合中获取
    val collectDs = env.fromCollection(List("aaa","bbb","ccc","aaa"))

    2,kafka source

    //kafka配置文件
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    //接收kafka的topic-demo这个topic发来的数据
    val kafkaDataStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("topic-demo", new SimpleStringSchema(), properties))

    3,Flink Kafa如何实现exactly-once

    可参考: https://www.aboutyun.com/forum.php?mod=viewthread&tid=27395

      Flink通过checkpoint来保存数据是否处理完成的状态

      由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

      执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

      如果宕机需要通过StateBackend进行恢复,只能恢复所有确认提交的操作。

       

    4,自定义source

    env.addSource(new MySource)
    //自定义source
    class MySource extends SourceFunction[(String,Double)] {
      //flag: 表示数据源是否还在正常运行
      var running: Boolean = true
      override def cancel(): Unit = {
        running = false
      }
      override def run(ctx: SourceFunction.SourceContext[(String,Double)]): Unit = {
        //初始化一个随机数发生器
        val rand = new Random()
        var curTemp = 1.to(10).map(
          i => ("item_" + i, 65 + rand.nextGaussian() * 20)
        )
        while (running) {
          curTemp.foreach(
            t => ctx.collect(t)
          )
          Thread.sleep(5000)  //每5秒钟产生一组数据
        }
      }
    }

    三、Transform

    1,基本转换算子

    //map
    val streamMap = stream.map { x => x * 2 }
    //flatmap
    val streamFlatMap = stream.flatMap{
        x => x.split(" ")
    }
    //filter
    val streamFilter = stream.filter{
        x => x == 1
    }

    2,KeyBy与Reduce

      keyBy(DataStream → KeyedStream):输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

      reduce(KeyedStream → DataStream):一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

    val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
    //reduce //sum
    keyedStream.reduce{  (ch1,ch2)=>
      (ch1._1,ch1._2+ch2._2)
    }.print()

    3,Split和Select

      split(DataStream → SplitStream):根据某些特征把一个DataStream拆分成两个或者多个DataStream。

      select(SplitStream→DataStream):从一个SplitStream中获取一个或者多个DataStream。

    //根据Item的id进行拆分
    val splitStream:SplitStream[Item] = dStream.split {
      item =>
        List(item.id)
    }
    //获取标记为item_1的数据集
    splitStream.select("item_1").print()

    4,Connect和CoMap

      connect(DataStream,DataStream → ConnectedStreams):连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

      CoMap,CoFlatMap(ConnectedStreams → DataStream):作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

    val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
    val allStream: DataStream[String] = connStream.map(
      (log1: StartUpLog) => log1.ch,
      (log2: StartUpLog) => log2.ch
    )

    4,Union

      DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

    val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
    unionStream.print("union:::")

    5,ConnectUnion 区别:

      1)Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。

      2)Connect只能操作两个流,Union可以操作多个

     

    四、实现UDF函数

    1,函数类(Function Classes)

      Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如:MapFunction, FilterFunction, ProcessFunction 等等。

    val flinkTweets = tweets.filter(new FlinkFilter)
    //自定义filter类
    class FlinkFilter extends FilterFunction[String] { override def filter(value: String): Boolean = { value.contains("flink") } }

    2,匿名函数(Lamda Functions)

    val flinkTweets = tweets.filter(_.contains("flink"))

    3,富含数(Rich Functions)

      富函数是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

      open()方法是 rich function 的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。

      close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

      getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态。

     

    五、Sink

      Flink 没有类似于spark中foreach方法,让用户进行迭代的操作。虽有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。

    1,kafka

    dstream.addSink(new FlinkKafkaProducer011[String]("linux01:9092","test", new SimpleStringSchema()))

    2,redis

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    val config = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build()
    resultDStream.addSink(new RedisSink[Item](config,new MyRedisMapper))
    //定义redisMapper
    class MyRedisMapper extends RedisMapper[Item] {
      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET,"item_test") //hkey
      }
      override def getKeyFromData(data: Item): String = data.id 
      override def getValueFromData(data: Item): String = data.toString
    }

    3,Elasticsearch

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    //定义es的host集合
    val list = new util.ArrayList[HttpHost]()
    list.add(new HttpHost("linux01", 9200))
    //定义esBuilder
    val esBuilder = new ElasticsearchSink.Builder[Item](list,new ElasticsearchSinkFunction[Item] {
      override def process(element: Item, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        //定义es数据存储方式和存储值
        val json = new util.HashMap[String, String]()
        json.put("data", element.toString)
        //定义存储索引 type 和数据源
        val indexRequest = Requests.indexRequest().index("indexName").`type`("_doc").source(json)
        indexer.add(indexRequest)
      }
    })
    resultDStream.addSink(esBuilder.build())

    4,自定义sink(JDBC)

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    resultDStream.addSink(new MyJDBCSink)
    //自定义jdbcsink
    class MyJDBCSink extends RichSinkFunction[Sensor]{
      var conn: Connection = _
      var insertStmt: PreparedStatement = _
      var updateStmt: PreparedStatement = _
      //open 简历连接
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
        insertStmt = conn.prepareStatement("INSERT INTO item_test (id, num) VALUES (?, ?)")
        updateStmt = conn.prepareStatement("UPDATE item_test SET num = ? WHERE id = ?")
      }
      //调用执行
      override def invoke(value: Sensor, context: SinkFunction.Context[_]): Unit = {
        updateStmt.setDouble(1, value.temp)
        updateStmt.setString(2, value.id)
        updateStmt.execute()
        if (updateStmt.getUpdateCount == 0) {
          insertStmt.setString(1, value.id)
          insertStmt.setDouble(2, value.temp)
          insertStmt.execute()
        }
      }
      //关闭资源
      override def close(): Unit = {
        insertStmt.close()
        updateStmt.close()
        conn.close()
      }
    }
  • 相关阅读:
    FckEditor添加右键菜单;增加编辑区右键图片删除功能(asp.net )(二)
    GridView空数据时显示表头
    FckEditor添加右键菜单;增加编辑区右键图片删除功能(asp.net )(一)
    《InsideUE4》6GamePlay架构(五)Controller
    《Inside UE4》1基础概念
    《Inside UE4》2GamePlay架构(一)Actor和Component
    《InsideUE4》7GamePlay架构(六)PlayerController和AIController
    《InsideUE4》3GamePlay架构(二)Level和World
    Medusa引擎开源了
    《Inside UE4》0开篇
  • 原文地址:https://www.cnblogs.com/bbgs-xc/p/13424553.html
Copyright © 2011-2022 走看看