课程内容:
1. 简单的操作一下集群
2. 简单的介绍几个工具(企业)
3. Producer的原理(核心,重点)
4. 简单kafka的代码
5. 介绍里面的核心参数(重点)
========================
消费者原理
--replica-factor 2
--partitions 2我们一般设置分区数,建议是节点的倍数
=========================================
Producer的原理
*******************kafka************************************
topic:TopicA
多个分区
p0:leader parititon hdp1
p1:leader partition hdp2
需要把数据发送到leader partition
生产者,生产数据,需要把数据封装成ProducerRecord
①ProducerRecord
②序列化
③partitioner(获取元数据,找到一个broker就可以
知道有多少个分区,并清除哪个是leader partition,并把数据发送到哪)
存入
缓冲区***
④Sender (一个线程)从缓冲区取出消息,封装为一个批次(Batch)
Batch
Batch
Batch
--------------------------------------------
zookeeper
------------------------------------------------
broker(去zookeeper注册,选举controller)
controller(监听zookeeper元数据,动态变化,进行同步)
(hdp1)
去zookeeper同步元数据信息,(并分发给其他节点)
p0
-------------------------------------------------
broker(去zookeeper注册,选举controller)
(hdp2)
p1
---------------------------------------------------
//创建了一个配置文件的对象
Properties props = new Properties();
//这个参数,目的就是为了获取kafka集群的元数据
//我们写一个主机也行,写多个的话,更安全
//使用的是主机名,原因是server.properties里面填进去的是主机名,必须配置hosts文件
props.put("bootstrap.servers","hadoop1:9092,hadoop2:9092,hadoop3:9092");
//设置序列化器==》kafka的数据是用的网路传输的,所以里面都是二进制的数据
//我们发送消息的时候,默认的情况下就是发送一个消息就可以了
//但是你也可以给你的每条消息都指定一个key也是可以的
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
//调优的参数,后面解释
// acks
//-1: 如何判断一条消息发送成功?首先消息要写入leader partition,这些消息还需要被另外的所有的这个分区的副本同步了,才算发送成功
// 1: 发送的消息写到leader partition 就算写入成功,然后服务器端就返回响应就可以了,默认就是这个参数,有可能会丢数据
// 0: 消息主要发送出去了,就认为是成功的(允许丢数据,只是处理一些不重要的日志,不需要得到准确的数据)
// kafka里面的分区是有副本的,比如一个主题TopicA,这个主题有两个parittion,每个partition有三个副本
// p0: leader parittion ,follower parititon,follower partition
// p1: leader partition,follower partition,follower partition
props.put("acks","-1")
//重试次数,网络抖动(5-10次)
props.put("retries",3)
//每隔多久重试一次 2s
props.put("retry.backoff.ms",2000)
//提升消息吞入量
//设置压缩格式,lz4,
props.put("compression.type","lz4")
//适当增大缓冲区大小,32M(基本上这个参数不需要设置)
props.put("buffer.memory",33554432)
//批次大小,默认16k,这里设置32k;设置这个批次的大小 还跟我们的消息的大小有关
//假设一条消息1k==》设置100k
props.put("batch.size",323840)
//比如我们设置的一个批次的大小是32k,但是size没有满,无论如何到了这个时间都要把消息发送出去了
//默认是0,100ms
props.put("linger.ms",100)
//这个值,默认是1M,代表的是,生产者发送消息的时候,最大的一条消息(注意说的不是批次)
// byte,如果消息超过1M,程序会报错,可以设置为10M
props.put("max.request.size",1024*1024*10)
// 消息发送出去后,多久没有响应,默认为超时
// 如果网络不稳定,可以适当增大
props.put("request.timeout.ms",3000)
//创建生产者
KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);
//创建消息
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"xoxo","this_is_key","this_is_value")
//发送消息,
//异步发送:性能比较好,也是我们生产里面使用的方式
//同步发送:等到返回响应,发送下一条,性能不好,我们生产里面一般不用
producer.send(record,new Callback(){
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception == null){
sout("success")
} else {
sout("error")
}
}
})
小案例
创建消息在kafka里面,我们发送消息的时候,可以给消息指定key,也可以不指定key跟我们要把找个消息发送到这个主题的哪个分区欧关系比如:
topicA:
p0:leader partition ,follower partition
p1:leader partition ,follower partition
1)不指定key
发送的一条消息,会以轮询的方式,发送到分区里面
message1 p0
message2 p1
message3 p0
message4 p1
2)如果指定key
test_key 取这个key的hash值 数字 3
数字/分区数 取模 3/2
比如分区数2,结果要么是0,要么是1
test_key===> p1
这样子的话,我们可以保证这样的一个事,key相同的消息一定会被发送到同一个分区
class OrderProducer{}
// 创建生产者
public static KafkaProducer<String,String> getProducer(){
Properties props = new Properties();
props.put();
props.put();
props.put();
props.put();
KafkaProducer<String,String> producer = nre KafkaProducer<String,String>(props);
return producer;
}
public static void createRecord(){
JSONObject order = new JSONObject();
order.put("userId","123");
order.put("orderId","123a");
order.put("amount",1000.0);
order.put("operator","pay");
return order;
}
ProducerRecord<String,String> record = new ProducerRecord<>(
"topicA","test_key",order.toString()
);
用哪个字段作为key
orderId/userId作为key
producer.send(record,new Callback(){
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception == null ){
sout("succ")
} else {
mysql,redis备用别的链路
}
}
});
场景:
消费数据,消费了一段时间以后,我程序停了,
下一次启动的时候,我的程序从哪个地方开始消费?
我上一次消费到哪了?
offset
consumer原理:
在kafka里面,kafka是不帮我们维护这个offset的,这个偏移量需要consumer
自己去维护
consumer这,kafka提供了两个参数
props.put("enable.auto.commit","true")是否开启自动提交偏移量
props.put("auto.commit.ineterval.ms","1000")每次自动提交offset的一个时间间隔
比如我们消费者消费的topicA(p0,p1):
consumerA:
topicAp0:10001(offset)
topicAp1:10008(offset)
下一次启动的时候,直接从offset出开始消费
偏移量数据存到哪?
kafka0.8及之前,消费者偏移量的数据是存储到zookeeper里面
问题:n多个topic,n多个partition,造成zookeeper请求过多
kafka0.8以后,offset存到了kafka内部的一个主题里面,__consumer_offset
这个kafka内部的主题,默认是50个分区
kafka 3个节点,
consumer group ,2个消费者
提交偏移量,类似生产者
指定key:groupid+topic+分区编号
消费同一个分区的数据的偏移量会存储到同一个offset partition里面
用了__consumer_offset这种方式保存offset,把压力分散到各个broker上面
-------------------------------------
consumer group:group id
consumer leader
consumer
consumer
1 两个consumer 去消费3个分区
2 三个consumer 去消费个分区
当两个consumer时,去消费3个分区,会进行负载均衡
负载均衡
coordinator(协调)
消费者组从kafka集群选一台作为coordinator服务器
根据group id号,计算出来一个hash值,数值
然后用这个数值对 __consumer_offset分区数取模(默认50),比如:2
拿到2以后,就去集群,看 partition为2的这个分区的leader partition在哪一台服务器上面
那么哪一台服务器就是coordinator服务器
找到coordinator之后
1 所有consumer group下的consumer都去 coordinator这台服务器注册
2 这台coordinator服务器会从这个消费组里面选一个leader consumer(谁先注册谁就是)
同时这个coordinator服务器也会把你要消费topic的消息发送给leader consumer
3 leader consumer 制定分区的消费方案,发送sync group 请求,把分区消费方案发送给coordinator服务器
4 coordinator 下发分区的消费方案,给各个consumer
5 若一个consumer挂掉,重新执行上面步骤
负载均衡策略
1 range策略
0-3分区 consumer1
4-7分区 consumer2
8-12分区 consuemr3
2 round-robin策略(轮询,)
0,3,6分区 consumer1
1,4,7分区 consumer2
2,5,8分区 consumer3
上面两个方案有个问题:
假设consumer1挂了:p0-3分区 分配给consumer2和3
原本在consumer2上的p3 被分配到consumer3上了
3 sticky策略
最新的一个sticky策略,就是说尽可能保证rebalance的时候,让原本属于这个consumer
的分区还是属于他们
然后再把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配策略
rebalance分代机制
***********kafka**********
--------------------------
broker(coordinator)
topicA p0 leader
--------------------------
broker
topicA p1 leader
--------------------------
broker
topicA p2 leader
==================================================================================
consumer核心参数
1 consumer心跳时间,1000,不要设置得太久,不然coordinator服务器不太容易发现你的消费者宕机了
heartbeat.interval.ms 1000
2 session.timeout.ms 1000*10 多久没发送心跳认为超时
3 max.poll.interval.ms 30*1000 每隔多久拉取一次数据
4 max.poll.records 500 每次拉取500条数据,可增大至500*2
5 connection.max.idle.ms -1 当连接空闲的时候,是否回收。-1 不回收
6 enable.auto.commit true
7 auto.commit.interval.ms 1000
8 auto.offset.reset earliest 下一次从哪开始消费
earliest
当各分区已提交offset时,从提交的offset开始消费,无提交offset时,从头开始消费
latest(用的多)
当各个分区下有已提交的offset时,从提交的offset开始消费,无提交offset时,从当前最新的数据开始消费
none
topic各分区都存在已提交的offset时,从offset后开始消费,只要有一个分区不存在已提交的offset,则抛出异常
二分查找
000001111.log 数据文件
message1 offset:1111 position:234
message2 offset:1112 position:1092
message3 offset:1113 position:1872
message4 offset:1114 position:2098
message5 offset:1115 position:2395
message6 offset:1116 position:3011
物理位置
数据中不存这个position,是指磁盘上面的物理位置
稀松索引
000001111.index索引文件(不会每条都记录,每隔一段时间记录一次)很小几k
1111 234
1113 1872
1116 3011
比如:消费1115的偏移量
从index找比1115大和小的文件,再去log文件定位位置
ISR机制
in-sync-replicas
p0:
p0-leader,<= p0-follower1,p0-follower2
ISR:副本跟leader分区保持同步的副本加入ISR
ISR:p0-leader,p0-follower1,p0-follower2
假设:p0-leader挂了,一区两个分区之一成为leader
假设:副本什么情况下会被踢出ISR列表
如果一个follower超过10s没有去跟leader partition去同步数据,那么这个follower就会被
踢出ISR列表
什么样的情况下又可以重新加入?
看数据差异大不大,下节讲
如果同步了,加进去