  • zk kafka mariadb scala flink integration

    I did not plan to write this post at first. I meant to put the code on github.com or git.jd.com, but I have been hitting errors there since moving to JDD (JD Finance) this month.
    So, to keep the code somewhere, I started this post.

    Here is a summary of this post:

    1. Set up a zk cluster on Windows, with three nodes
    2. Set up a Kafka cluster on Windows, with three brokers too
    3. Create a Maven Scala project in the IDE
    4. Create a Flink producer that sinks data to the Kafka topic named ScalaTopic, with a source program written by extending the source API
    5. Create a Flink consumer that reads data from Kafka and sinks it to MariaDB, with a sink program written by extending the sink API

    Tips: there is plenty of material on the internet about setting up zk and Kafka clusters on Windows. It is an easy job, so I will not show it here again.

    Script 1: a batch file that starts the zk and Kafka clusters with one click.
    Code:

    rem Start the three ZooKeeper nodes, each in its own window
    cd /d D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-1\bin
    start cmd /k "zkServer.cmd"
    cd /d D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-2\bin
    start cmd /k "zkServer.cmd"
    cd /d D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-3\bin
    start cmd /k "zkServer.cmd"
    rem Wait about 15 seconds so that ZooKeeper is up before the brokers start
    ping -n 15 127.0.0.1 >nul
    rem Start the three Kafka brokers, each with its own server.properties
    cd /d D:\works\JDD\kafka_cluster\kafka-0\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-0\config\server.properties"
    cd /d D:\works\JDD\kafka_cluster\kafka-1\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-1\config\server.properties"
    cd /d D:\works\JDD\kafka_cluster\kafka-2\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-2\config\server.properties"

    Script 2: KafkaProduce. This script creates a Flink execution environment, attaches the custom source and sinks its data to Kafka.

    package com.ran.xiao.yun.zkkafflink

    import com.ran.xiao.yun.jdd.SimpleStringGenerator
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    object KafkaProduce {
      def main(args: Array[String]): Unit = {
        val brokers = "localhost:9092,localhost:9093,localhost:9094"
        val topic = "ScalaTopic"
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        val myProducer = new FlinkKafkaProducer[String](
          brokers,                 // broker list
          topic,                   // target topic
          new SimpleStringSchema)  // writes each String as the record value
        // define the source that generates the stream data for Kafka
        val stream: DataStream[String] = env.addSource(new SimpleStringGenerator())
        stream.addSink(myProducer)  // sink data to Kafka
        env.execute("KafkaProduce") // the job only starts when execute() is called
      }
    }

    Script 3: SimpleStringGenerator, the custom source used by Script 2

    package com.ran.xiao.yun.jdd

    import scala.util.Random
    import org.apache.commons.lang3.RandomStringUtils
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.slf4j.{Logger, LoggerFactory}

    /**
      * Custom Flink source for reading data from MySQL; here a random function is used instead of a log or a MySQL db.
      * It is essentially a while loop that calls collect; on shutdown the loop just has to stop.
      * The methods that must be overridden are run and cancel.
      * Overriding open is optional; anything that needs initialising can go there.
      */
    class SimpleStringGenerator extends RichParallelSourceFunction[String] {
      protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
      private var getRuleDuration: Long = 0
      @volatile var isRunning = true // volatile: cancel() is called from another thread

      override def open(parameters: Configuration): Unit = {}

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        logger.info("try to generate some value for example...")
        val rand = new Random()
        while (isRunning) {
          val json = s"${rand.nextInt(10000000)}:${RandomStringUtils.randomAlphabetic(3)}:${rand.nextInt(1000)}"
          ctx.collect("element-" + json) // collect() emits the record to the downstream operators
          logger.info("generate data from producer is successful...")
          Thread.sleep(6 * 1000) // one minute is too long for testing, so sleep 6 seconds
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    }
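
    To look at the loop-and-cancel pattern without starting a Flink job, it can be sketched in plain Scala. This is only an illustration under assumptions: `RecordGenerator` and `CancellableLoop` are hypothetical names, not part of Flink or of the project above, and the three random letters are built by hand instead of with `RandomStringUtils` so that the block has no dependencies.

```scala
import scala.util.Random

object RecordGenerator {
  // Builds one record in the "element-<id>:<name>:<age>" shape used by Script 3.
  // The Random is passed in so that a seeded instance gives repeatable output.
  def nextRecord(rand: Random): String = {
    val id   = rand.nextInt(10000000)
    val name = Iterator.continually(rand.nextInt(26)).take(3).map(i => ('a' + i).toChar).mkString
    val age  = rand.nextInt(1000)
    s"element-$id:$name:$age"
  }
}

// Hypothetical stand-in for the source: run() loops until cancel() flips the flag.
class CancellableLoop(emit: String => Unit, pauseMillis: Long) {
  // volatile, because cancel() is called from a different thread than run()
  @volatile private var isRunning = true

  def run(): Unit = {
    val rand = new Random()
    while (isRunning) {
      emit(RecordGenerator.nextRecord(rand))
      Thread.sleep(pauseMillis)
    }
  }

  def cancel(): Unit = isRunning = false
}
```

    Running `run()` on its own thread and calling `cancel()` from the main thread ends the loop after the current sleep, which is the same contract the Flink source relies on.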

    Script 4: define a consumer that reads data from Kafka and writes it to MariaDB

    package com.ran.xiao.yun.zkkafflink

    import java.util.Properties
    import com.ran.xiao.yun.jdd.MySQLSink
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // define consumer
    object Kafka2MySQL {
      def main(args: Array[String]): Unit = {
        val properties = new Properties()
        // load kafka.hosts and kafka.topic from Config.properties on the classpath
        val in = Kafka2MySQL.getClass.getClassLoader.getResourceAsStream("Config.properties")
        try properties.load(in) finally in.close()
        val kafkas = properties.getProperty("kafka.hosts")
        val topic = properties.getProperty("kafka.topic")
        properties.setProperty("bootstrap.servers", kafkas)
        properties.setProperty("group.id", "test")
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        // SimpleStringSchema is important: it deserializes each Kafka record value into a String
        val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
        //myConsumer.setStartFromEarliest()  // read data from the earliest offset
        val stream = env.addSource(myConsumer) //.print()  // read data from Kafka
        stream.addSink(new MySQLSink()) // write data to MariaDB
        env.execute("Kafka2MySQL") // the job only starts when execute() is called
      }
    }
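
    Script 4 depends on two keys in Config.properties, `kafka.hosts` and `kafka.topic`. The file itself is not shown in this post, so the sketch below only assumes its shape. `KafkaConfig` and `load` are hypothetical names, and the helper takes the file content as a string so it can be tried without a classpath resource.

```scala
import java.io.StringReader
import java.util.Properties

object KafkaConfig {
  // Reads the two keys Script 4 needs and fails fast when one is missing,
  // instead of handing a null on to the Kafka consumer.
  def load(text: String): (String, String) = {
    val props = new Properties()
    props.load(new StringReader(text))
    def required(key: String): String =
      Option(props.getProperty(key)).map(_.trim).filter(_.nonEmpty)
        .getOrElse(throw new IllegalArgumentException(s"missing property: $key"))
    (required("kafka.hosts"), required("kafka.topic"))
  }
}
```

    With the broker list and topic from Script 2 as assumed file content, `KafkaConfig.load("kafka.hosts=localhost:9092,localhost:9093,localhost:9094\nkafka.topic=ScalaTopic")` returns that broker list and `ScalaTopic` as a pair.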

    Script 5: MySQLSink, the custom sink used by Script 4

    package com.ran.xiao.yun.jdd

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    /** Custom Flink sink */
    class MySQLSink extends RichSinkFunction[String] { // Tuple3<Integer,String,String>: I wanted to pass a tuple to the sink
      private var getRuleDuration: Long = 0
      var isRunning = true
      private var connection: Connection = null
      private var ps: PreparedStatement = null

      // connect to the local db once here, so invoke does not open and close a connection for every record
      override def open(parameters: Configuration): Unit = {
        val driver = "org.mariadb.jdbc.Driver" // some examples on the internet wrongly write "maria" instead of "mariadb"
        val dburlc = "jdbc:mariadb://localhost:3306/data_platformwarehouse" // jdbc:mysql://localhost:3306/data_platformwarehouse works too
        val usrnam = "root"
        val passwd = "root"
        Class.forName(driver) // load the driver
        connection = DriverManager.getConnection(dburlc, usrnam, passwd) // create the connection
        val sql = "insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)" // parameters are filled in by invoke
        ps = connection.prepareStatement(sql)
      }

      override def invoke(str: String): Unit = { // a Tuple3 type parameter did not seem to work here
        // data like: element-id:sun:age
        val values = str.split("-")(1).split(":")
        // assumption: id and age are integer columns and name is a string column
        ps.setInt(1, values(0).toInt)
        ps.setString(2, values(1))
        ps.setInt(3, values(2).toInt)
        ps.executeUpdate()
      }

      override def close(): Unit = {
        if (ps != null) {
          ps.close()
        }
        if (connection != null) {
          connection.close()
        }
      }
    }
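
    The sink relies on every record having the shape `element-id:name:age`. A small parser that rejects anything else can be tested without a database. This is a sketch under assumptions: `Person` and `RecordParser` are hypothetical names, and treating id and age as integers is a guess, since the table definition is not shown in this post.

```scala
import scala.util.Try

// Hypothetical row type; integer id and age are an assumption about the table.
final case class Person(id: Int, name: String, age: Int)

object RecordParser {
  // Parses "element-<id>:<name>:<age>" and returns None for anything malformed,
  // so that a bad record can be skipped instead of failing the whole job.
  def parse(record: String): Option[Person] =
    record.split("-", 2) match {
      case Array("element", rest) =>
        rest.split(":") match {
          case Array(id, name, age) => Try(Person(id.toInt, name, age.toInt)).toOption
          case _                    => None
        }
      case _ => None
    }
}
```

    For example, `RecordParser.parse("element-42:sun:7")` gives `Some(Person(42, "sun", 7))`, while `RecordParser.parse("element-42:sun")` gives `None`.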

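    Tips for self
    The relationship between zk and Kafka:

    1. Brokers register in zk

    2. Topics register in zk

    3. Consumers register in zk (this applies to the old ZooKeeper-based consumer client)

    3.1     Registering a new consumer: when a new consumer registers in zk, zk creates dedicated nodes to hold its information, under the path ls /consumers/{group_id}/  [ids,owners,offsets]. ids records the consumers in the group that are currently consuming, owners records the topic information consumed by the group, and offsets records the offset of each partition of each topic.

    3.2     Watching for consumer changes within a group: the children of /consumers/{group_id}/ids are watched, and as soon as a consumer is added or removed the load is rebalanced across the consumers.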
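
    To picture the rebalancing in 3.2, here is a sketch of a range-style assignment: the consumer ids are sorted and each consumer gets a contiguous slice of the partitions. It only illustrates the idea and is not Kafka's actual code; `RangeAssignment` and `assign` are hypothetical names.

```scala
object RangeAssignment {
  // Splits partitions 0 until numPartitions into contiguous ranges, one per
  // consumer in sorted id order. The first (numPartitions % n) consumers
  // get one extra partition each.
  def assign(consumerIds: Seq[String], numPartitions: Int): Map[String, Seq[Int]] = {
    val sorted = consumerIds.sorted
    if (sorted.isEmpty) Map.empty
    else {
      val base  = numPartitions / sorted.size
      val extra = numPartitions % sorted.size
      sorted.zipWithIndex.map { case (id, i) =>
        val start = i * base + math.min(i, extra)
        val count = base + (if (i < extra) 1 else 0)
        id -> (start until start + count).toList
      }.toMap
    }
  }
}
```

    With five partitions, `assign(Seq("c1", "c2"), 5)` gives c1 the partitions 0, 1, 2 and c2 the partitions 3, 4. When a third consumer c3 appears under the ids node, recomputing with `assign(Seq("c1", "c2", "c3"), 5)` gives c1 0, 1, c2 2, 3 and c3 4.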

  • Original article: https://www.cnblogs.com/TendToBigData/p/10501149.html