Reading files from HDFS
// When operators transform the data, it is converted into Flink's built-in data types,
// so the implicit conversions must be imported for the conversion to happen automatically
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// readTextFile reads the file only once
val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
textStream.print()
env.execute()

Result:
6> 1,小明
8> 2,小红
11> 3,小柯
2> 4,wang

Under the hood, readTextFile calls readFile and passes FileProcessingMode.PROCESS_ONCE (read only once).
So: to keep reading new data, call readFile() yourself with FileProcessingMode.PROCESS_CONTINUOUSLY.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode

val env = StreamExecutionEnvironment.getExecutionEnvironment
val filePath = "hdfs://ke01:8020/xiaoke002/two"
val textInputFormat = new TextInputFormat(new Path(filePath))
// Scan the HDFS path for newly added file content every 10 seconds (the interval argument is in milliseconds)
val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
textStream.print()
env.execute()

Result: the job keeps running; every time new data arrives, the whole file is read again
12> 1,小明
5> 3,小柯
8> 4,wang
2> 2,小红

hdfs dfs -appendToFile test /xiaoke002/two

3> 3,小柯
6> aa bb
7> cc dd
5> 4,wang
1> 2,小红
12> 1,小明
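To make the PROCESS_ONCE point above concrete: reusing env and filePath from the snippets above, readTextFile is roughly equivalent to the explicit readFile call sketched below. This is a hedged sketch based on the 1.9.x streaming Scala API; the value names onceFormat and onceStream are made up for illustration.

// Rough equivalent of env.readTextFile(filePath): scan the path a single time, then the source finishes.
// The interval argument is ignored in PROCESS_ONCE mode.
val onceFormat = new TextInputFormat(new Path(filePath))
val onceStream = env.readFile(onceFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1)
onceStream.print()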
Reading data from Kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
flink-kafka producer
package com.text

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io._

object FlinkKafkaProduct {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    prop.setProperty("key.serializer", classOf[StringSerializer].getName)
    prop.setProperty("value.serializer", classOf[StringSerializer].getName)

    // Create a Kafka producer
    val producer = new KafkaProducer[String, String](prop)

    for (i <- 1 to 100) {
      // Re-open the file on every pass; getLines() returns an iterator that can only be consumed once
      val lines = Source.fromFile("D:\\code\\scala\\test\\test07\\data\\carFlow_all_column_test.txt").getLines()
      for (elem <- lines) {
        val splits = elem.split(",")
        val monitorId = splits(0).replace("'", "")
        val carId = splits(2).replace("'", "")
        val timestamp = splits(4).replace("'", "")
        val speed = splits(6)
        val stringBuilder = new StringBuilder
        val info = stringBuilder.append(monitorId + " ").append(carId + " ").append(timestamp + " ").append(speed)
        // Kafka messages are key-value pairs; the key defaults to null when not set.
        // topic: flink-kafka, key: i + "", value: info
        producer.send(new ProducerRecord[String, String]("flink-kafka", i + "", info.toString()))
        Thread.sleep(500)
      }
    }
  }
}
Result
-- Start Kafka on ke02, ke03, ke04:
kafka-server-start.sh -daemon ./server.properties

-- Start a console consumer:
kafka-console-consumer.sh --new-consumer --bootstrap-server ke02:9092 --topic flink-kafka --property print.key=true
flink-kafka consumer (key and value)
package com.text

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

object FlinkKafkaConsumerKV {
  def main(args: Array[String]): Unit = {
    // Consume both the key and the value from Kafka
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
    properties.setProperty("value.deserializer", classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
      new KafkaDeserializationSchema[(String, String)] {
        // Stop condition: returning false means the stream never ends
        override def isEndOfStream(nextElement: (String, String)): Boolean = false

        // Deserialize the raw byte arrays fetched from Kafka into a (key, value) tuple
        override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
          val key = new String(record.key(), "UTF-8")
          val value = new String(record.value(), "UTF-8")
          (key, value)
        }

        // Tell Flink the type of the elements this schema produces (using Flink's type information)
        override def getProducedType: TypeInformation[(String, String)] = {
          createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }
      }, properties))

    stream.print()
    env.execute()
  }
}
flink-kafka consumer (value only)
package com.text

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringSerializer

object FlinkKafkaConsumerV {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
    properties.setProperty("value.deserializer", classOf[StringSerializer].getName)

    // SimpleStringSchema deserializes only the message value as a String
    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties))
    stream.print()
    env.execute()
  }
}
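When re-running these consumers it can be handy to ignore the offsets already committed by group flink-kafka-001 and re-read the topic from the beginning. Below is a minimal sketch of that option using the consumer's start-position setters; the object name FlinkKafkaConsumerFromEarliest is made up for illustration, everything else reuses the setup above.

package com.text

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object FlinkKafkaConsumerFromEarliest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")

    // Build the consumer as a named value so a start position can be chosen before adding it as a source
    val kafkaConsumer = new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties)
    kafkaConsumer.setStartFromEarliest() // ignore committed group offsets and read the topic from the beginning

    env.addSource(kafkaConsumer).print()
    env.execute()
  }
}

Alternatives on the same consumer include setStartFromLatest() and setStartFromGroupOffsets() (the default behaviour).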
Custom source
package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object CustomSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true

    val stream = env.addSource(new SourceFunction[String] {
      // run() can read data from anywhere (e.g. Redis) and emit it downstream
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      // cancel() is called when the job is cancelled; flip the flag so run() exits
      override def cancel(): Unit = {
        flag = false
      }
    })

    stream.print()
    env.execute()
  }
}
Result:
11> send16
12> send34
1> send9
2> send69
3> send59
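Note: a source built from a plain SourceFunction like this is non-parallel; Flink fixes its parallelism at 1 and rejects an attempt to set a higher value on it. To run the source with several parallel instances, implement ParallelSourceFunction instead, as in the next example.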
Custom source with multiple parallel instances
package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object CustomSourceParallelism {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true

    val stream = env.addSource(new ParallelSourceFunction[String] {
      // run() can read data from anywhere (e.g. Redis) and emit it downstream;
      // with ParallelSourceFunction, one instance of run() executes per parallel subtask
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        flag = false
      }
    }).setParallelism(3)

    stream.print()
    env.execute()
  }
}
Result:
2> send89
12> send89
12> send15
1> send93
1> send55
Q: The source parallelism is 3, so why do so many different subtask IDs appear in the output? Because stream.print() has no parallelism of its own set, it defaults to the number of cores on the current machine.
Fix: stream.print().setParallelism(3) (a job-wide alternative is sketched after the output below)
Result:
3> send27
2> send60
1> send7
3> send24
2> send1
1> send54
2> send2
3> send59
1> send51
3> send49
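Instead of setting the parallelism on print() alone, the job-wide default parallelism can be set on the environment; every operator that does not set its own parallelism (including print) then inherits it. Below is a minimal sketch of that variant under the same API as the examples above; the object name CustomSourceDefaultParallelism is made up for illustration.

package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object CustomSourceDefaultParallelism {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Job-wide default: every operator (source, print, ...) runs with parallelism 3
    // unless it overrides it explicitly.
    env.setParallelism(3)

    var flag = true
    val stream = env.addSource(new ParallelSourceFunction[String] {
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        flag = false
      }
    })

    stream.print() // inherits the job default of 3, so only prefixes 1>, 2>, 3> appear in the output
    env.execute()
  }
}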