zoukankan      html  css  js  c++  java
  • Flink(五) —— DataStream API

    Source

    从自定义的集合中读取数据

    /**
        * 从集合中读取数据
        */
      def readDataFromCollection(): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 1.从自定义的集合中读取数据
        val list = List(
          SensorReading("sensor1", 153242, 35.8),
          SensorReading("sensor2", 153222, 15.4),
          SensorReading("sensor3", 153142, 6.7),
          SensorReading("sensor4", 151242, 38.7))
    
        val stream1 = env.fromCollection(list)
    
        stream1.print("stream1").setParallelism(1)
    
        env.execute("source test")
      }
    

    从Kafka中读取数据

    引入依赖

        <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    
    

    代码

      /**
        * 从kafka中读取数据
        */
      def readDataFromKafka(): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.setProperty("group.id", "flink-demo")
        props.setProperty("auto.offset.reset", "latest")
    
    
        val stream1 = env.addSource(new FlinkKafkaConsumer010[String]("flinkdemo",new SimpleStringSchema(),props))
    
        stream1.print("stream1").setParallelism(1)
    
        env.execute("source test")
      }
    
    

    从自定义的Source中读取数据

    
    class SensorSource() extends SourceFunction[SensorReading] {
    
      var running: Boolean = true
    
      // 取消数据源的生成
      override def cancel(): Unit = {
        running = false
      }
    
      // 生成数据
      override def run(sourceContext: SourceContext[SensorReading]): Unit = {
        // 初始化一个随机数发生器
        val rand = new Random()
    
        var curTemp = 1.to(10).map(
          i => ("sensor_" + i, 60 + rand.nextGaussian() * 20)
        )
    
        while (running) {
    
          curTemp = curTemp.map(
            t => (t._1, t._2 + rand.nextGaussian())
          )
    
          val curTime = System.currentTimeMillis()
    
          curTemp.foreach(
            t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
          )
    
          Thread.sleep(500)
    
        }
      }
    }
    
    
    

    Transform

    样例数据

    senor_1,1,10
    senor_2,2,20
    senor_3,3,40
    senor_4,4,30
    senor_5,5,30
    senor_6,6,60
    senor_1,7,70
    

    map、reduce、keyBy

    map

    • DataStream -> DataStream
    • 通过应用给定的函数,对原先DataStream中的每个元素进行处理,获得一个新的DataStream

    keyBy

    • DataStream -> KeyedStream[T,JavaTuple]
    • 对DataStream中的元素按照给定的表达式进行分组

    reduce

    • KeyedStream -> DataStream
    • 通过规约原有DataStream中的元素,返回一个新的DataStream
    
      /**
        * 使用map、reduce
        */
      def testMap(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("senor.txt")
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).toLong, dataArray(2).trim.toDouble)
        })
          .keyBy("id")
          .reduce((x, y) => {
            SensorReading(x.id, x.timestamp + 1, y.temperature + x.temperature)
          })
    
        dataStream.print()
    
        env.execute()
      }
    
    
    

    split、select

    split

    • DataStream → SplitStream
    • 按照指定标准将指定的DataStream拆分成多个流用SplitStream来表示

    select

    • SplitStream → DataStream
    • 跟split搭配使用,从SplitStream中选择一个或多个流
    def testSplit(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("senor.txt")
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).toLong, dataArray(2).trim.toDouble)
        })
    
        // 多流转换算子
        val splitStream = dataStream.split(data => {
          if (data.temperature > 20) Seq("high") else Seq("low")
        })
    
        val high = splitStream.select("high")
        val low = splitStream.select("low")
        val all = splitStream.select("high", "low")
    
        high.print("high")
        low.print("low")
        all.print("all")
    
        env.execute()
      }
    
    

    connect、coMap、coFlatMap

    connect

    • DataStream,DataStream -> ConnectedStreams

    coMap

    • ConnectedStreams -> DataStream
    
    def testConnect(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("senor.txt")
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).toLong, dataArray(2).trim.toDouble)
        })
    
        // 多流转换算子
        val splitStream = dataStream.split(data => {
          if (data.temperature > 20) Seq("high") else Seq("low")
        })
    
        val high = splitStream.select("high")
        val low = splitStream.select("low")
    
        // 创建一个新的数据流,数据类型与high、low不同
        val warning = high.map(data => (data.id, data.temperature))
        // 得到ConnectedStreams[T, T2]
        val connectedStreams = warning.connect(low)
        val coMapDataStreams = connectedStreams.map(data1 => (data1._1, data1._2, "warning"), data2 => (data2.temperature, "health"))
    
        coMapDataStreams.print()
    
        env.execute()
      }
    
    

    UDF函数

    Filter

    
    def testFilter(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("senor.txt")
        val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).toLong, dataArray(2).trim.toDouble)
        })
    
        dataStream.filter(new MyFilter()).print()
    
        env.execute()
      }
    
    class MyFilter() extends FilterFunction[SensorReading] {
        override def filter(value: SensorReading): Boolean = {
          return value.id.startsWith("senor_1")
        }
      }
    
    

    Sink

    
      def testFlinkSink2Kafka(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val streamFromFile = env.readTextFile("senor.txt")
    
        // Transform操作
        val dataStream = streamFromFile.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).toLong, dataArray(2).trim.toDouble).toString
        })
    
        // sink
        dataStream.addSink(new FlinkKafkaProducer010[String]("localhost:9092", "sinkTest", new SimpleStringSchema()))
    
        env.execute()
      }
    
    

    参考文档

    Basic API Concepts
    Flink算子使用方法及实例演示:union和connect

  • 相关阅读:
    【HTML5】html5在ie8及以下的兼容性问题
    【前端】从登录框看前端
    批量插入
    Mongodb地理空间索引
    Mongodb添加地理位置索引
    记一 次docker-compose build报错
    The method's class, com.google.common.collect.FluentIterable, is available from the following locations
    如果在chrome的新标签中继续打开开发工具
    设置idea 2018 的vmoptions无效
    spring boot(2):activiti整合
  • 原文地址:https://www.cnblogs.com/fonxian/p/12345930.html
Copyright © 2011-2022 走看看