ZooKeeper + Kafka + MariaDB + Scala + Flink Integration
At first I did not plan to write this article; I had put the code on github.com/git.jd.com, but there have been some errors with it since I moved to JDD (JD Finance) this month.
So, in order to keep the code somewhere, I started this article.
Here is the summary of this article:
1. Set up the ZooKeeper cluster on Windows, three nodes.
2. Set up the Kafka cluster on Windows, three nodes as well.
3. Create a Maven Scala project in the IDE (the key dependencies are listed after the tips below).
4. Create a Flink producer that sinks data to a Kafka topic named ScalaTopic, with a custom source program that extends the source API.
5. Create a Flink consumer that reads data from Kafka and sinks it to MariaDB, with a custom sink program that extends the sink API.
Tips: as for how to set up the ZooKeeper and Kafka clusters on Windows, there is plenty of material on the internet; it is an easy job and will not be shown here again.
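For step 3, the Maven project needs roughly the following dependencies (a guess matching the Flink 1.7 documentation linked below and a Scala 2.11 build; adjust the versions to your environment):
org.scala-lang:scala-library:2.11.12
org.apache.flink:flink-streaming-java_2.11:1.7.2
org.apache.flink:flink-connector-kafka_2.11:1.7.2
org.apache.commons:commons-lang3:3.8.1
org.mariadb.jdbc:mariadb-java-client:2.3.0
org.slf4j:slf4j-simple:1.7.25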
Script 1: a one-click batch script that starts the ZooKeeper and Kafka clusters (the ping -n 15 line simply waits about 15 seconds so ZooKeeper is up before the Kafka brokers start).
code :
d:
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-1\bin
start cmd /k "zkServer.cmd"
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-2\bin
start cmd /k "zkServer.cmd"
cd D:\works\JDD\jdd_coding\kafka_zk_cluster\zk_cluster\zookeeper-3.4.12-3\bin
start cmd /k "zkServer.cmd"
ping -n 15 127.0.0.1>nul
cd D:\works\JDD\kafka_cluster\kafka-0\bin\windows
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-0\config\server.properties"
cd D:\works\JDD\kafka_cluster\kafka-1\bin\windows
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-1\config\server.properties"
cd D:\works\JDD\kafka_cluster\kafka-2\bin\windows
start cmd /k "kafka-server-start.bat D:\works\JDD\kafka_cluster\kafka-2\config\server.properties"
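Before running the Flink jobs, the topic can be created up front (depending on server.properties, Kafka may also auto-create it on first use). Assuming ZooKeeper answers on the default port 2181, this should work from any of the kafka-N\bin\windows folders:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic ScalaTopic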
Script 2: KafkaProduce. In this script Flink creates a streaming environment, attaches the custom source from Script 3, and sinks the generated data to Kafka.
package com.ran.xiao.yun.zkkafflink
import com.ran.xiao.yun.jdd.SimpleStringGenerator
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html
object KafkaProduce {
def main(args: Array[String]): Unit = {
val brokers = "localhost:9092,localhost:9093,localhost:9094"
val topic = "ScalaTopic";
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myProducer = new FlinkKafkaProducer[String](
brokers, // broker list
topic, // target topic
new SimpleStringSchema)
myProducer.setWriteTimestampToKafka(true)
val stream: DataStream[String] = env.addSource(new SimpleStringGenerator()) //define the source and generate stream data for Kafka
stream.addSink(myProducer) //sink the data to Kafka
env.execute()
}
}
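To check that the producer really writes to ScalaTopic, a console consumer can be attached from one of the kafka-N\bin\windows folders (broker port 9092 assumed):
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ScalaTopic --from-beginning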
Script 3: SimpleStringGenerator, a custom source that extends RichParallelSourceFunction and emits random strings.
package com.ran.xiao.yun.jdd
import scala.util.Random
import org.apache.commons.lang3.RandomStringUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}
/**
* A custom source. In a real job Flink would read from a log or a MySQL table; here a random generator is used instead.
* Essentially it is just a while loop that calls collect(); cancelling simply stops the loop.
* run and cancel must be overridden.
* open may be overridden or not; anything that needs to be initialized can go in there.
*/
class SimpleStringGenerator extends RichParallelSourceFunction[String]{
protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
private var getRuleDuration : Long = 0
var isRunning = true
override def open(parameters: Configuration): Unit = {
print("starting....")
}
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
logger.info("try to generate some value for example...")
while(isRunning){
var rand = new Random();
var josn=rand.nextInt(10000000)+":"+ RandomStringUtils.randomAlphabetic(3)+":"+rand.nextInt(1000);
ctx.collect("element-" + josn); //why need to collect ,and how the SourceContext works
println(josn)
logger.info("generate data from producer is successful...")
Thread.sleep(6*1000) // one minute is too long for a testing ,set to 6 seconds
}
}
override def cancel(): Unit = {
isRunning = false
}
}
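About the question in the comment above: ctx.collect() is how a SourceFunction hands one record to Flink; the SourceContext is the bridge between the user loop and the runtime, and it forwards every collected element to the downstream operators (here the Kafka sink). When checkpointing is enabled, a source that also keeps state is expected to emit under the checkpoint lock so that emission and snapshots do not interleave. A minimal sketch of that pattern, as a drop-in replacement for the run method above (the locking is optional for this stateless demo source):

override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
  val rand = new Random()
  while (isRunning) {
    val json = rand.nextInt(10000000) + ":" + RandomStringUtils.randomAlphabetic(3) + ":" + rand.nextInt(1000)
    // emit under the checkpoint lock so a checkpoint cannot cut in between record emission and a state update
    ctx.getCheckpointLock.synchronized {
      ctx.collect("element-" + json)
    }
    Thread.sleep(6 * 1000)
  }
}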
Script 4: Kafka2MySQL, a consumer that reads data from Kafka and sinks it to MariaDB.
package com.ran.xiao.yun.zkkafflink
import java.util.Properties
import com.ran.xiao.yun.jdd.MySQLSink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
//https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-100-connector
//define consumer
object Kafka2MySQL {
def main(args: Array[String]): Unit = {
val properties = new Properties()
val prop = Kafka2MySQL.getClass.getClassLoader.getResourceAsStream("Config.properties")
properties.load(prop)
val kafkas = properties.getProperty("kafka.hosts")
val topic = properties.getProperty("kafka.topic")
properties.setProperty("bootstrap.servers", kafkas)
properties.setProperty("group.id", "test")
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(7000)
val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties) // SimpleStringSchema simply decodes each Kafka record's bytes as a UTF-8 String
//myConsumer.setStartFromEarliest() //read data from the earliest offset
myConsumer.setStartFromLatest()
val stream = env.addSource(myConsumer) //read the data from Kafka as a stream
stream.print() //print the records to stdout for debugging
stream.addSink(new MySQLSink()) //write the data to MariaDB
env.execute("starting")
}
}
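Script 4 expects a Config.properties file on the classpath (src/main/resources in the Maven project). A minimal version with the two keys the code reads, using the hosts and topic assumed above, could look like this:
kafka.hosts=localhost:9092,localhost:9093,localhost:9094
kafka.topic=ScalaTopic
As for SimpleStringSchema: it is the stock DeserializationSchema that turns each record's bytes into a UTF-8 String, which is why the sink below has to split the string itself; a custom DeserializationSchema could parse each record into a typed object right at the consumer.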
Script 5: MySQLSink, a custom sink that extends RichSinkFunction and writes each record to MariaDB.
package com.ran.xiao.yun.jdd
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
/*
A custom Flink sink:
invoke() is called for every incoming record; the logic that writes to the downstream system goes here.
open() initializes the downstream connection or client instance.
close() (instead of cancel) shuts the downstream connection down.
*/
class MySQLSink extends RichSinkFunction[String]{ // it would be nicer to pass a tuple such as Tuple3[Int,String,Int] to the sink; see the sketch after this script
private var getRuleDuration : Long = 0
var isRunning = true
private var connection: Connection = null
private var ps: PreparedStatement = null
//connect to the local MariaDB once here in open(), so we do not open and close a connection for every record in invoke()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val driver = "org.mariadb.jdbc.Driver" // some examples on the internet shorten mariadb to maria here, which does not work
val dburlc = "jdbc:mariadb://localhost:3306/data_platformwarehouse" // jdbc:mysql://localhost:3306/data_platformwarehouse works too
val usrnam = "root"
val passwd = "root"
Class.forName(driver) //here we load the driver
connection =DriverManager.getConnection(dburlc,usrnam,passwd) //create the connection
val sql = "insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)" //generate the sql and pass the parameters from invoke function
ps = connection.prepareStatement(sql)
}
override def invoke(str: String): Unit = { // passing a Tuple3 type parameter did not seem to work here, so the record arrives as a plain String
//data looks like: element-id:sun:age
val values = str.split("-")(1).split(":")
ps.setInt(1, Integer.parseInt(values(0)))
ps.setString(2, values(1))
ps.setInt(3, Integer.parseInt(values(2)))
ps.executeUpdate()
}
override def close():Unit={
super.close()
if(ps != null){
ps.close()
}
if(connection != null){
connection.close()
}
}
}
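On the Tuple3 comment in Script 5: the sink's type parameter can indeed be a tuple; the string parsing then moves into a map() before addSink, and invoke() receives the already-split fields. A minimal sketch under that assumption (the class name TupleMySQLSink is made up; table and credentials as above):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class TupleMySQLSink extends RichSinkFunction[(Int, String, Int)] {
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.mariadb.jdbc.Driver")
    connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/data_platformwarehouse", "root", "root")
    ps = connection.prepareStatement("insert into flink_kafka_zk_scala(id,name,age) values(?,?,?)")
  }

  override def invoke(value: (Int, String, Int)): Unit = {
    ps.setInt(1, value._1)    // id
    ps.setString(2, value._2) // name
    ps.setInt(3, value._3)    // age
    ps.executeUpdate()
  }

  override def close(): Unit = {
    super.close()
    if (ps != null) ps.close()
    if (connection != null) connection.close()
  }
}

Wiring it up means mapping the String stream into the tuple before addSink; with the Java DataStream API used above that is a MapFunction plus the tuple's TypeInformation, while the Scala DataStream API makes it a one-line map.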
Tips for self
The relationship between ZooKeeper and Kafka:
Kafka uses ZooKeeper's distributed coordination service to tie producers, consumers and message storage (the brokers, which store messages and handle reads and writes) together. With ZooKeeper, Kafka can establish the subscription relationship between producers and consumers while all components, brokers included, stay stateless, and can balance load across producers.
1. Brokers register in ZooKeeper
Every Kafka broker (roughly one node, one machine) registers itself in ZooKeeper at startup, reporting its broker.id under /brokers/ids. When a node fails, ZooKeeper deletes its znode, which makes it easy to monitor broker changes across the whole cluster and rebalance in time.
2. Topics register in ZooKeeper
Many topics can be defined in Kafka, and each topic is split into many partitions. Normally each partition lives on one broker, and all of these topic-to-broker mappings are maintained by ZooKeeper.
3. Consumers register in ZooKeeper
3.1 Registering a new consumer: when a new consumer registers with ZooKeeper, ZooKeeper creates dedicated znodes for its information under /consumers/{group_id}/ with the children ids, owners and offsets. ids records the consumers currently consuming in this group, owners records the topics this group consumes, and offsets records the consumed offset for each partition of each topic.
3.2 Watching for changes in a consumer group: ZooKeeper watches the children of /consumers/{group_id}/ids; as soon as a consumer is added or removed, the consumer load is rebalanced. (A small inspection sketch follows these notes.)
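These registrations are easy to look at with the plain ZooKeeper client. A minimal sketch that lists the relevant znodes against one local ZooKeeper node (port 2181 assumed; it needs the org.apache.zookeeper:zookeeper dependency, and /consumers is only populated by the old consumer API, since newer clients keep offsets in Kafka itself):

import org.apache.zookeeper.ZooKeeper

object InspectKafkaZnodes {
  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, null) // connect to one node of the local ZooKeeper cluster
    println(zk.getChildren("/brokers/ids", false))        // the registered broker ids
    println(zk.getChildren("/brokers/topics", false))     // the registered topics
    println(zk.getChildren("/consumers", false))          // consumer groups registered by the old consumer API
    zk.close()
  }
}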