    zk kafka mariadb scala flink integration

    I did not plan to write this post at first. I meant to push the code to github.com or git.jd.com, but I ran into some errors there after moving to JDD (JD Finance) this month.
    So, to have somewhere to keep the code, I started this post.

    Here is a summary of this post:

    1. Set up a three-node ZooKeeper (zk) cluster on Windows.
    2. Set up a three-node Kafka cluster on Windows as well.
    3. Create a Maven Scala project in an IDE.
    4. Create a Flink producer that sinks data to the Kafka topic named ScalaTopic; its source is a custom program that extends the source API.
    5. Create a Flink consumer that reads data from Kafka and sinks it to MariaDB; its sink is a custom program that extends the sink API.

    Tip: there is plenty of material online about setting up zk and Kafka clusters on Windows. It is an easy job, so I will not repeat it here.

    Script 1: a batch file that starts the whole zk and Kafka cluster with one click.
    Code:

    d:
    rem Start the three ZooKeeper nodes, each in its own window
    cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-1\bin
    start cmd /k "zkServer.cmd"
    cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-2\bin
    start cmd /k "zkServer.cmd"
    cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-3\bin
    start cmd /k "zkServer.cmd"
    
    rem Give ZooKeeper roughly 15 seconds to come up (ping is used as a sleep)
    ping -n 15 127.0.0.1>nul
    rem Start the three Kafka brokers, each with its own server.properties
    cd D:\works\JDD\kafka_cluster\kafka-0\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-0\config\server.properties"
    cd D:\works\JDD\kafka_cluster\kafka-1\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-1\config\server.properties"
    cd D:\works\JDD\kafka_cluster\kafka-2\bin\windows
    start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-2\config\server.properties"
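
    The batch file waits a fixed time before starting Kafka. As an alternative, here is a minimal sketch that polls until the zk ports accept connections. The object PortCheck and its method names are made up for illustration, and the ports in the usage note are assumptions, since the zk configuration is not shown in this post.

```scala
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  /** Returns true if something accepts TCP connections on host:port. */
  def isPortOpen(host: String, port: Int, timeoutMs: Int = 500): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // connection refused and connect timeouts are both IOExceptions
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  /** Polls until every port is open or maxWaitMs has passed. */
  def waitForPorts(host: String, ports: Seq[Int], maxWaitMs: Long, pollMs: Long = 200): Boolean = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    var ready = ports.forall(p => isPortOpen(host, p))
    while (!ready && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollMs)
      ready = ports.forall(p => isPortOpen(host, p))
    }
    ready
  }
}
```

    For example, PortCheck.waitForPorts("127.0.0.1", Seq(2181, 2182, 2183), 30000) would wait up to 30 seconds, assuming the three zk nodes listen on those client ports.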
    

    Script 2: KafkaProduce. This script creates a Flink execution environment, attaches the custom source, and sinks the stream to Kafka.

    package com.ran.xiao.yun.zkkafflink
    
    import com.ran.xiao.yun.jdd.SimpleStringGenerator
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    
    
    //https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
    //https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html
    
    object KafkaProduce {
    
      def main(args: Array[String]): Unit = {
        val brokers = "localhost:9092,localhost:9093,localhost:9094"
        val topic = "ScalaTopic"
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
    
        val myProducer = new FlinkKafkaProducer[String](
          brokers,                // broker list
          topic,                  // target topic
          new SimpleStringSchema) // serializes each String record to bytes
    
        myProducer.setWriteTimestampToKafka(true) // attach the record timestamp to each Kafka message
        val stream: DataStream[String] = env.addSource(new SimpleStringGenerator())  // custom source that generates the stream
        stream.addSink(myProducer) // sink data to Kafka
        env.execute() // blocks until the job finishes or is cancelled
    
      }
    
    }
    

    Script 3: SimpleStringGenerator, the custom source used by the producer.

    package com.ran.xiao.yun.jdd
    
    import scala.util.Random
    import org.apache.commons.lang3.RandomStringUtils
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.slf4j.{Logger, LoggerFactory}
    
    /**
      * Custom Flink source. It was meant to read data from MySQL, while here it uses a random function, not a log or a MySQL db.
      * It is essentially a while loop that calls collect; to shut down, just make the loop stop.
      * The methods that must be overridden are run and cancel.
      * Overriding open is optional; anything that needs initialization can go there.
      */
    class SimpleStringGenerator extends RichParallelSourceFunction[String]{
      protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
      // cancel() is called from another thread, so the flag must be volatile
      @volatile private var isRunning = true
    
      override def open(parameters: Configuration): Unit = {
        println("starting....")
      }
    
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        logger.info("try to generate some value for example...")
        val rand = new Random()
        while (isRunning) {
          // record layout: id:name:age
          val record = s"${rand.nextInt(10000000)}:${RandomStringUtils.randomAlphabetic(3)}:${rand.nextInt(1000)}"
          ctx.collect("element-" + record)   // collect hands the record to the SourceContext, which emits it downstream
          println(record)
          logger.info("generate data from producer is successful...")
          Thread.sleep(6 * 1000)   // one minute is too long for testing, so sleep 6 seconds
        }
      }
    
      override def cancel(): Unit = {
        isRunning = false
      }
    }
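
    The run/cancel contract of the source can be tried without Flink. The sketch below is only an illustration of the pattern (a loop guarded by a volatile flag that another thread flips); the names CancellableLoop and Generator are made up and are not part of the Flink API.

```scala
object CancellableLoop {
  /** Mimics a source: keeps emitting until cancel() flips the flag. */
  class Generator(emit: String => Unit, pauseMs: Long) {
    // written by cancel() on another thread, read by run()
    @volatile private var isRunning = true

    def run(): Unit = {
      var n = 0
      while (isRunning) {
        emit(s"element-$n")
        n += 1
        Thread.sleep(pauseMs)
      }
    }

    def cancel(): Unit = {
      isRunning = false
    }
  }
}
```

    Running run() on one thread and calling cancel() from another makes the loop exit after its current iteration, which is the same behavior Flink relies on when it cancels the job.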
    

    Script 4: Kafka2MySQL, a consumer that reads data from Kafka and writes it to MariaDB.

    package com.ran.xiao.yun.zkkafflink
    
    import java.util.Properties
    
    import com.ran.xiao.yun.jdd.MySQLSink
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    
    
    //https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
    //define consumer
    
    object Kafka2MySQL {
    
      def main(args: Array[String]): Unit = {
    
        // load kafka.hosts and kafka.topic from Config.properties on the classpath
        val properties = new Properties()
        val prop = Kafka2MySQL.getClass.getClassLoader.getResourceAsStream("Config.properties")
        properties.load(prop)
        prop.close()
        val kafkas = properties.getProperty("kafka.hosts")
        val topic = properties.getProperty("kafka.topic")
    
        properties.setProperty("bootstrap.servers", kafkas)
        properties.setProperty("group.id", "test")
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.enableCheckpointing(7000)  // checkpoint every 7 seconds
        // SimpleStringSchema deserializes each Kafka message into a String
        val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
        //myConsumer.setStartFromEarliest()  // read data from the earliest offset
        myConsumer.setStartFromLatest()
        val stream = env.addSource(myConsumer)  // read data from Kafka
        stream.print()
        stream.addSink(new MySQLSink())  // write data to MariaDB
        env.execute("starting")
    
      }
    
    }
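
    The consumer reads kafka.hosts and kafka.topic from Config.properties, which is not shown in this post. The sketch below loads an assumed version of that file from a string; the sample values are guesses taken from the broker list and topic in Script 2, and the helper names are made up.

```scala
import java.io.StringReader
import java.util.Properties

object ConfigSketch {
  // Assumed content of Config.properties; the real file is not shown in the post.
  val sample: String =
    """kafka.hosts=localhost:9092,localhost:9093,localhost:9094
      |kafka.topic=ScalaTopic
      |""".stripMargin

  /** Parses properties text the same way Properties.load parses the file. */
  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  /** Fails fast with a clear message instead of passing null on to Kafka. */
  def required(props: Properties, key: String): String =
    Option(props.getProperty(key)).getOrElse(
      throw new IllegalArgumentException(s"missing property: $key"))
}
```

    Checking the keys up front avoids a NullPointerException later when a missing value is passed to setProperty.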

    Script 5: MySQLSink, the custom sink that writes each record to MariaDB.

    package com.ran.xiao.yun.jdd
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    /*
    Custom Flink sink.
    invoke: called for every incoming record; put the logic that writes to the downstream system here.
    open: initializes the client for the downstream system.
    close (which takes the place of cancel): closes the client for the downstream system.
     */
    
    class MySQLSink extends RichSinkFunction[String]{   // would like to pass a Tuple3<Integer,String,String> to the sink
    
      private var connection: Connection = null
      private var ps: PreparedStatement = null
    
      // connect to the local db once here, so invoke does not open and close a connection for every record
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val driver = "org.mariadb.jdbc.Driver"   // some examples online wrongly write "maria" instead of "mariadb"
        val dburlc = "jdbc:mariadb://localhost:3306/data_platformwarehouse" // jdbc:mysql://localhost:3306/data_platformwarehouse works too
        val usrnam = "root"
        val passwd = "root"
    
        Class.forName(driver) // load the driver
        connection = DriverManager.getConnection(dburlc, usrnam, passwd) // create the connection
        val sql = "insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)"   // parameters are filled in by invoke
        ps = connection.prepareStatement(sql)
      }
    
      override def invoke(str: String): Unit = {   // using Tuple3 as the type parameter did not seem to work
        // data looks like: element-id:name:age
        val values = str.split("-")(1).split(":")
        ps.setInt(1, values(0).toInt)
        ps.setString(2, values(1))
        ps.setInt(3, values(2).toInt)
        ps.executeUpdate()
      }
    
      override def close(): Unit = {
        super.close()
        // close the statement before the connection that owns it
        if (ps != null) {
          ps.close()
        }
        if (connection != null) {
          connection.close()
        }
      }
    
    }
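
    The sink splits each record by position, so one malformed message would throw inside invoke. Here is a minimal sketch of a more defensive parser for the element-id:name:age layout; RecordParser and Person are made-up names for illustration, not part of the code above.

```scala
import scala.util.Try

object RecordParser {
  final case class Person(id: Int, name: String, age: Int)

  /** Parses "element-<id>:<name>:<age>"; returns None for malformed input. */
  def parse(line: String): Option[Person] = {
    val prefix = "element-"
    if (!line.startsWith(prefix)) None
    else line.stripPrefix(prefix).split(":", -1) match {
      // exactly three fields, with numeric id and age
      case Array(id, name, age) => Try(Person(id.toInt, name, age.toInt)).toOption
      case _                    => None
    }
  }
}
```

    With something like this, invoke could skip or log records for which parse returns None instead of failing the job.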
    

    Tips for self
    The relationship between zk and Kafka:

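    Kafka uses zk's distributed coordination service to tie together producers, consumers, and message storage (the brokers, which store messages and serve reads and writes). With zk, Kafka can also establish the subscription relationship between producers and consumers while all components, including producers, consumers, and brokers, stay stateless, and it can load-balance producers.

    1. Brokers register in zk

    Each Kafka broker (roughly one node, i.e. one machine) registers itself in zk at startup and tells zk its broker id. Within the cluster, each broker.id is registered under /brokers/ids. When a node fails, zk deletes its entry, which makes it easy to monitor broker changes across the cluster and rebalance the load promptly.

    2. Topics register in zk

    Kafka can define many topics, and each topic is split into many partitions. In general, each partition lives on a single broker, and zk maintains all of these topic-to-broker mappings.

    3. Consumers register in zk

    This part describes the older ZooKeeper-based consumer; newer Kafka clients track group membership and offsets in Kafka itself.

    3.1     Registering a new consumer: when a new consumer registers with zk, zk creates dedicated nodes to hold its information. Running ls /consumers/{group_id} shows [ids, owners, offsets]. ids records how many consumers in the group are currently consuming; owners records the topic information consumed by the group; offsets records the offset of each partition of each topic.

    3.2     Watching for consumer changes within a group: the children of /consumers/{group_id}/ids are watched, and as soon as consumers are added or removed, the load across consumers is rebalanced.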
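
    To make the znode layout above concrete, here is a small sketch that builds the paths used by the older ZooKeeper-based Kafka layout. The ZkPaths object and its method names are made up for illustration; the sample group name "test" is taken from Script 4.

```scala
object ZkPaths {
  /** Node a broker creates when it registers. */
  def brokerNode(brokerId: Int): String = s"/brokers/ids/$brokerId"

  /** Node that holds a topic's partition assignment. */
  def topicNode(topic: String): String = s"/brokers/topics/$topic"

  /** Parent whose children are the live consumers of a group. */
  def consumerIds(group: String): String = s"/consumers/$group/ids"

  /** Node recording which consumer owns a partition. */
  def consumerOwner(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/owners/$topic/$partition"

  /** Node recording the committed offset of a partition. */
  def consumerOffset(group: String, topic: String, partition: Int): String =
    s"/consumers/$group/offsets/$topic/$partition"
}
```

    For example, ZkPaths.consumerOffset("test", "ScalaTopic", 0) gives /consumers/test/offsets/ScalaTopic/0, the node you would inspect with zkCli to see that group's offset for partition 0.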
  • Original article: https://www.cnblogs.com/TendToBigData/p/10501149.html