Kafka 被称为下一代分布式消息系统,由 scala 和 Java 编写,是非营利性组织
ASF(Apache Software Foundation,简称为 ASF)基金会中的一个开源项目,比如
HTTP Server、Hadoop、ActiveMQ、Tomcat 等开源软件都属于 Apache 基金会的开
源软件,类似的消息系统还有 RbbitMQ、ActiveMQ、ZeroMQ。
Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,
快速快速性,可在数千家公司中投入生产。
常用消息队列对比:
kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩
容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。
kafka 优势:
kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 的
消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。
支持通过 Kafka 服务器分区消息。
支持 Hadoop 并行数据加载。
O(1)就是最低的时空复杂度了,也就是耗时/耗空间与输入数据大小无关,无论
输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的 O(1)时间复杂
度,无论数据规模多大,都可以在一次计算后找到目标
kafka 角色:
Broker:
Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
Topic :
每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物
理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保
存于一个或多个 broker 上但用户只需指定消息的 topic 即可生产或消费数据而不
必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消
费者需要订阅相应的 topic 才能消费 topic 中的消息。
Partition :
是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时
可指定 parition 数量,每个 partition 对应于一个文件夹,该文件夹下存储该
partition 的数据和索引文件,为了实现实现数据的高可用,比如将分区 0 的数据
分散到不同的 kafka 节点,每一个分区都有一个 broker 作为 leader 和一个 broker
作为 Follower。
分区的优势(分区因子为 3):
一:实现存储空间的横向扩容,即将多个 kafka 服务器的空间结合利用
二:提升性能,多服务器读写
三:实现高可用,分区 leader 分布在不同的 kafka 服务器,比如分区 0 的 leader
为服务器 A,则服务器 B 和服务器 C 为 A 的 follower,而分区 1 的 leader 为服务
器 B,则服务器 A 和 C 为服务器 B 的 follower,而分区 2 的 leader 为 C,则服务
器 A 和 B 为 C 的 follower。
Producer:负责发布消息到 Kafka broker。
Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个
consumer 指定 group name,若不指定 group name 则属于默认的 group),使用
consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group
内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。
kafka 部署:(基于zookeeper)
## 官方教程
https://kafka.apache.org/quickstart
# 下载
https://kafka.apache.org/downloads
部署三台服务器的高可用 kafka 环境。
部署环境:
Server1:172.31.2.41
Server2:172.31.2.42
Server3:172.31.2.43
三台都是一样的操作
下载
[root@mq1 src]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
创建目录
[root@mq1 src]# mkdir -p /apps
[root@mq1 src]# mkdir -p /data/kafka
[root@mq2 src]# mkdir -p /apps
[root@mq2 src]# mkdir -p /data/kafka
[root@mq3 src]# mkdir -p /apps
[root@mq3 src]# mkdir -p /data/kafka
解压
[root@mq1 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
[root@mq2 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
[root@mq3 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
做成软链接
[root@mq1 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
[root@mq2 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
[root@mq3 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
修改配置
[root@mq1 ~]# vim /apps/kafka/config/server.properties
broker.id=41 # 唯一的
listeners=PLAINTEXT://172.31.2.41:9092 # 本机的
# 调优
num.network.threads=8
num.io.threads=12
# 数据目录
log.dirs=/data/kafka
# zookeeper 集群
zookeeper.connect=172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181
以守护进程启动
[root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
[root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
[root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties
查看端口9092
[root@mq1 ~]# ss -tanl
LISTEN 0 50 [::ffff:172.31.2.41]:9092 *:*
验证 zookeeper 中 kafka 元数据:
1、Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信
息都会写入 Zookeeper 的 ZNode 节点中;
2、Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,
会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;
ps:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset
存储在本地。
3、Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个 Leader,
这个是依托于 Zookeeper 的选举机制实现的;
测试 kafka 读写数据:
创建 topic:
创建名为 logstashtest,partitions(分区)为 3,replication(每个分区的副本数/每个
分区的分区因子)为 3 的 topic(主题):
在任意 kafaka 服务器操作:
[root@mq1 kafka]# bin/kafka-topics.sh --create --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --partitions 3 --replication-factor 3 --topic m66
Created topic m66.
验证 topic:
状态说明:logstashtest 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),
分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。
[root@mq1 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m66
Topic: m66 TopicId: NIgq-gm3SFaxsTpvwCoshA PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: m66 Partition: 0 Leader: 43 Replicas: 43,41,42 Isr: 43,41,42
Topic: m66 Partition: 1 Leader: 41 Replicas: 41,42,43 Isr: 41,42,43
Topic: m66 Partition: 2 Leader: 42 Replicas: 42,43,41 Isr: 42,43,41
获取所有 topic:
[root@mq1 kafka]# bin/kafka-topics.sh --list --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 m66
m66
测试发送消息:
[root@mq1 kafka]# bin/kafka-console-producer.sh --broker-list 172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092 --topic m66
>msg1
>msg2
>mgs3
测试获取消息:
可以到任意一台 kafka 服务器测试消息获取,只要有相应的消息获取客户端即可。
[root@mq2 ~]# /apps/kafka/bin/kafka-console-consumer.sh --topic m66 --bootstrap-server 172.31.2.42:9092 --from-beginning msg1
msg1
msg2
mgs3
删除 topic:(必须有的才能删除,没有就会报错)
[root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m44
Topic m44 is marked for deletion.
范例:
[root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m10
Error while executing topic command : Topic 'm10' does not exist as expected
[2021-08-17 06:30:06,677] ERROR java.lang.IllegalArgumentException: Topic 'm10' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:500)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)