1、 基础知识
有关RabbitMQ,RocketMQ,Kafka的区别这个网上很多,了解一下区别性能,分清什么场景使用。分布式环境下的消息中间件Kafka做的比较不错,在分布式环境下使用频繁,我也不免其俗钻研一下Kafka的使用。
任何消息队列都遵循AMQP协议,AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议)
AMQP是一个标准开放的应用层的消息中间件(Message Oriented Middleware)协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好,任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互,可以很容易做到跨语言,跨平台。
Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。
我们先看一些基本的概念:
- 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
- 生产者:(Producer) :向broker发布消息的应用程序
- AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于Kafka一个broker就是一个应用程序的实例
- 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
- 分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是Kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
Kafka将消息以topic为单位进行归纳,每个broker其实就是一个应用服务器,一个broker中会有很多的topic,每个topic其实就是不同的服务需要消息的消息的聚集地。因为每个topic其实会很大,所以就出现了partition个概念,将每个topic的消息分区存储。
Kafka中的消费者有一个分组的概念,每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费(而不是该group下的所有consumer,一定要注意这点)
- 如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
- 如果所有的consumer都具有不同的group,那这就是”发布-订阅”;消息将会广播给所有的消费者.
在Kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个”订阅”者,一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.
分布式环境中,Kafka默认使用zookeeper作为注册中心,Kafka集群几乎不维护任何consumer和producer的信息状态,这些信息都由zookeeper保存,所以consumer和producer非常的轻量级,随时注册和离开都不会对Kafka造成震荡。
producer和consumer通过zookeeper去发现topic,并且通过zookeeper来协调生产和消费的过程。
producer、consumer和broker均采用TCP连接,通信基于NIO实现。Producer和consumer能自动检测broker的增加和减少。
上面图中没有说明partition的组成,partition物理上由多个segment组成,每一个segment 数据文件都有一个索引文件对应。每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。
在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。
1.1、 message 被分配到 partition 的过程
每一条消息被发送到broker时,会根据paritition规则(有两种基本的策略,一是采用Key Hash算法,一是采用Round Robin算法)选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。
在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition.class这一参数来指定,该class必须实现Kafka.producer.Partitioner接口。
1.2、 segment文件存储结构
segment file由2大部分组成,分别为index file和data file,这两个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
文件类似于下面这种形式:
0000000000000000001.index
0000000000000000001.log
0000000000000036581.index
0000000000000036581.log
0000000000000061905.index
0000000000000061905.log
index和data-file的对应关系如下:
index file 存储索引文件,文件中的元数据指向对应数据文件中message的物理偏移地址。
2、 Kafka单机环境搭建
下载Kafka,解压缩
配置环境变量:
export Kafka_HOME=/usr/local/Kafka
export PATH=$PATH:$Kafka_HOME/bin
重启生效
source /etc/profile
Kafka用到了zeekeeper,所以需要先启动zookeeper,没有安装的需要先安装zk,安装好了以后我们可以启动,我们先来实现单机版的Kafka,先启动一个单单例的zk服务,可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
# bin/zookeeper-server-start.sh config/zookeeper.properties &
再启动Kafka:
# bin/Kafka-server-start.sh config/server.properties
创建topic:
# bin/Kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建producer,可以在控制台手动输入消息:
# bin/Kafka-console-producer.sh --broker-list localhost:9092 --topic test
this is a message
ctrl+c 可以退出发送。
创建consumer:
# bin/Kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is a message
会收到刚才的发送的消息
我们的一个简单的单机环境就搭建好了。