初识中间件Kafka
Author:SimplelWu
什么是消息中间件?
- 非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件
- 关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统。
什么是Kafka?
Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
kafka官方:http://kafka.apache.org/
Kafka作为一个分布式的流平台,这到底意味着什么?
我们认为,一个流处理平台具有三个关键能力:
- 发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统。
- 以
容错
的方式存储消息(流)。 - 在消息流发生时处理它们。
什么是kakfa的优势?
它应用于2大类应用:
- 构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
- 构建实时流的应用程序,对数据流进行转换或反应。
kafka有四个核心API
- 应用程序使用
Producer API
发布消息到1个或多个topic(主题)。 - 应用程序使用
Consumer API
来订阅一个或多个topic,并处理产生的消息。 - 应用程序使用
Streams API
充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。 Connector API
允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,一个关系数据库的连接器可捕获每一个变化。
Client和Server之间的通讯,是通过一条简单、高性能并且和开发语言无关的TCP协议。并且该协议保持与老版本的兼容。Kafka提供了Java Client(客户端)。除了Java Client外,还有非常多的其它编程语言的Client。
主流消息中间件比较
ActiveMQ | RabbitMQ | Kafka | |
---|---|---|---|
跨语言 | 支持(Java优先) | 语言无关 | 支持(Java优先) |
支持协议 | OpenWire,Stomp, XMPP,AMQP | AMQP | |
优点 | 遵循JMS规范,安装部署方便。 | 继承Erlang天生的并发性,最初用于金融行业,稳定性,安全性有保障。 | 依赖zk,可动态扩展节点,高性能,高吞吐量,无线扩容消息可指定追溯。 |
缺点 | 根据其他用户反馈,会莫名丢失消息,目前重心在下一代的apolle上,目前社区不活跃,对5.X维护较少。 | Erlang语言难度较大,不支持动态扩展。 | 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移。 |
综合评价 | 适合中小企业消息应用场景,不适合上千个队列的应用场景。 | 适合对稳定性要求较高的企业应用。 | 一般应用在大数据日志处理或对实时性,可靠性要求稍低的场景。 |
Kafka好处
- 可靠性 - Kafka是分布式,分区,复制和容错的。
- 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
- 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
- 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。
Kafka非常快,并保证零停机和零数据丢失。
应用场景
- 指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
- 日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
- 流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。
Kafka相关术语
序号 | 组件和说明 |
---|---|
1 | Topics(主题)属于特定类别的消息流称为主题。 数据存储在主题中。主题被拆分成分区。 对于每个主题,Kafka保存一个分区的数据。 每个这样的分区包含不可变有序序列的消息。 分区被实现为具有相等大小的一组分段文件。 |
2 | Partition(分区)主题可能有许多分区,因此它可以处理任意数量的数据。 |
3 | Partition offset(分区偏移)每个分区消息具有称为 offset 的唯一序列标识。 |
4 | Replicas of partition(分区备份)副本只是一个分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。 |
5 | Brokers(经纪人)代理是负责维护发布数据的简单系统。 每个代理中的每个主题可以具有零个或多个分区。 假设,如果在一个主题和N个代理中有N个分区,每个代理将有一个分区。假设在一个主题中有N个分区并且多于N个代理(n + m),则第一个N代理将具有一个分区,并且下一个M代理将不具有用于该特定主题的任何分区。假设在一个主题中有N个分区并且小于N个代理(n-m),每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。 |
6 | Kafka Cluster(Kafka集群)Kafka有多个代理被称为Kafka集群。 可以扩展Kafka集群,无需停机。 这些集群用于管理消息数据的持久性和复制。 |
7 | Producers(生产者)生产者是发送给一个或多个Kafka主题的消息的发布者。 生产者向Kafka经纪人发送数据。 每当生产者将消息发布给代理时,代理只需将消息附加到最后一个段文件。 实际上,该消息将被附加到分区。 生产者还可以向他们选择的分区发送消息。 |
8 | Consumers(消费者)****Consumers从经纪人处读取数据。 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。 |
9 | Leader(领导者) Leader 是负责给定分区的所有读取和写入的节点。每个分区都有一个服务器充当Leader 。 |
10 | Follower(追随者)跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。 |
使用Kafka
- 安装jdk
- 安装zookepper 官方:http://zookeeper.apache.org/
- 安装kafka
我这里jdk是已经安装好的。
安装zookepper:
tar -zxvf zookeeper-3.4.13.tar.gz #解压
cd zookeeper-3.4.13/config #进入配置目录
#zookeeper运行需要config里有config文件。但是解压后默认只有zoo_sample.cfg,我们将名字修改下即可
mv zoo_sample.cfg zoo.cfg #修改配置文件名字
启动zookeper,来到bin目录:
./zkServer.sh start #启动zookepper
停止zookeper,来到bin目录:
./zkServer.sh start #停止zookepper
kafka下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
使用Kafka,
tar -zxvf kafka_2.11-2.1.0.tgz #解压kafka
启动zookpper服务,来到kafka的bin目录:
./zookeeper-server-start.sh config/zookeeper.properties #启动服务
启动kafka服务:
./kafka-server-start.sh config/server.properties #启动kafka服务
创建一个主题:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #topic_name
topic_name:主题的名字'test'。
创建好后查看主题:
kafka-topics.sh --list --zookeeper localhost:2181
Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。
发送消息:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test #主题为test
进入之后就可发送消息!!!
Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
消费消息:
#topic主题需要与被消费的主题对应上
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Kafka常用命令
#查看所有主题列表
kafka-topics.sh --zookeeper localhost:2181 --list
#查看指定topic信息
kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_name
#控制台向topic生产数据
kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
#控制台消费topic的数据
kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_name --from-beginning
#查看topic某分区偏移量最大(小)值
kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list localhost:9092 --partitions 0
#增加topic分区数
kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --partitions 10
#删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic topic_name