介绍
ApacheKafka®是一个分布式流媒体平台。这到底是什么意思呢?
我们认为流媒体平台具有三个关键功能:
它可以让你发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。它允许您以容错方式存储记录流。它可以让您在发生记录时处理记录流。什么是卡夫卡好?
它被用于两大类的应用程序:
构建可在系统或应用程序之间可靠获取数据的实时流数据管道构建实时流应用程序,可以转换或响应数据流要了解卡夫卡如何做这些事情,让我们深入探索卡夫卡的能力。
首先几个概念:
Kafka作为一个或多个服务器上的集群运行。Kafka集群以称为主题的类别存储记录流。每个记录由一个键,一个值和一个时间戳组成。卡夫卡有四个核心API:
我们认为流媒体平台具有三个关键功能:
它可以让你发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。它允许您以容错方式存储记录流。它可以让您在发生记录时处理记录流。什么是卡夫卡好?
它被用于两大类的应用程序:
构建可在系统或应用程序之间可靠获取数据的实时流数据管道构建实时流应用程序,可以转换或响应数据流要了解卡夫卡如何做这些事情,让我们深入探索卡夫卡的能力。
首先几个概念:
Kafka作为一个或多个服务器上的集群运行。Kafka集群以称为主题的类别存储记录流。每个记录由一个键,一个值和一个时间戳组成。卡夫卡有四个核心API:
该制片API允许应用程序发布的记录流至一个或多个卡夫卡的话题。该消费者API允许应用程序订阅一个或多个主题,并处理所产生的对他们记录的数据流。所述流API允许应用程序充当流处理器,从一个或多个主题消耗的输入流,并产生一个输出流至一个或多个输出的主题,有效地变换所述输入流,以输出流。该连接器API允许构建和运行卡夫卡主题连接到现有的应用程序或数据系统中重用生产者或消费者。例如,连接到关系数据库的连接器可能会捕获对表的每个更改。
在Kafka中,客户端和服务器之间的通信是通过一个简单的,高性能的,与语言无关的TCP协议完成的。这个协议是版本化的,并保持与旧版本的向后兼容性。我们为Kafka提供了一个Java客户端,但客户端可以使用多种语言。
主题和日志
让我们先深入核心抽象Kafka提供了一个记录流 - 主题。
主题是要将记录发布到的类别或供稿源名称。卡夫卡的话题总是多用户的; 也就是说,一个主题可以有零个,一个或多个订阅写入数据的消费者。
对于每个主题,Kafka集群维护一个分区日志,如下所示:
每个分区是一个有序的,不可变的记录序列,不断追加到结构化的提交日志中。分区中的记录每个分配一个连续的id号,称为偏移量,用于唯一标识分区内的每条记录。
Kafka集群使用可配置的保留期限来保留所有已发布的记录(无论是否已被使用)。例如,如果保留策略设置为两天,则在记录发布后的两天内,保留策略可供使用,之后将被丢弃以腾出空间。卡夫卡的性能在数据大小方面是有效的,所以长时间存储数据不成问题。
实际上,以消费者为单位保留的唯一元数据是消费者在日志中的偏移或位置。这个偏移量是由消费者控制的:消费者通常会在读取记录时线性地推进其偏移量,但事实上,由于消费者的位置是由消费者控制的,所以它可以以任何喜欢的顺序消费记录。例如,消费者可以重置为较旧的偏移量以重新处理来自过去的数据,或者跳至最近的记录并从“now”开始消费。
这些功能的组合意味着卡夫卡的消费者非常便宜 - 他们可以来来去去,对集群或其他消费者没有太大的影响。例如,您可以使用我们的命令行工具来“尾巴”任何主题的内容,而不会改变现有的使用者所使用的内容。
日志中的分区有几个用途。首先,它们允许日志的大小超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但是一个主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们作为并行的单位
- 更多的是在一点上。
分配
日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据和请求共享分区。每个分区都通过可配置数量的服务器进行复制,以实现容错。
每个分区有一个服务器充当“领导者”,零个或多个服务器充当“追随者”。领导处理分区的所有读取和写入请求,而追随者被动地复制领导。如果领导失败,其中一个追随者将自动成为新领导。每个服务器充当其中一些分区的领导者和其他人的追随者,因此负载在集群内平衡良好。
生产者
生产者发布数据到他们选择的主题。生产者负责选择哪个记录分配给主题内的哪个分区。这可以以循环的方式完成,只是为了平衡负载,或者可以根据某些语义分区功能(例如基于记录中的某个键)来完成。更多关于使用分区在第二!
消费者
消费者用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中或在不同的机器上。
如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载均衡。
如果所有消费者实例具有不同的消费者组,则每个记录将被广播给所有消费者进程。
两个服务器Kafka集群托管四个分区(P0-P3)与两个消费者组。消费者组A有两个消费者实例,而组B有四个消费者实例。
然而,更普遍的是,我们发现话题中有少量消费群体,每个“逻辑用户”都有一个消费群体。每个组由许多消费者实例组成,具有可扩展性和容错性。这不过是发布 - 订阅语义,订阅者是一群消费者而不是一个进程。
在Kafka中实现消费的方式是将日志中的分区划分为消费者实例,以便每个实例在任何时间点都是“公平分享”分区的唯一消费者。这个维护组中成员资格的过程是由Kafka协议动态地处理的。如果新实例加入组,他们将接管来自组中其他成员的一些分区; 如果一个实例死亡,其分区将分配给其余的实例。
卡夫卡只提供一个分区内的记录总数,而不是主题中的不同分区之间。每个分区排序与按键分区数据的能力相结合,足以满足大多数应用程序的需求。但是,如果您需要全部订单而不是记录,则可以通过仅具有一个分区的主题来实现,但这意味着每个消费者组只有一个消费者进程。
担保
在一个高层次的卡夫卡提供以下保证:
由制作者发送到特定主题分区的消息将按照它们发送的顺序附加。也就是说,如果记录M1由同一个生产者作为记录M2发送,并且M1被首先发送,则M1将具有比M2更低的偏移并且出现在日志中的更早。消费者实例按照存储在日志中的顺序查看记录。对于具有复制因子N的主题,我们将容忍多达N-1个服务器故障,而不会丢失任何提交给日志的记录。有关这些保证的更多细节在文档的设计部分给出。
卡夫卡作为消息系统
卡夫卡的流概念如何与传统的企业消息传递系统相比较?
消息传统上有两种模式:排队和发布
- 订阅。在队列中,消费者池可以从服务器读取并且每个记录都转到其中的一个; 在发布 - 订阅记录被广播给所有消费者。这两种模式都有其优点和缺点。排队的优势在于它允许您将数据处理划分为多个消费者实例,这样可以扩展处理。不幸的是,队列不是多用户的,一旦一个进程读取了数据,发布
- 订阅允许您将数据广播到多个进程,但无法进行扩展处理,因为每条消息都发送给每个订阅者。
卡夫卡的消费群体概念概括了这两个概念。与队列一样,消费者组允许您将一系列流程(消费者组的成员)的处理分开。与发布 - 订阅一样,Kafka允许您向多个消费者群体广播消息。
Kafka模型的优点是每个主题都具有这些属性 - 它可以扩展处理,也可以是多用户 - 不需要选择其中一个。
Kafka也比传统的消息系统有更强的订单保证。
传统队列在服务器上按顺序保留记录,并且如果多个使用者从队列中消耗,则服务器按照它们存储的顺序来提交记录。但是,虽然服务器按顺序提交记录,但是记录是异步传递给消费者的,所以它们可能会针对不同的消费者而出现故障。这实际上意味着记录的排序在并行消耗的情况下丢失。消息传递系统通常具有“排他消费者”的概念,只允许一个进程从队列中消费,但这当然意味着在处理过程中没有并行性。
卡夫卡做得更好。通过在主题内部具有并行性概念 - 分区概念,Kafka能够提供订单保证和负载平衡。这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由组中的一个使用者使用。通过这样做,我们确保消费者是该分区的唯一读者,并按顺序使用这些数据。由于有很多分区,这仍然可以平衡许多消费者实例的负载。但请注意,消费群组中的消费者实例不能多于分区。
卡夫卡作为存储系统
任何允许将消息发布出去的消息队列都可以充当存储系统。Kafka的不同之处在于它是一个非常好的存储系统。
写入Kafka的数据写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也能保证持续。
Kafka的磁盘结构使用了很好的规模 - 无论您在服务器上有50 KB还是50 TB的持久性数据,Kafka都会执行相同的操作。
由于认真考虑存储并允许客户端控制其读取位置,所以可以将Kafka视为专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
有关Kafka提交日志存储和复制设计的详细信息,请阅读此页面。
卡夫卡流处理
只读取,写入和存储数据流是不够的,目的是启用流的实时处理。
在Kafka中,流处理器是指从输入主题获取连续数据流,对该输入执行一些处理,并产生连续数据流以输出主题的任何东西。
例如,零售应用程序可能会接受销售和发货的输入流,并输出一系列重新排序和对这些数据进行计算的价格调整。
直接使用生产者和消费者API可以做简单的处理。但是对于更复杂的转换,Kafka提供了一个完全集成的Streams
API。这允许构建应用程序进行非平凡的处理,从而计算聚合关闭流或将流连接在一起。
这个工具有助于解决这类应用程序面临的难题:处理乱序数据,重新处理代码更改的输入,执行有状态的计算等等。
流API基于Kafka提供的核心原语构建:它使用生产者和消费者API进行输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。
放在一起
消息传递,存储和流处理的这种组合看起来很不寻常,但对于Kafka作为一个流媒体平台来说,这是非常重要的。
像HDFS这样的分布式文件系统允许存储用于批处理的静态文件。有效地,这样的系统允许存储和处理过去的历史数据。
传统的企业消息传递系统允许处理将来订阅的消息。以这种方式构建的应用程序在到达时处理将来的数据。
Kafka结合了这两种功能,并且这两种组合对于Kafka用作流式传输应用程序平台和流式传输数据管道都是至关重要的。
通过将存储和低延迟订阅相结合,流式应用程序可以同样的方式处理过去和未来的数据。这是一个单一的应用程序可以处理历史,存储的数据,而不是结束,当它达到最后一个记录,它可以继续处理未来的数据到达。这是流处理的概括概念,包括批处理以及消息驱动的应用程序。
同样,对于流式传输数据流水线,订阅实时事件的组合使得可以将Kafka用于非常低延迟的流水线; 但是可靠地存储数据的能力可以将其用于必须保证数据交付的关键数据,或者与只能定期加载数据的离线系统集成,或者可能长时间停机进行维护。流处理设施可以在数据到达时进行转换。
在Kafka中,客户端和服务器之间的通信是通过一个简单的,高性能的,与语言无关的TCP协议完成的。这个协议是版本化的,并保持与旧版本的向后兼容性。我们为Kafka提供了一个Java客户端,但客户端可以使用多种语言。
主题和日志
让我们先深入核心抽象Kafka提供了一个记录流 - 主题。
主题是要将记录发布到的类别或供稿源名称。卡夫卡的话题总是多用户的; 也就是说,一个主题可以有零个,一个或多个订阅写入数据的消费者。
对于每个主题,Kafka集群维护一个分区日志,如下所示:
每个分区是一个有序的,不可变的记录序列,不断追加到结构化的提交日志中。分区中的记录每个分配一个连续的id号,称为偏移量,用于唯一标识分区内的每条记录。
Kafka集群使用可配置的保留期限来保留所有已发布的记录(无论是否已被使用)。例如,如果保留策略设置为两天,则在记录发布后的两天内,保留策略可供使用,之后将被丢弃以腾出空间。卡夫卡的性能在数据大小方面是有效的,所以长时间存储数据不成问题。
实际上,以消费者为单位保留的唯一元数据是消费者在日志中的偏移或位置。这个偏移量是由消费者控制的:消费者通常会在读取记录时线性地推进其偏移量,但事实上,由于消费者的位置是由消费者控制的,所以它可以以任何喜欢的顺序消费记录。例如,消费者可以重置为较旧的偏移量以重新处理来自过去的数据,或者跳至最近的记录并从“now”开始消费。
这些功能的组合意味着卡夫卡的消费者非常便宜 - 他们可以来来去去,对集群或其他消费者没有太大的影响。例如,您可以使用我们的命令行工具来“尾巴”任何主题的内容,而不会改变现有的使用者所使用的内容。
日志中的分区有几个用途。首先,它们允许日志的大小超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但是一个主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们作为并行的单位
- 更多的是在一点上。
分配
日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据和请求共享分区。每个分区都通过可配置数量的服务器进行复制,以实现容错。
每个分区有一个服务器充当“领导者”,零个或多个服务器充当“追随者”。领导处理分区的所有读取和写入请求,而追随者被动地复制领导。如果领导失败,其中一个追随者将自动成为新领导。每个服务器充当其中一些分区的领导者和其他人的追随者,因此负载在集群内平衡良好。
生产者
生产者发布数据到他们选择的主题。生产者负责选择哪个记录分配给主题内的哪个分区。这可以以循环的方式完成,只是为了平衡负载,或者可以根据某些语义分区功能(例如基于记录中的某个键)来完成。更多关于使用分区在第二!
消费者
消费者用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在不同的进程中或在不同的机器上。
如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载均衡。
如果所有消费者实例具有不同的消费者组,则每个记录将被广播给所有消费者进程。
两个服务器Kafka集群托管四个分区(P0-P3)与两个消费者组。消费者组A有两个消费者实例,而组B有四个消费者实例。
然而,更普遍的是,我们发现话题中有少量消费群体,每个“逻辑用户”都有一个消费群体。每个组由许多消费者实例组成,具有可扩展性和容错性。这不过是发布 - 订阅语义,订阅者是一群消费者而不是一个进程。
在Kafka中实现消费的方式是将日志中的分区划分为消费者实例,以便每个实例在任何时间点都是“公平分享”分区的唯一消费者。这个维护组中成员资格的过程是由Kafka协议动态地处理的。如果新实例加入组,他们将接管来自组中其他成员的一些分区; 如果一个实例死亡,其分区将分配给其余的实例。
卡夫卡只提供一个分区内的记录总数,而不是主题中的不同分区之间。每个分区排序与按键分区数据的能力相结合,足以满足大多数应用程序的需求。但是,如果您需要全部订单而不是记录,则可以通过仅具有一个分区的主题来实现,但这意味着每个消费者组只有一个消费者进程。
担保
在一个高层次的卡夫卡提供以下保证:
由制作者发送到特定主题分区的消息将按照它们发送的顺序附加。也就是说,如果记录M1由同一个生产者作为记录M2发送,并且M1被首先发送,则M1将具有比M2更低的偏移并且出现在日志中的更早。消费者实例按照存储在日志中的顺序查看记录。对于具有复制因子N的主题,我们将容忍多达N-1个服务器故障,而不会丢失任何提交给日志的记录。有关这些保证的更多细节在文档的设计部分给出。
卡夫卡作为消息系统
卡夫卡的流概念如何与传统的企业消息传递系统相比较?
消息传统上有两种模式:排队和发布
- 订阅。在队列中,消费者池可以从服务器读取并且每个记录都转到其中的一个; 在发布 - 订阅记录被广播给所有消费者。这两种模式都有其优点和缺点。排队的优势在于它允许您将数据处理划分为多个消费者实例,这样可以扩展处理。不幸的是,队列不是多用户的,一旦一个进程读取了数据,发布
- 订阅允许您将数据广播到多个进程,但无法进行扩展处理,因为每条消息都发送给每个订阅者。
卡夫卡的消费群体概念概括了这两个概念。与队列一样,消费者组允许您将一系列流程(消费者组的成员)的处理分开。与发布 - 订阅一样,Kafka允许您向多个消费者群体广播消息。
Kafka模型的优点是每个主题都具有这些属性 - 它可以扩展处理,也可以是多用户 - 不需要选择其中一个。
Kafka也比传统的消息系统有更强的订单保证。
传统队列在服务器上按顺序保留记录,并且如果多个使用者从队列中消耗,则服务器按照它们存储的顺序来提交记录。但是,虽然服务器按顺序提交记录,但是记录是异步传递给消费者的,所以它们可能会针对不同的消费者而出现故障。这实际上意味着记录的排序在并行消耗的情况下丢失。消息传递系统通常具有“排他消费者”的概念,只允许一个进程从队列中消费,但这当然意味着在处理过程中没有并行性。
卡夫卡做得更好。通过在主题内部具有并行性概念 - 分区概念,Kafka能够提供订单保证和负载平衡。这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由组中的一个使用者使用。通过这样做,我们确保消费者是该分区的唯一读者,并按顺序使用这些数据。由于有很多分区,这仍然可以平衡许多消费者实例的负载。但请注意,消费群组中的消费者实例不能多于分区。
卡夫卡作为存储系统
任何允许将消息发布出去的消息队列都可以充当存储系统。Kafka的不同之处在于它是一个非常好的存储系统。
写入Kafka的数据写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也能保证持续。
Kafka的磁盘结构使用了很好的规模 - 无论您在服务器上有50 KB还是50 TB的持久性数据,Kafka都会执行相同的操作。
由于认真考虑存储并允许客户端控制其读取位置,所以可以将Kafka视为专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
有关Kafka提交日志存储和复制设计的详细信息,请阅读此页面。
卡夫卡流处理
只读取,写入和存储数据流是不够的,目的是启用流的实时处理。
在Kafka中,流处理器是指从输入主题获取连续数据流,对该输入执行一些处理,并产生连续数据流以输出主题的任何东西。
例如,零售应用程序可能会接受销售和发货的输入流,并输出一系列重新排序和对这些数据进行计算的价格调整。
直接使用生产者和消费者API可以做简单的处理。但是对于更复杂的转换,Kafka提供了一个完全集成的Streams
API。这允许构建应用程序进行非平凡的处理,从而计算聚合关闭流或将流连接在一起。
这个工具有助于解决这类应用程序面临的难题:处理乱序数据,重新处理代码更改的输入,执行有状态的计算等等。
流API基于Kafka提供的核心原语构建:它使用生产者和消费者API进行输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。
放在一起
消息传递,存储和流处理的这种组合看起来很不寻常,但对于Kafka作为一个流媒体平台来说,这是非常重要的。
像HDFS这样的分布式文件系统允许存储用于批处理的静态文件。有效地,这样的系统允许存储和处理过去的历史数据。
传统的企业消息传递系统允许处理将来订阅的消息。以这种方式构建的应用程序在到达时处理将来的数据。
Kafka结合了这两种功能,并且这两种组合对于Kafka用作流式传输应用程序平台和流式传输数据管道都是至关重要的。
通过将存储和低延迟订阅相结合,流式应用程序可以同样的方式处理过去和未来的数据。这是一个单一的应用程序可以处理历史,存储的数据,而不是结束,当它达到最后一个记录,它可以继续处理未来的数据到达。这是流处理的概括概念,包括批处理以及消息驱动的应用程序。
同样,对于流式传输数据流水线,订阅实时事件的组合使得可以将Kafka用于非常低延迟的流水线; 但是可靠地存储数据的能力可以将其用于必须保证数据交付的关键数据,或者与只能定期加载数据的离线系统集成,或者可能长时间停机进行维护。流处理设施可以在数据到达时进行转换。
Kafka的用途:(官网翻译)
消息
卡夫卡可以很好地替代传统的信息经纪人。消息代理被用于各种原因(将数据处理与数据生成器分离,缓冲未处理的消息等)。与大多数消息传递系统相比,Kafka具有更好的吞吐量,内置的分区,复制和容错能力,使其成为大型消息处理应用的一个很好的解决方案。
根据我们的经验,消息传递的使用往往是相对较低的吞吐量,但可能需要较低的端到端延迟,并且通常取决于Kafka提供的强大的持久性保证。
在这个领域,Kafka与传统的消息系统(如ActiveMQ或 RabbitMQ)相当。
网站活动跟踪
Kafka的原始用例是能够将用户活动跟踪管道重建为一组实时发布 - 订阅订阅源。这意味着网站活动(用户可能采用的网页浏览量,搜索或其他操作)将发布到每个活动类型一个主题的中心主题。这些订阅源可用于订阅各种用例,包括实时处理,实时监控,加载到Hadoop或离线数据仓库系统以进行离线处理和报告。
活动跟踪通常是非常高的量,因为为每个用户页面视图生成许多活动消息。
度量
卡夫卡通常用于运行监控数据。这涉及从分布式应用程序汇总统计数据以生成操作数据的集中式源。
日志聚合
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将其置于中央位置(可能是文件服务器或HDFS)进行处理。Kafka提取文件的细节,并将日志或事件数据作为消息流进行更清晰的抽象。这样可以实现更低延迟的处理,并且更容易支持多个数据源和分布式数据消耗。与Scribe或Flume等以日志为中心的系统相比,Kafka提供同样出色的性能,由复制产生的更强大的持久性保证以及更低的端到端延迟。
流处理
Kafka的许多用户在处理管道中处理数据,这些数据由多个阶段组成,其中原始输入数据从Kafka主题中消耗,然后聚合,丰富或以其他方式转化为新的主题,以供进一步消费或后续处理。例如,用于推荐新闻文章的处理流水线可以从RSS提要抓取文章内容并将其发布到“文章”主题; 进一步的处理可以对这个内容进行归一化或者重复删除,并且将已清理的文章内容发布到新的主题; 最终处理阶段可能会尝试将这些内容推荐给用户。这种处理流水线基于各个主题创建实时数据流的图表。从0.10.0.0开始,这是一个轻量但功能强大的流处理库,称为Kafka
Streams 在Apache Kafka中可用于执行如上所述的数据处理。除了Kafka Streams之外,替代性的开源流处理工具还包括Apache
Storm和 Apache Samza。
事件采购
事件源是应用程序设计的一种风格,其中状态更改以时间排序的记录序列进行记录。Kafka对非常大的存储日志数据的支持使得它成为以这种风格构建的应用程序的优秀后端。
提交日志
Kafka可以作为分布式系统的一种外部提交日志。日志有助于复制节点之间的数据,并作为失败节点恢复数据的重新同步机制。Kafka中的日志压缩功能有助于支持这种用法。在这个用法中,Kafka与Apache
BookKeeper项目类似。
1、下载并解压
>
tar
-xzf kafka_2.10-0.8.1.1tgz
>
cd
cd
kafka_2.10-0.8.1.1
2、启动服务器
Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动ZooKeeper服务器。您可以使用与kafka一起打包的便捷脚本来获取快速而简单的单节点ZooKeeper实例。
bin/zookeeper-server-start.sh
config/zookeeper.properties
bin/zookeeper-server-start.sh
config/zookeeper.properties
现在另起一个终端启动Kafka服务器:
bin/kafka-server-start.sh
config/server.properties
bin/kafka-server-start.sh
config/server.properties
3、另起一个窗口创建一个主题
我们用一个分区和一个副本创建一个名为“test”的主题
bin/kafka-topics.sh
--create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
我们现在可以看到这个话题,如果我们运行列表主题命令:
bin/kafka-topics.sh
--list --zookeeper localhost:2181
4、发送一些消息
Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台输入一些消息发送到服务器。
bin/kafka-console-producer.sh
--broker-list localhost:9092 --topic test
5、启动两个用户
卡夫卡也有一个命令行消费者,将消息转储到标准输出。
bin/kafka-console-consumer.sh
--zookeeper localhost:2181 localhost:9092 --topic test --from-beginning
---------------------
作者:土豆拍死马铃薯
来源:CSDN
原文:https://blog.csdn.net/csj941227/article/details/78703641
版权声明:本文为博主原创文章,转载请附上博文链接!
运行生产者,然后在控制台输入一些消息发送到服务器。
bin/kafka-console-producer.sh
--broker-list localhost:9092 --topic test
5、启动两个用户
卡夫卡也有一个命令行消费者,将消息转储到标准输出。
bin/kafka-console-consumer.sh
--zookeeper localhost:2181 localhost:9092 --topic test --from-beginning
---------------------
作者:土豆拍死马铃薯
来源:CSDN
原文:https://blog.csdn.net/csj941227/article/details/78703641
版权声明:本文为博主原创文章,转载请附上博文链接!