  • Flink Stream Processing API, Part 1

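    1. Environment

    1.1 getExecutionEnvironment

    • Creates an execution environment, which represents the context in which the current program runs.
    • If the program is invoked standalone (for example, run directly from an IDE), this method returns a local execution environment.
    • If the program is submitted to a cluster through the command-line client, this method returns that cluster's execution environment.
    • In other words, getExecutionEnvironment decides which environment to return based on how the program is run, which makes it the most commonly used way to create an execution environment.

    Batch environment

    val env = ExecutionEnvironment.getExecutionEnvironment

    Streaming environment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    If the program does not set a parallelism, the parallelism.default value in flink-conf.yaml applies; it defaults to 1.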
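
    As a minimal sketch of overriding that default in code: the object name ParallelismExample and the value 2 are only illustrative, and the snippet assumes the Flink streaming Scala API is on the classpath.

    ```scala
    import org.apache.flink.streaming.api.scala._

    object ParallelismExample {
      // Builds a streaming environment and overrides the default parallelism.
      def configuredParallelism(): Int = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // A value set in code takes precedence over flink-conf.yaml.
        env.setParallelism(2)
        env.getParallelism
      }

      def main(args: Array[String]): Unit =
        println(s"parallelism = ${configuredParallelism()}")
    }
    ```

    Individual operators can still override this value with their own setParallelism call.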

    1.2 createLocalEnvironment

    Returns a local execution environment. The default parallelism is passed as an argument to the call.

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    1.3 createRemoteEnvironment

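    Returns a cluster execution environment and submits the Jar to a remote server.

    The call takes the JobManager's host (or IP) and port, plus the Jar to run on the cluster.

    val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH/wordcount.jar")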

    2. Source

    2.1 Reading from a collection

    // Brings the implicit TypeInformation instances required by the Scala API into scope
    import org.apache.flink.streaming.api.scala._

    def main(args: Array[String]): Unit = {
        val env1: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Build the sample records from plain tuples
        val dataList = List(
            ("1", 1231231L, 200),
            ("2", 1231231L, 201),
            ("3", 1231231L, 202)
        ).map {
            case (id, ts, vc) => WaterSensor(id, ts, vc)
        }
        val dataDS: DataStream[WaterSensor] = env1.fromCollection(dataList)
        dataDS.print()
        env1.execute()
    }

    case class WaterSensor(id: String, ts: Long, vc: Double)
    
    // Alternative: build the collection directly from WaterSensor instances
    def main(args: Array[String]): Unit = {

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val sensorDS: DataStream[WaterSensor] = env.fromCollection(
            List(
                WaterSensor("ws_001", 1577844001, 45.0),
                WaterSensor("ws_002", 1577844015, 43.0),
                WaterSensor("ws_003", 1577844020, 42.0)
            )
        )

        sensorDS.print()

        env.execute("sensor")
    }

    2.2 Reading from a file

    // Use a file as the data source
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Relative path
    //val fileDS: DataStream[String] = env.readTextFile("input/word.txt")
    // Flink does not recognize the hdfs:// scheme out of the box; the Hadoop-related jars must be on the classpath
    val fileDS: DataStream[String] = env.readTextFile("hdfs://linux1:9000/directory/app-20191213160742-0000")
    fileDS.print("file>>>>")
    env.execute()

    2.3 Reading from Kafka

    Add the Kafka connector dependency:

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Use Kafka as the data source
    val properties = new java.util.Properties()
    properties.setProperty("bootstrap.servers", "linux1:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Read the "waterSensor" topic and decode each record as a String
    val kafkaDS = env.addSource( new FlinkKafkaConsumer011[String]("waterSensor", new SimpleStringSchema(), properties) )
    kafkaDS.print("kafka>>>>")
    env.execute()

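    2.4 Custom source

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._

    def main(args: Array[String]): Unit = {

        // Use a custom source as the data source
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        env.addSource( new MySource() ).print("mine>>>>")

        env.execute()

    }
    // Custom data source
    // 1. Extend SourceFunction
    // 2. Override its methods
    class MySource extends SourceFunction[WaterSensor]{

        // cancel() is called from a different thread than run(), so the flag must be volatile
        @volatile private var flg = true

        // Runs the data collection loop
        override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
            while ( flg ) {
                // Emit a record into the stream
                ctx.collect(WaterSensor( "1", 1L, 1 ))
                Thread.sleep(200)
            }
        }

        // Stops the data collection loop
        override def cancel(): Unit = {
            flg = false
        }
    }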

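    3. Sink

    • Flink has no equivalent of Spark's foreach method for letting users iterate over the data themselves.
    • All output to external systems goes through a Sink.
    • A job's final output step is attached in a form similar to the following.

    stream.addSink(new MySink(xxxx))

    The print method is itself a Sink: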

    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
        return addSink(printFunction).name("Print to Std. Out");
    }

    Flink ships sinks for a number of external systems. For anything else, users need to implement a custom sink.
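
    A custom sink can be sketched roughly as follows. PrefixSink and its format helper are hypothetical names, not part of Flink, and the invoke signature assumes the Flink 1.7 to 1.10 API used elsewhere in this article.

    ```scala
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.scala._

    // Hypothetical sink for illustration: prefixes each record and writes it to stderr.
    class PrefixSink(prefix: String) extends SinkFunction[String] {
      // Called once for every record that reaches the sink.
      override def invoke(value: String, context: SinkFunction.Context[_]): Unit =
        System.err.println(PrefixSink.format(prefix, value))
    }

    object PrefixSink {
      // Pure formatting logic, kept separate so it can be checked without running a job.
      def format(prefix: String, value: String): String = s"$prefix> $value"

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements("a", "b", "c").addSink(new PrefixSink("out"))
        env.execute("custom-sink-sketch")
      }
    }
    ```

    When the sink needs to open and close an external connection, extending RichSinkFunction instead gives access to the open and close lifecycle methods.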

    3.1 Kafka

    Add the dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

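    Add the sink in the main function:

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

    // Write data to Kafka
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds: DataStream[String] = env.readTextFile("input/word.txt")

    // Arguments: broker list, target topic, serialization schema
    ds.addSink( new FlinkKafkaProducer011[String]( "linux1:9092", "waterSensor", new SimpleStringSchema() ) )

    env.execute()

    Check the result with the Kafka console consumer, reading the same topic the job writes to:

    bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic waterSensor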

    3.2 Redis

    Add the dependency:

    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

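    Define a Redis mapper that specifies the command to run when saving to Redis:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

    // Write data to Redis
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val ds: DataStream[String] = env.readTextFile("input/word.txt")

    val conf = new FlinkJedisPoolConfig.Builder().setHost("linux4").setPort(6379).build()
    ds.addSink( new RedisSink[String](conf, new RedisMapper[String] {
        // HSET into the hash named "word"
        override def getCommandDescription: RedisCommandDescription = {
            new RedisCommandDescription(RedisCommand.HSET, "word")
        }

        // The second space-separated token becomes the hash field
        override def getKeyFromData(t: String): String = {
            t.split(" ")(1)
        }

        // The first space-separated token becomes the field's value
        override def getValueFromData(t: String): String = {
            t.split(" ")(0)
        }
    }))

    env.execute()

    Check the data from the Redis client, using the hash name passed to RedisCommandDescription:

    HGETALL word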
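
    To make the mapping concrete, here is a plain-Scala sketch of what the mapper does to a single input line. RedisMappingSketch, toHashEntry and the sample line "hello 1" are illustrative assumptions; the contents of input/word.txt are not shown in this article.

    ```scala
    object RedisMappingSketch {
      // Mirrors getKeyFromData / getValueFromData from the mapper above:
      // the second token becomes the hash field, the first token its value.
      def toHashEntry(line: String): (String, String) = {
        val tokens = line.split(" ")
        (tokens(1), tokens(0))
      }

      def main(args: Array[String]): Unit = {
        val (field, value) = toHashEntry("hello 1")
        println(s"HSET word $field $value") // prints "HSET word 1 hello"
      }
    }
    ```

    Lines with fewer than two space-separated tokens would throw an ArrayIndexOutOfBoundsException here, as they would in the mapper, so the input is assumed to be well-formed.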

    3.3 Elasticsearch  

    Add the dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>

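    Call it from the main function:

    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.client.Requests

    // Write data to Elasticsearch
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val list = List(
        WaterSensor("sensor_1", 150000L, 25),
        WaterSensor("sensor_1", 150001L, 27),
        WaterSensor("sensor_1", 150005L, 30),
        WaterSensor("sensor_1", 150007L, 40)
    )

    val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)

    val httpHosts = new java.util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("linux1", 9200))
    val esSinkBuilder = new ElasticsearchSink.Builder[WaterSensor]( httpHosts, new ElasticsearchSinkFunction[WaterSensor] {
        override def process(t: WaterSensor, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
            println("saving data: " + t)
            val json = new java.util.HashMap[String, String]()
            json.put("data", t.toString)
            // Index each record into the "water" index
            val indexRequest = Requests.indexRequest().index("water").`type`("readingData").source(json)
            // The request is only queued here; the sink sends it as part of a bulk request
            requestIndexer.add(indexRequest)
            println("request queued")
        }
    } )

    waterSensorDS.addSink(esSinkBuilder.build())

    env.execute()

    Check the result in Elasticsearch:

    • List the indices: http://linux1:9200/_cat/indices?v
    • Search the index written above: http://linux1:9200/water/_search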

    3.4 JDBC

    Add the dependency:

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>

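    Add MyJDBCSink:

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._

    def main(args: Array[String]): Unit = {

        // Write data out through JDBC
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val list = List(
            WaterSensor("sensor_1", 150000L, 25),
            WaterSensor("sensor_1", 150001L, 27),
            WaterSensor("sensor_1", 150005L, 30),
            WaterSensor("sensor_1", 150007L, 40)
        )

        val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)

        waterSensorDS.addSink( new MyJDBCSink )

        env.execute()

    }
    // Custom sink
    // 1. Extend RichSinkFunction
    // 2. Override its methods
    class MyJDBCSink extends RichSinkFunction[WaterSensor] {

        private var conn : Connection = _
        private var pstat : PreparedStatement = _

        // Called once per parallel sink instance, before any record arrives
        override def open(parameters: Configuration): Unit = {
            //Class.forName()
            conn = DriverManager.getConnection("jdbc:mysql://linux1:3306/rdd", "root", "000000")
            pstat = conn.prepareStatement("insert into user (id, name, age) values (?, ?, ?)")
        }
        // Called once per record
        override def invoke(ws: WaterSensor, context: SinkFunction.Context[_]): Unit = {
            pstat.setInt(1, 1) // fixed id, for demonstration only
            pstat.setString(2, ws.id)
            pstat.setInt(3, ws.vc.toInt) // vc is a Double, so convert it before calling setInt
            pstat.executeUpdate()
        }

        // Release the resources; either may be null if open() failed part-way
        override def close(): Unit = {
            if (pstat != null) pstat.close()
            if (conn != null) conn.close()
        }
    }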

    3.5 HDFS

    The BucketingSink has been deprecated since Flink 1.9 and will be removed in subsequent releases. Please use the StreamingFileSink instead.

    3.5.1 BucketingSink

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>
    
    val input: DataStream[String] = ...
    
    input.addSink(new BucketingSink[String]("/base/path"))

    By default the bucketing sink splits by the current system time when elements arrive and uses the datetime pattern "yyyy-MM-dd--HH" to name the buckets.

    There are two configuration options that specify when a part file should be closed and a new one started:

    • By setting a batch size (the default part file size is 384 MB)
    • By setting a batch rollover time interval (the default rollover interval is Long.MAX_VALUE)

    import java.time.ZoneId
    // the SequenceFileWriter only works with Flink Tuples
    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.streaming.connectors.fs.SequenceFileWriter
    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
    import org.apache.hadoop.io.{IntWritable, Text}

    val input: DataStream[Tuple2[IntWritable, Text]] = ...

    val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
    sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
    sink.setWriter(new SequenceFileWriter[IntWritable, Text])
    sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
    sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins

    input.addSink(sink)

    This will create a sink that writes to bucket files that follow this schema:

    /base/path/{date-time}/part-{parallel-task}-{count}
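
    The schema can be illustrated without Flink. In the sketch below, BucketPathSketch and partFilePath are hypothetical helpers that only imitate how the path is composed; they use java.time with the same pattern and time zone as the DateTimeBucketer configured above.

    ```scala
    import java.time.{Instant, ZoneId}
    import java.time.format.DateTimeFormatter

    object BucketPathSketch {
      // Same pattern and time zone as the DateTimeBucketer configured above.
      private val formatter = DateTimeFormatter
        .ofPattern("yyyy-MM-dd--HHmm")
        .withZone(ZoneId.of("America/Los_Angeles"))

      // Hypothetical helper: composes /base/path/{date-time}/part-{parallel-task}-{count}.
      def partFilePath(basePath: String, epochMillis: Long, subtask: Int, count: Int): String = {
        val bucket = formatter.format(Instant.ofEpochMilli(epochMillis))
        s"$basePath/$bucket/part-$subtask-$count"
      }

      def main(args: Array[String]): Unit =
        println(partFilePath("/base/path", 0L, 0, 0))
        // prints "/base/path/1969-12-31--1600/part-0-0"
    }
    ```

    Because the pattern includes minutes, elements arriving in different minutes land in different bucket directories.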

    3.5.2 StreamingFileSink

    File Formats

    The StreamingFileSink supports both row-wise and bulk encoding formats, such as Apache Parquet. These two variants come with their respective builders that can be created with the following static methods:

    • Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
    • Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

    Row-encoded Formats

    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
    
    val input: DataStream[String] = ...
    
    val sink: StreamingFileSink[String] = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
        .build()
    
    input.addSink(sink)
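
    The three thresholds passed to the rolling policy can be read as independent conditions, any one of which closes the current part file. The following is a simplified plain-Scala model of that decision, not Flink's implementation; RollingPolicySketch, PartFile and shouldRoll are hypothetical names.

    ```scala
    import java.util.concurrent.TimeUnit

    object RollingPolicySketch {
      // Thresholds copied from the builder calls above.
      val rolloverIntervalMs: Long = TimeUnit.MINUTES.toMillis(15)
      val inactivityIntervalMs: Long = TimeUnit.MINUTES.toMillis(5)
      val maxPartSizeBytes: Long = 1024L * 1024L * 1024L

      // Hypothetical, simplified state of an in-progress part file.
      final case class PartFile(sizeBytes: Long, createdAtMs: Long, lastWriteMs: Long)

      // True when any of the three conditions would close the part file.
      def shouldRoll(file: PartFile, nowMs: Long): Boolean =
        file.sizeBytes >= maxPartSizeBytes ||
          nowMs - file.createdAtMs >= rolloverIntervalMs ||
          nowMs - file.lastWriteMs >= inactivityIntervalMs

      def main(args: Array[String]): Unit = {
        val fresh = PartFile(sizeBytes = 10L, createdAtMs = 0L, lastWriteMs = 0L)
        println(shouldRoll(fresh, nowMs = 1000L))                // false: small, young, recently written
        println(shouldRoll(fresh, nowMs = inactivityIntervalMs)) // true: no writes for 5 minutes
      }
    }
    ```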

    Bulk-encoded Formats

    Flink comes with three built-in BulkWriter factories:

    • ParquetWriterFactory
    • SequenceFileWriterFactory
    • CompressWriterFactory

    Parquet format

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_2.11</artifactId>
      <version>1.10.0</version>
    </dependency>
    
    
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    
    val schema: Schema = ...
    val input: DataStream[GenericRecord] = ...
    
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
        .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
        .build()
    
    input.addSink(sink)

    Hadoop SequenceFile format

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sequence-file</artifactId>
      <version>1.10.0</version>
    </dependency>
    
    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.configuration.GlobalConfiguration
    import org.apache.flink.formats.sequencefile.SequenceFileWriterFactory
    import org.apache.flink.runtime.util.HadoopUtils
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}

    // SequenceFileWriterFactory writes Flink's Java Tuple2, not Scala tuples
    val input: DataStream[Tuple2[LongWritable, Text]] = ...
    val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
    val sink: StreamingFileSink[Tuple2[LongWritable, Text]] = StreamingFileSink
        .forBulkFormat(
            outputBasePath,
            new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
        .build()

    input.addSink(sink)
  • Original article: https://www.cnblogs.com/hyunbar/p/12632931.html