  • Flink-Sink (4)

    Redis-Sink
    Uses the RedisSink provided by the Apache Bahir connector for Flink.

    <dependency> 
        <groupId>org.apache.bahir</groupId> 
        <artifactId>flink-connector-redis_2.11</artifactId> 
        <version>1.0</version> 
    </dependency>
    
    
    package com.text.sink
    
    import java.net.InetSocketAddress
    import java.util
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
    
    object RedisSinkDemo {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("ke01", 8899)
        val result = stream.flatMap(_.split(" "))
          .map((_, 1)).keyBy(0).sum(1)
        result.print().setParallelism(1)
        // Redis here runs as a single standalone instance, so a FlinkJedisPoolConfig is enough
        val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("192.168.75.91").setPort(6390).setPassword("aa123456").build()
    
        result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
          override def getCommandDescription: RedisCommandDescription = {
            // choose how the result is stored in Redis: HSET writes into the hash named "wc"
            new RedisCommandDescription(RedisCommand.HSET, "wc")
          }
    
          override def getKeyFromData(t: (String, Int)): String = {
            println(t._1)
            t._1
          }
    
          override def getValueFromData(t: (String, Int)): String = {
            println(t._2)
            t._2.toString
          }
        }))
        env.execute()
      }
    
    }
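
    The imports above also pull in FlinkJedisClusterConfig and java.net.InetSocketAddress, which are only
    needed when Redis runs as a cluster instead of a single node. A minimal sketch of the cluster variant,
    with placeholder node addresses (they are not from the original post); the RedisSink and RedisMapper
    are reused unchanged:

        // sketch: build a cluster config instead of the single-node FlinkJedisPoolConfig
        val nodes = new java.util.HashSet[InetSocketAddress]()
        nodes.add(new InetSocketAddress("192.168.75.91", 7000)) // placeholder cluster nodes
        nodes.add(new InetSocketAddress("192.168.75.92", 7000))
        nodes.add(new InetSocketAddress("192.168.75.93", 7000))
        val clusterConfig = new FlinkJedisClusterConfig.Builder()
          .setNodes(nodes)
          .build()
        // then pass clusterConfig in place of config when constructing the RedisSink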

    Kafka-Sink

    Uses the built-in FlinkKafkaProducer.

    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-kafka_2.11</artifactId>         
        <version>1.9.2</version> 
    </dependency>
    
    
    package com.text.sink
    
    import java.lang
    import java.util.Properties
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
    import org.apache.kafka.clients.producer.ProducerRecord
    
    object KafkaSinkDemo {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("ke01", 8890)
        val result = stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ke01:9092,ke02:9092,ke03:9092")
        // serialization is handled by the KafkaSerializationSchema below, so no serializer classes are configured here
        result.addSink(new FlinkKafkaProducer[(String, Int)]("wc", new KafkaSerializationSchema[(String, Int)] {
          override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
            new ProducerRecord("wc", element._1.getBytes(), (element._2.toString).getBytes())
          }
        }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
        env.execute()
      }
    }
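
    With Semantic.EXACTLY_ONCE the producer writes through Kafka transactions. The connector's default
    transaction timeout (1 hour) is larger than the broker default transaction.max.timeout.ms (15 minutes),
    so the job can fail at startup unless one side is adjusted. A minimal sketch of lowering the
    producer-side timeout in the same props object before building the sink (the 15-minute value is an
    example, not taken from the original post):

        // keep the producer transaction timeout within the broker's transaction.max.timeout.ms
        props.setProperty("transaction.timeout.ms", "900000") // 15 minutes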

    Mysql-Sink

    1. Flink has no built-in MySQL sink, so we implement a custom RichSinkFunction.

    2. The MySQL write must be idempotent: insert when the row does not exist yet, and update the running count when it does (a single-statement upsert sketch follows the code below).

    <dependency> 
        <groupId>mysql</groupId> 
        <artifactId>mysql-connector-java</artifactId>     
        <version>5.1.44</version> 
    </dependency>
    
    package com.text.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    import java.util.Properties
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    
    case class CarInfo(monitorId: String, carId: String, eventTime: String, speed: Long)
    
    object MysqlSinkDemo {
    
      def main(args: Array[String]): Unit = {
    
        // the write must be idempotent: writing the same key repeatedly must still leave the data correct
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092")
        props.setProperty("group.id", "group_id")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          // when to stop consuming; returning false means the stream never ends
          override def isEndOfStream(nextElement: (String, String)): Boolean = false
    
          // deserialize the raw key/value bytes coming from Kafka
          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(consumerRecord.key(), "UTF-8")
            val value = new String(consumerRecord.value(), "UTF-8")
            (key, value)
          }
    
          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, props))
    
        stream.map(data => {
          val value = data._2
          val splits = value.split("\t")
          val monitorId = splits(0)
          (monitorId, 1)
        }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
          // value1: the previously aggregated result; value2: the newly arrived element
          override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
            (value1._1, value1._2 + value2._2)
          }
        }).addSink(new MySQLCustomSink)
        env.execute()
      }
    
    }
    
    class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
      var conn: Connection = _
      var insertPst: PreparedStatement = _
      var updatePst: PreparedStatement = _
    
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://ke01:3306/test", "root", "aa123456")
        insertPst = conn.prepareStatement("insert into car_flow(monitorId, count) values(?, ?)")
        updatePst = conn.prepareStatement("update car_flow set count = ? where monitorId = ?")
      }
    
      override def close(): Unit = {
        insertPst.close()
        updatePst.close()
        conn.close()
      }
    
      override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
        println(value)
        updatePst.setInt(1, value._2)
        updatePst.setString(2, value._1)
        updatePst.execute()
        println(updatePst.getUpdateCount)
        if (updatePst.getUpdateCount == 0) {
          println("insert")
          insertPst.setString(1, value._1)
          insertPst.setInt(2, value._2)
          insertPst.execute()
        }
    
      }
    }
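
    The update-then-insert pattern above needs two statements and two round trips per record. Assuming
    car_flow has a UNIQUE (or PRIMARY KEY) index on monitorId -- an assumption, it is not shown in the
    post -- the same idempotent behavior can be expressed as a single upsert. A sketch (the class name
    MySQLUpsertSink is hypothetical):

    class MySQLUpsertSink extends RichSinkFunction[(String, Int)] {
      var conn: Connection = _
      var upsertPst: PreparedStatement = _

      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://ke01:3306/test", "root", "aa123456")
        // insert when the key is absent, overwrite the count when it already exists
        upsertPst = conn.prepareStatement(
          "insert into car_flow(monitorId, count) values(?, ?) on duplicate key update count = ?")
      }

      override def close(): Unit = {
        upsertPst.close()
        conn.close()
      }

      override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
        upsertPst.setString(1, value._1)
        upsertPst.setInt(2, value._2)
        upsertPst.setInt(3, value._2) // the keyed sum is already the running total, so overwrite
        upsertPst.execute()
      }
    }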

    Socket-Sink

    A custom sink implemented with RichSinkFunction.

    package com.text.sink
    
    import java.io.PrintStream
    import java.net.{InetAddress, Socket}
    import java.util.Properties
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    
    object SocketSink {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
        props.setProperty("group.id", "flink-kafka-002")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          // when to stop consuming; returning false means the stream never ends
          override def isEndOfStream(nextElement: (String, String)): Boolean = false
    
          // deserialize the raw key/value bytes coming from Kafka
          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(consumerRecord.key(), "UTF-8")
            val value = new String(consumerRecord.value(), "UTF-8")
            (key, value)
          }
    
          // tell Flink the produced type, using the Flink-provided type information helpers
          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, props))
    
        stream.map(data => {
          val value = data._2
          val splits = value.split("\t")
          val monitorId = splits(0)
          (monitorId, 1)
        }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
          override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
            (value1._1, value1._2 + value2._2)
          }
        }).addSink(new SocketCustomSink("ke01", 8891))
        env.execute()
      }
    }
    
    class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] {
      var socket: Socket = _
      var writer: PrintStream = _
    
      override def open(parameters: Configuration): Unit = {
        socket = new Socket(InetAddress.getByName(host), port)
        writer = new PrintStream(socket.getOutputStream)
      }
    
      override def close(): Unit = {
        // close the writer (flushing remaining output) before closing the socket it wraps
        writer.close()
        socket.close()
      }
    
      override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
        writer.println(value._1 + "\t" + value._2)
        writer.flush()
      }
    }
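
    For quick tests, the DataStream API also ships a built-in writeToSocket sink, which only needs a
    SerializationSchema instead of a full RichSinkFunction. A minimal sketch under the assumption that it
    replaces the .addSink(new SocketCustomSink("ke01", 8891)) call above (the helper name writeViaBuiltIn
    is illustrative):

    import org.apache.flink.api.common.serialization.SerializationSchema
    import org.apache.flink.streaming.api.scala.DataStream

    // reduced is the (String, Int) stream produced by keyBy(_._1).reduce(...) in main() above
    def writeViaBuiltIn(reduced: DataStream[(String, Int)]): Unit = {
      reduced.writeToSocket("ke01", 8891, new SerializationSchema[(String, Int)] {
        override def serialize(element: (String, Int)): Array[Byte] =
          (element._1 + "\t" + element._2 + "\n").getBytes("UTF-8")
      })
    }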

    File-Sink

    Supports bucketed writes: each bucket is a directory. By default a new bucket is created every hour,
    and each bucket holds one part file per parallel subtask (thread). A rolling policy (file open time,
    file size, inactivity, and so on) can be configured to avoid producing large numbers of small files.
    
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_2.11</artifactId>
      <version>1.9.2</version>
    </dependency>


    package com.text.sink

    import java.util.Properties

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer

    object FileSinkDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092")
        props.setProperty("group.id", "group_id_03")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

        val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
          override def isEndOfStream(nextElement: (String, String)): Boolean = false

          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
            val key = new String(consumerRecord.key(), "UTF-8")
            val value = new String(consumerRecord.value(), "UTF-8")
            (key, value)
          }

          override def getProducedType: TypeInformation[(String, String)] = {
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
          }
        }, props))

        val restStream = stream.map(data => {
          val value = data._2
          val splits = value.split(" ")
          val monitorId = splits(0)
          (monitorId, 1)
        }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
          override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
            (value1._1, value2._2 + value1._2)
          }
        }).map(x => x._1 + " " + x._2)

        val rolling: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.create()
          // roll to a new part file once the current file exceeds 256 MB
          .withMaxPartSize(256 * 1024 * 1024)
          // roll when no new data has been written for 2 seconds
          .withInactivityInterval(2000)
          // roll when the file has been open for more than 2 seconds
          .withRolloverInterval(2000)
          .build()

        val sink = StreamingFileSink
          .forRowFormat(new Path("D:\\code\\scala\\test\\test07\\data\\tmp"), new SimpleStringEncoder[String]("UTF-8"))
          .withBucketCheckInterval(1000)
          .withRollingPolicy(rolling)
          .build()

        restStream.addSink(sink)
        env.execute()
      }
    }
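
    With the row format, StreamingFileSink only moves part files from in-progress/pending to finished when
    a checkpoint completes, so with checkpointing disabled the output never gets finalized. A minimal
    addition near the top of main() (the 5-second interval is an example value):

        // enable checkpointing so the StreamingFileSink can commit (finish) its part files
        env.enableCheckpointing(5000)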

    HBase-Sink

    Two ways to write the computed results to HBase:
    1. Write inside a map operator -- this creates an HBase connection far too frequently.
    2. Write inside a process function -- the connection is created once in open(), which also makes it
       possible to batch writes to HBase (a batching sketch follows the code below).
    
    
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.3.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>1.3.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.3.3</version>
    </dependency>
    
    
    package com.text.sink
    import java.util.{Date, Properties}

    import com.text.utils.DataUtils
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.common.serialization.StringDeserializer

    object HBaseSinkDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
        props.setProperty("group.id", "flink-kafka-003")
        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

        val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))

        stream.map(row => {
          val arr = row.split(" ")
          (arr(0), 1)
        }).keyBy(_._1)
          .reduce((v1: (String, Int), v2: (String, Int)) => {
            (v1._1, v1._2 + v2._2)
          }).process(new ProcessFunction[(String, Int), (String, Int)] {

            var htab: HTable = _

            override def open(parameters: Configuration): Unit = {
              val conf = HBaseConfiguration.create()
              conf.set("hbase.zookeeper.quorum", "ke02:2181,ke03:2181,ke04:2181")
              val hbaseName = "car_flow"
              htab = new HTable(conf, hbaseName)
            }

            override def close(): Unit = {
              htab.close()
            }

            override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
              // row key = monitorId, column family "count", qualifier = current time string, value = running count
              val min = DataUtils.getDateStrByDate(new Date())
              val put = new Put(Bytes.toBytes(value._1))
              put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
              htab.put(put)
            }
          })
        env.execute()
      }
    }
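
    Point 2 above mentions batching, but processElement here still issues one Put per element. A minimal
    sketch of buffering Puts and flushing them in batches; the class name BatchingHBaseWriter and the
    batch size of 100 are illustrative, not from the original post:

    class BatchingHBaseWriter extends ProcessFunction[(String, Int), (String, Int)] {
      var htab: HTable = _
      val buffer = new java.util.ArrayList[Put]()
      val batchSize = 100

      override def open(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "ke02:2181,ke03:2181,ke04:2181")
        htab = new HTable(conf, "car_flow")
      }

      override def processElement(value: (String, Int),
                                  ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
                                  out: Collector[(String, Int)]): Unit = {
        val min = DataUtils.getDateStrByDate(new Date())
        val put = new Put(Bytes.toBytes(value._1))
        put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
        buffer.add(put)
        if (buffer.size() >= batchSize) { // flush once the buffer is full
          htab.put(buffer)                // HTable.put(java.util.List[Put]) sends the whole batch
          buffer.clear()
        }
      }

      override def close(): Unit = {
        if (!buffer.isEmpty) { // flush whatever is still buffered on shutdown
          htab.put(buffer)
          buffer.clear()
        }
        htab.close()
      }
    }

    Note the buffered Puts are not part of Flink's checkpointed state, so a failure can drop up to
    batchSize - 1 pending writes; flushing on checkpoint (via CheckpointedFunction) would close that gap.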