  • Flink - Ways to Read Files (Part 3)

    Reading files from HDFS

    // During operator transformations, data is converted into Flink's built-in data types, so the implicit conversions must be imported for the type conversion to happen automatically
    import org.apache.flink.streaming.api.scala._
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // readTextFile reads the file only once
        val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
        textStream.print()
        env.execute()
    
    Result:
    6> 1,小明
    8> 2,小红
    11> 3,小柯
    2> 4,wang
    
    Under the hood, readTextFile calls readFile and passes FileProcessingMode.PROCESS_ONCE (read only once).
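    As a rough sketch (not from the original post; it assumes the same env and HDFS path as in the example above), the same one-shot read could be written directly with readFile and PROCESS_ONCE:
    
    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    
    // Minimal sketch: roughly what readTextFile does internally
    val filePath = "hdfs://ke01:8020/xiaoke002/two"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // the interval argument is ignored for PROCESS_ONCE
    val onceStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1)
    onceStream.print()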
    
    So: to keep reading data continuously, call readFile() yourself and pass FileProcessingMode.PROCESS_CONTINUOUSLY.
     
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val filePath = "hdfs://ke01:8020/xiaoke002/two"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // Re-scan the HDFS path every 10 s for new file content (the interval argument is in milliseconds)
    val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
    textStream.print()
    env.execute()
    
    Result: the job keeps running; every time new data arrives, the whole file is read again
    12> 1,小明
    5> 3,小柯
    8> 4,wang
    2> 2,小红
    
    
    hdfs dfs -appendToFile test /xiaoke002/two 
    
    3> 3,小柯
    6> aa bb
    7> cc dd
    5> 4,wang
    1> 2,小红
    12> 1,小明

    Reading data from Kafka

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.9.2</version>
    </dependency>
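    If the project is built with sbt instead of Maven, the equivalent dependency line (an assumption: a Scala 2.11 build and Flink 1.9.2, matching the artifact above) would be:
    
    // build.sbt
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.2"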

    Producing the file's data to the flink-kafka topic

    package com.text
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer
    import scala.io._
    object FlinkKafkaProduct {
      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
        prop.setProperty("key.serializer", classOf[StringSerializer].getName)
        prop.setProperty("value.serializer", classOf[StringSerializer].getName)
    
        // Create a Kafka producer object
        val producer = new KafkaProducer[String, String](prop)
    
        // toList materializes the lines so they can be re-iterated in each of the 100 passes below
        val lines = Source.fromFile("D:\\code\\scala\\test\\test07\\data\\carFlow_all_column_test.txt").getLines().toList
    
        for (i <- 1 to 100) {
          for (elem <- lines) {
            val splits = elem.split(",")
            val monitorId = splits(0).replace("'", "")
            val carId = splits(2).replace("'", "")
            val timestamp = splits(4).replace("'", "")
            val speed = splits(6)
            val stringBuilder = new StringBuilder
            val info = stringBuilder.append(monitorId + "\t").append(carId + "\t").append(timestamp + "\t").append(speed)
            // Kafka messages are key/value pairs; the key defaults to null when not provided.
            // Here: topic = flink-kafka, key = i + "", value = info
            producer.send(new ProducerRecord[String, String]("flink-kafka", i + "", info.toString()))
            Thread.sleep(500)
          }
        }
        producer.close()
      }
    }

    Result

    -- Start Kafka on ke02, ke03, ke04:
    kafka-server-start.sh -daemon ./server.properties
    
    -- Start a console consumer
    kafka-console-consumer.sh  --new-consumer  --bootstrap-server  ke02:9092  --topic  flink-kafka --property print.key=true

    Consuming from Kafka with Flink (key and value)

    package com.text
    import java.util.Properties
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.flink.streaming.api.scala._
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    object FlinkKafkaConsumerKV {
      def main(args: Array[String]): Unit = {
        // flink_kafka消费带key和value
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
        properties.setProperty("group.id", "flink-kafka-001")
        properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
        properties.setProperty("value.deserializer", classOf[StringSerializer].getName)
        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          // When does the stream end? Returning false means it never stops
          override def isEndOfStream(nextElement: (String, String)): Boolean = false
          // Deserialize the raw key and value byte arrays from the record
          override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(record.key(), "UTF-8")
            val value = new String(record.value(), "UTF-8")
            (key, value)
          }
          // Declare the produced data type, using type information provided by Flink
          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, properties))
        stream.print()
        env.execute()
      }
    }
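    As a small follow-up sketch (the field layout matches the producer above; the variable names here are illustrative, not from the original post), each consumed value can be split back into its tab-separated columns:
    
    // Hypothetical continuation: split each value back into monitorId, carId, timestamp, speed
    val fields = stream.map(kv => {
      val cols = kv._2.split("\t")
      (kv._1, cols(0), cols(1), cols(2), cols(3))
    })
    fields.print()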

    Consuming from Kafka with Flink (value only)

    package com.text
    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.flink.streaming.api.scala._
    object FlinkKafkaConsumerV {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
        properties.setProperty("group.id", "flink-kafka-001")
        properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
        // SimpleStringSchema deserializes only the message value as a String
        val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties))
        stream.print()
        env.execute()
      }
    }

    Custom source

    package com.text
    import java.util.Random
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    object CustomSource {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        var flag = true
        val stream = env.addSource(new SourceFunction[String] {
          override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
            // run() can read data from anywhere (e.g. Redis) and emit it downstream
            val random = new Random()
            while (flag) {
              ctx.collect("send" + random.nextInt(100))
              Thread.sleep(500)
            }
          }
          override def cancel(): Unit = {
            flag = false
          }
        })
        stream.print()
        env.execute()
      }
    }


    Result:

    11> send16
    12> send34
    1> send9
    2> send69
    3> send59
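    Note (an assumption based on Flink's standard behaviour, not shown in the original post): a plain SourceFunction is a non-parallel source, so calling setParallelism with a value greater than 1 on it fails with an IllegalArgumentException when the job graph is built. To run the source with several parallel instances, use ParallelSourceFunction, as in the next example.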

     

    Custom source with parallelism greater than 1

    package com.text
    import java.util.Random
    import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    
    object CustomSourceParallelism {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        var flag = true
        val stream = env.addSource(new ParallelSourceFunction[String] {
          override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
            // run() can read data from anywhere (e.g. Redis) and emit it downstream
            val random = new Random()
            while (flag) {
              ctx.collect("send" + random.nextInt(100))
              Thread.sleep(500)
            }
          }
          override def cancel(): Unit = {
            flag = false
          }
        }).setParallelism(3)
        stream.print()
        env.execute()
      }
    }

    Result:

    2> send89
    12> send89
    12> send15
    1> send93
    1> send55



    Q: The source parallelism is 3, so why do so many different thread IDs show up? Because stream.print() has no parallelism set, it defaults to the number of cores on the current machine.
    Fix: stream.print().setParallelism(3)  (an environment-wide alternative is sketched after the results below)
    Result:

    3> send27
    2> send60
    1> send7
    3> send24
    2> send1
    1> send54
    2> send2
    3> send59
    1> send51
    3> send49
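    Equivalently (a sketch of an alternative, not shown in the original post), the default parallelism can be set once on the environment so that every operator in the job, including print(), uses it unless overridden per operator:
    
    // Sketch: give the whole job a default parallelism of 3, right after creating the environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    // ... add the source and print() as above; both now default to parallelism 3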

     