zoukankan      html  css  js  c++  java
  • Flink之API的使用(1):Sink的使用

    相关文章链接

    Flink之API的使用(1):Sink的使用

    Flink之API的使用(2):Transform算子的使用

    Flink之API的使用(3):Source的使用

    具体代码如下所示:

    // 执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    
    // 获取数据
    val fileStream: DataStream[String] = env
        .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
    val sensorStream: DataStream[SensorReading] = fileStream.map(new MyMapToSensorReading)
    
    // 1、kafkaSink
    fileStream.addSink(new FlinkKafkaProducer[String]("cdh1:9092,cdh2:9092,cdh3:9092", "flinkTestTopic", new SimpleStringSchema()))
    
    // 2、Redis Sink(在Flink中,需要定义一个redis的mapper类,用于定义保存到redis时调用的命令)
    // 2.1、定义redis的连接信息
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
    // 2.2、构建redis的Mapper对象(通过实现接口方式)
    val redisMapper: RedisMapper[SensorReading] = new RedisMapper[SensorReading] {
        override def getCommandDescription: RedisCommandDescription = {
            new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
        }
    
        override def getKeyFromData(data: SensorReading): String = data.id
    
        override def getValueFromData(data: SensorReading): String = data.temperature.toString
    }
    // 2.3、通过 FlinkJedisPoolConfig 和 RedisMapper 创建 RedisSink 对象,并使用流进行sink添加
    sensorStream.addSink(new RedisSink[SensorReading](conf, redisMapper))
    
    // 3、Elasticsearch Sink
    // 3.1、定义ES的连接地址(httpHosts)
    val httpHosts: util.ArrayList[HttpHost] = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))
    // 3.2、定义一个 ElasticsearchSinkFunction(通过实现接口方式)
    val elasticsearchSinkFunction: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {
        override def process(sensorReading : SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
            // 包装写入es的数据
            val dataSource: util.HashMap[String, String] = new util.HashMap[String, String]()
            dataSource.put("sensor_id", sensorReading.id)
            dataSource.put("temp", sensorReading.temperature.toString)
            dataSource.put("ts", sensorReading.timestamp.toString)
            // 创建一个IndexRequest(其中包含index,type,source数据)
            val indexRequest: IndexRequest = Requests.indexRequest()
                .index("sensor_temp")
                .opType("readingData")
                .source(dataSource)
            // 用RequestIndexer将包装好的 IndexRequest 数据发送到es(通过http)
            requestIndexer.add(indexRequest)
            println(sensorStream + " saved successfully")
        }
    }
    // 3.3、通过httpHosts和 ElasticsearchSinkFunction构建 ElasticsearchSink,并使用流进行sink添加
    sensorStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHosts, elasticsearchSinkFunction).build())
    
    // 4、JDBC Sink
    // 4.1、因为没有专门的JDBC Sink,使用直接实现一个RichSinkFunction,将JDBC操作
    val jdbcSinkFunction: RichSinkFunction[SensorReading] = new RichSinkFunction[SensorReading] {
    
        /**
         * 定义连接 和 预编译 语句(这些信息需要全局调用,并在open中初始化,close中关闭)
         */
        var conn: Connection = _
        var insertStmt: PreparedStatement = _
        var updateStmt: PreparedStatement = _
    
        /**
         * 在open生命周期方法中创建连接以及预编译语句
         * @param parameters 配置信息
         */
        override def open(parameters: Configuration): Unit = {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
            insertStmt = conn.prepareStatement("insert into temp (sensor, temperature) values (?,?)")
            updateStmt = conn.prepareStatement("update temp set temperature = ? where sensor = ?")
        }
    
        /**
         * 流中每进来一条数据,会调用一次此方法
         * @param value 流中进入的数据
         * @param context 环境上下文
         */
        override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
            // 执行更新语句
            updateStmt.setDouble(1, value.temperature)
            updateStmt.setString(2, value.id)
            updateStmt.execute()
            // 如果刚才没有更新数据,那么执行插入操作
            if( updateStmt.getUpdateCount == 0 ){
                insertStmt.setString(1, value.id)
                insertStmt.setDouble(2, value.temperature)
                insertStmt.execute()
            }
        }
    
        /**
         * 关闭资源
         */
        override def close(): Unit = {
            insertStmt.close()
            updateStmt.close()
            conn.close()
        }
    }
    // 4.2、使用流进行sink添加
    sensorStream.addSink(jdbcSinkFunction)
    
    
    // 启动执行环境,执行任务
    env.execute("SinkDemo")
    你现在所遭遇的每一个不幸,都来自一个不肯努力的曾经
  • 相关阅读:
    [转]使用RenderQueueListener针对不同的渲染组改变摄像机的裁剪面
    [转]Calculating Stereo Pairs
    [转]Ogre如何在渲染时切换指定物体的材质技术
    <转>C#操作word
    SQL常用语句二
    SQL高级查询
    C# word打印文档打印不全(数据为完全发送至打印机,程序已经退出)
    access事务插入多条记录
    C#操作word的一些基本方法(word打印,插入文件,插入图片,定位页眉页脚,去掉横线)
    Sqlserver 存储过程中结合事务的代码
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133300.html
Copyright © 2011-2022 走看看