官方文档:http://kafka.apache.org/documentation/
简介
多租户
保证:
同一个partition内的顺序性;
consumer能够按序看到日志文件中的记录;
对于副本因子为N的topic,即使N-1个服务器宕机,已经提交到日志的记录能够不被丢失。
用作消息系统:
简化了传统消息系统的两种概念:queuing publish-subscribe
将topic中的每一个partition分配给组里的一个consumer,能够保证同一个partition中的消息被顺序消费。
用作存储系统:
只有数据被完全备份并且保证已经持久化了,数据的写入才被认为成功。
流处理:
用例
Messages
better throughput, built-in partitioning, replication, and fault-tolerance,其他消息队列:
Website Activity Tracking
日志聚合
流处理
其他流处理平台:Storm,Samza
Metrics
Commit Log
配置
broker
topic
producer
consumer
connect
设计
持久化
pagecache-centric design
常量的时间复杂度
性能
Producer
LB:
producer直接将数据发送给对应的partition;
所有的kafka服务器可以在给定的时间内回应哪一个服务器是存储的并且topic的partition leader在哪里;
客户端控制将数据写入到哪一个partition;kafka留了一个指定key的接口,kafka将对该key作hash(hash函数也是可以自定义的)从而确定partition。
异步发送:
批量发送可以配置为不超过固定数量的message或者等到不超过一些固定的延迟限制(比如说64k,10ms);
这种缓冲是可以配置的,并且提供了一种来通过少量的延迟来提高吞吐量的机制。
Consumer
push vs pull
kafka采用的是传统的数据由producer推送给broker,然后由consumer从broker拉去的机制;Scribe及flume采用的pushed机制,这样consumer就比较难处理信息,因为它无法控制broker向他推送数据的频率,相反kafka在这方面就显得更加可控些;
拉式系统有一个问题是如果broker没有数据,consumer会一直空转忙等待至有数据到达;
consumer position
常规的消息系统是由broker记录哪些消息被消费,确定哪些消息被消费后,将之删除这能使数据量变小,但会带来一些其他问题:如果消息被发送后没有被正确消费,一直收不到消费成功的确认等,给每条数据记录状态的性能问题等等;
kafka将topic分为多个有序的partition,对于每一个partition而言,consumer的position仅仅是一个整数;另外,消息可以被重复消费。
离线数据加载
消息传送语义
从producer的角度而言:0.11之后,kafka提供了一种幂等的机制,能够保证重发不会在log中产生重复的项,因为broker给每个producer分配了一个ID并且删除使用已被发送消息的序列码的消息;在这个版本后,kafka还支持向多个topic partitions发送消息的事务语义。
从consumer的角度而言:为了保证"exactly once",我们可以在一个事务中处理数据并将offset写入到topic中;事务默认的隔离级别是未提交读。
kafka默认支持at-least-once发送,并且允许用户通过关闭重试机制实现at-most-once发送以及在处理一批数据之前提交offset。
复制
所有的读写都是由leader partition实现。
kafka对节点存活的定义:
1. 节点维持着与ZK的session(依据ZK的心跳机制);
2. 从节点必须复制主节点上发生的写并且不能落后太多;
leader跟踪从节点的列表,并且发现挂了、出错或者落后的节点后将至从列表中移除。对于出错或者落后的节点的配置在replica.lag.time.max.ms中;
kafka不处理拜占庭一类的问题:比如恶意或者随意的回复。
只有已经提交的消息才能被consumer读到。
producer可以选择是否等待消息被提交,这取决于在时延及持久性之间作权衡,这由相关的producer的ack配置控制。
Replicated Logs:Quorums,ISRs,and State Machines
kafka的选举不是多数决,而是为能够追上leader的副本动态维护一个ISR(In-sync replicas),只有这里的成员能够参与leader选举。写入数据至partition时,只有in-sync的副本都接受到了,这些消息才会确认。ISR集合放在ZK中。
所有ISR都挂了
两种办法:
等待ISR中的副本出现;(0.11后默认,可设置unclean.leader.election.enable改变)
选择第一个出现的副本;
可用性及持久性保证
在多少副本写入成功才认为消息已提交:0,1,-1(all),注意:all只是保证所有处于正在in-sync的副本成功。
关于可用性及持久性,有两个更高级的配置:
Disable unclean leader election;
指定最小ISR大小:不过只有当ack为all时才能起效。
副本管理
partition的分配:
leader的选择:
日志清理
保证
1. min.compaction.lag.ms可以保证消息写入后不被清理的最短时间;
2. 不改变顺序
3. 不改变offset
4. delete.retention.ms
细节
日志清理有一个log cleaner后台线程池执行。
配置log cleaner
log.cleanup.policy=compact
log.cleaner.min.compaction.lag.ms 消息在被清理前的最小保留时间
Quotas
Quotas的配置:
超过配额了怎么办:
操作
基本操作
topic相关
由于文件夹名大小限制255个字符,所以topic的名字长度有限制;
kafka可以增加partition的数目,但是不会改变已有数据的partition归属,同时也不支持减小partition的数目。
优雅关机
1. 同步日志到磁盘,避免重启时作日志恢复;这一点在非hard kill情况下是默认执行的;
2. 关机前迁移所拥有的leader partition,需设置controlled.shutdown.enable=true。
balanced leadership
执行
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
或者配置 auto.leader.rebalance.enable=true
跨机架平衡副本
broker.rack=my-rack-id可以设置broker所属的机架,一个partition将分布于min{racks,replication-factor}个机架中。
跨集群镜像数据
source以及destination集群的partition数量及offset都可以不一样。
kafka-mirror-maker.sh命令,--whitelist指定topic,值需要在引号中,可以是一个正则表达式。
auto.create.topics.enable=true配置可以让集群实现自动创建或者备份数据。
扩容--关键是如何重分配partition
kafka-reassign-partitions.sh
缩容
提升备份因子
限制数据迁移过程中的网络带宽
设置Quota
数据中心
横跨多个数据中心的情形,kakfa更推荐使用镜像集群的方式。
不推荐部署一个跨越多个数据中心的集群,因为会增加分片之间同步的延时,网络不可用时,要么kafka要么ZK不可用。
Kafka配置
producer的关键配置包括:
- acks
- compression
- batch size
consumer的关键配置是fetch size
依赖的Java版本
硬件及OS
监测
kafka服务器使用Yammer Metrics ,Kafka客户端使用内置的Kafka Metrics。这两者都拓展自JMX
依赖的ZooKeeper的配置
典型的ZK服务包含5或7个节点
使ZK隔离运行
为ZK分配足够的Java堆空间
需要思考的问题:
1. subscribe与assign的区别
2. enable.auto.commit
3. 同步发送及异步发送的原理 Batch RecrodAccumulator
4. 如何进行重复消费
5. 如何做到生产者负载均衡(四层负载均衡以及ZooKeeper负载均衡,它们的具体实现)
6. 消费者又是如何做到负载均衡的