记录下和kafka相关的概念原理以及常用工具,文中很多理解参考文末博文、书籍还有前辈。
kafka特性
kafka是由领英(LinkedIn)提供给Apache,是一个消息引擎,现在发展成为还可以进行流处理(kafka streams)的一个平台,以下是它的架构图。
(1)用的是发布-订阅模式,多个消费者都可以消费Message。
(2)单机kafka吞吐量大概是80~100M/s,性能高,其中写性能基于磁盘顺序写,读性能基于零拷贝,此外还有三层网络模型,这三个是保证其高性能的主要原因。
(3)解耦生产者和消费者,只要消息队列不变化,两者不必要关心对方是否有变化,一个发消息,一个消费消息进行处理就可以了。
(4)削峰限流,在高并发场景下,可以在不需要升级硬件资源的情况下,关键组件可使用kafka顶住峰值流量,正常对外提供服务,不会因为一时处理不了大量请求而奔溃。
(5)顺序性保证,加入消息队列的的消息可以在单个分区保持一致的顺序,对于消息处理顺序有要求的场景,它能保证。
(6)当生产消息和消费消息的速度不一致时,kafka可以提供一个缓冲。
(7)容错性,即使消费消息的进程挂掉了,也可以在恢复后继续消费。就算某台部署了kafka的服务器宕机了,也不影响整个kafka集群继续提供服务,因为它有副本(replica)机制,可以实现高可用。
(8)kafka元数据,在0.8版本之前都交给zookeeper管理,1.0x后,每个kafka节点上都会保存一份元数据。
核心概念
kafka的核心名词有很多,参考博文用一个图简单示意一下。
producer
消息的生产者,如flume sink到kafka。
consumer
消费消息的消费者,一般是spark streaming或flink。
consumer group
多个消费者可以放到一个消费组里,如上图,具体体现就是各个消费者部署在不同的服务器上。kafka的消息对于消费组是组间共享的,比如电商网站中,积分组和订单组都可以消费数据,完成业务处理(没干过电商,网上看文章推测的)。但是消息被消费组获取后,只能被其中的一个组员处理,组内是竞争的。
broker
启动kafka server后,它就是一个broker,它有id号,在server.properties里指定的,每个服务器节点上的broker id必须唯一。如上图启动了三个kafka节点,因此就有三个broker。
topic
发送消息时,需要分类管理,类似数据库中的一张表,一张表存一类型的数据,kafka中一个topic也对应一个类型的数据。如系统消息发送一个topic,业务消息发送一个topic。
partition
topic在kafka中是一个逻辑上的概念,真正体现在物理上就是对应一个个的目录,一个topic至少有一个partition,如果创建topic时指定了多个分区,则它们会均匀分布在不同节点上,文件后缀名以数字区分不同的分区。分区后可以整体提高kafka的吞吐量,因为每个分区都可以存储数据,消费数据。
分区内,会有一个log文件,index文件,和timeindex文件。其中kafka中的message就是写入到log中保存的,底层是二进制数据,而index则是log文件的索引文件,通过它可以快速的定位一条日志,而不需要从头遍历log文件查看,细节后续文章总结。时间索引文件便于通过时间范围来查找消息,但是会存在时间错乱的可能,不常用。
[root@hadoop01 /home/software/kafka-2/kafka-logs/topicA-0]# ll
total 0
-rw-r--r--. 1 root root 10485760 Mar 20 20:29 00000000000000000000.index
-rw-r--r--. 1 root root 0 Mar 20 20:29 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 Mar 20 20:29 00000000000000000000.timeindex
replica
上面partition保证了吞吐量,但是高可用HA还没有保证,这就需要副本机制了。kafka中默认指定一个副本,它是以topic为单位进行备份的,上图topicA除了有3个分区外,还有3个副本,而topicB则只有一个副本,从创建topic的命令也可以看出来。kafka中同一个分区的多个replica不会在同一个broker上,这就实现了HA,反之如果能存在一个broker上,那这个broker宕机了那这个分区上的消息也就不能提供服务了。
# topicA 3个replica
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 3 --partitions 3 --topic topicA
Created topic "topicA".
# topicB 1个replica
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 3 --topic topicB
Created topic "topicB".
leader和follower
上图可以看出来,生产者和消费者都只和红色分区通信,生产和消费数据,这个红色分区就是leader分区,而绿色分区就是follower分区,它只从leader分区fetch数据,并返回ack信号。对于topicA来说,每个分区有三个replica,其中一个是leader,另外两个是follower,对于topicB来说,它没有follower。
controller
当启动了kafka集群,zookeeper会在某个节点启动一个controller进程,它是kafka集群的总控组件,如上图所示在broker2上,在zookeeper中会创建对应临时节点/controller。它的作用很多,简单列举一下:
(1)当某个broker宕机,它上面的leader副本就不能继续提供服务,需要从follower里选举有资格(处于isr列表中的replica)的副本顶上去,这就需要controller完成。
(2)上图中每个broker上都有meta元数据(如kafka集群中broker、partition、leader等信息),在kafka1.0.x后,它不再是存储在zookeeper上了,而是会每个broker上都有一份,这也是controller获取后分发下去的。0.8版本之前,需要连接zookeeper,1.0.x后,可以连任意一个broker就可以获取元数据,如下的例子也可以看出端倪。
# 1.0.x 之消费消息
kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic football --from-beginning
# 0.8.x 之消费消息
kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic football --from-beginning
# 都可以消费到
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic football --from-beginning
hello i am clyang
hello
i am sorry
i am not enter into bigdata
[root@hadoop01 /home/software/kafka-2/bin]# sh kafka-console-consumer.sh --zookeeper hadoop01:2181 --topic footballUsing the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
hello i am clyang
hello
i am sorry
i am not enter into bigdata
Zookeeper
(1)zookeeper管理kafka元数据,但是zookeeper并不是为高并发设计的,后面元数据会保存到各个broker一份。
(2)controller的选举,以及controller很多工作,都是在zookeeper的基础上才能完成。如broker的上下线,controller能监听到随后可以rebalance。此外leader分区宕机后,新的leader分区的选举工作,也是controller完成。
KafkaManager
kafkaManager是一款web可视化管理kafka节点的软件,由雅虎开源,可以方便管理整个集群的kafka,需要自行安装。参考文末博文,直接使用博主编译好的kafkaManager,无需安装sbt(也可以安装sbt后自己编译,可以参考网上博文),只需完成以下步骤就可以使用。
(1)修改安装目录/conf下的application.conf,将里面zookeeper的地址,修改为自己的主机地址。
[root@hadoop01 /home/software/kafka-manager-1.3.3.7/conf]# ll
total 24
-rw-r--r--. 1 root root 1381 Jun 18 2017 application.conf
-rw-r--r--. 1 root root 27 Jun 5 2017 consumer.properties
-rw-r--r--. 1 root root 2108 Jun 5 2017 logback.xml
-rw-r--r--. 1 root root 1367 Jun 5 2017 logger.xml
-rw-r--r--. 1 root root 7423 Jun 5 2017 routes
修改地址。
# 配置成自己的zookeeper的值
# kafka-manager.zkhosts="localhost:2181"
kafka-manager.zkhosts="hadoop01:2181,hadoop02:2181,hadoop03:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
(2)启动。
如果发现后台有ProdServerStart进程,代表已启动成功。
# -Dconfig.file 指定配置文件
# -Dhttp.port=9000 指定应用的端口号
# nohup和&代表后台运行,1和2代表标准输出和错误输出结果,都输出到"黑洞"
[root@hadoop01 /home/software/kafka-manager-1.3.3.7]# nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 1>/dev/null 2>/dev/null &
# 启动
[root@hadoop01 /home/software/kafka-manager-1.3.3.7]# jps
2358 Jps
2220 ProdServerStart
(3)启动zookeeper和kafka后,可以登录web查看,在哪台启动kafkamanager,就连哪台服务器的地址。刚开始需要添加一个集群,配置好zookeeper连接的地址和kafka版本就可以使用。
通过web,可以看到整个kafka节点上的topic,其他还有不少界面供查看,整体使用方便,具体略。
以上,理解不一定正确,但学习就是一个不断了解和纠错的过程,记录使人逐渐准确。
参考博文:
(1)《Apache Kafka实战》
(2)https://www.cnblogs.com/youngchaolin/p/12121812.html
(3)http://kafka.apache.org/10/documentation.html
(4)[https://www.cnblogs.com/dadonggg/p/8205302.html kafkaManager安装