zoukankan      html  css  js  c++  java
  • kafka

    一、入门

    kafka是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点,并已在成千上万家公司运行。

    简单说明什么是kafka

    Apache kafka是消息中间件的一种,我发现很多人不知道消息中间件是什么,在开始学习之前,我这边就先简单的解释一下什么是消息中间件,只是粗略的讲解,目前kafka已经可以做更多的事情。
    举个例子,生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
    鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。
    消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
    各位现在知道kafka是干什么的了吧,它就是那个"篮子"。

    kafka名词解释

    • producer:生产者,就是它来生产“鸡蛋”的。
    • consumer:消费者,生出的“鸡蛋”它来消费。
    • topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
    • broker:就是篮子了。

    Kafka作为一个分布式的流平台,这到底意味着什么?

    我们认为,一个流处理平台具有三个关键能力:

    1.发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统。
    2.以容错(故障转移)的方式存储消息(流)。
    3.在消息流发生时处理它们。

    什么是kafka的优势?它主要应用于2大类应用:

    1.kafka作为一个集群运行在一个或多个服务器上。
    2.kafka集群存储的消息是以topic为类别记录的。
    3.每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。
    4.构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
    5.构建实时流的应用程序,对数据流进行转换或反应。

    kafka有四个核心API:

    • 应用程序使用 Producer API 发布消息到1个或多个topic(主题)中。
    • 应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
    • 应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
    • Connector API 可构建或运行可重用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,连接到关系数据库的连接器可以捕获表的每个变更。


    Client和Server之间的通讯,是通过一条简单、高性能并且和开发语言无关的TCP协议。并且该协议保持与老版本的兼容。Kafka提供了Java Client(客户端)。除了Java客户端外,还有非常多的其它编程语言的客户端。

    主题和日志 (Topic和Log)

    让我们更深入的了解Kafka中的Topic。
    Topic是发布的消息的类别名,一个topic可以有零个,一个或多个消费者订阅该主题的消息。
    对于每个topic,Kafka集群都会维护一个分区log,就像下图中所示:

    每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

    Kafka集群保持所有的消息,直到它们过期(无论消息是否被消费)。实际上消费者所持有的仅有的元数据就是这个offset(偏移量),也就是说offset由消费者来控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更早的位置,重新读取消息。可以看到这种设计对消费者来说操作自如,一个消费者的操作不会影响其它消费者对此log的处理。

    再说说分区。Kafka中采用分区的设计有几个目的。一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。第二,分区可以作为并行处理的单元,稍后会谈到这一点。

    分布式(Distribution)

    Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

    Geo-Replication(异地数据同步技术)

    Kafka MirrorMaker为群集提供geo-replication支持。借助MirrorMaker,消息可以跨多个数据中心或云区域进行复制。 您可以在active/passive场景中用于备份和恢复; 或者在active/passive方案中将数据置于更接近用户的位置,或数据本地化。

    生产者(Producers)

    生产者往某个Topic上发布消息。生产者也负责选择发布到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。

    消费者(Consumers)

    通常来讲,消息模型可以分为两种, 队列和发布-订阅式。 队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了queue模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 更通用的, 我们可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者, 一个组内多个消费者可以用来扩展性能和容错。正如下图所示:

    2个kafka集群托管4个分区(P0-P3),2个消费者组,消费组A有2个消费者实例,消费组B有4个。

    正像传统的消息系统一样,Kafka保证消息的顺序不变。 再详细扯几句。传统的队列模型保持消息,并且保证它们的先后顺序不变。但是, 尽管服务器保证了消息的顺序,消息还是异步的发送给各个消费者,消费者收到消息的先后顺序不能保证了。这也意味着并行消费将不能保证消息的先后顺序。用过传统的消息系统的同学肯定清楚,消息的顺序处理很让人头痛。如果只让一个消费者处理消息,又违背了并行处理的初衷。 在这一点上Kafka做的更好,尽管并没有完全解决上述问题。 Kafka采用了一种分而治之的策略:分区。 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

    Kafka的保证(Guarantees)

    • 生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,如果一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,并且优先的出现在日志中。
    • 消费者收到的消息也是此顺序。
    • 如果一个Topic配置了复制因子(replication factor)为N, 那么可以允许N-1服务器宕机而不丢失任何已经提交(committed)的消息。

    kafka作为一个消息系统

    Kafka的流与传统企业消息系统相比的概念如何?
    传统的消息有两种模式:队列和发布订阅。 在队列模式中,消费者池从服务器读取消息(每个消息只被其中一个读取); 发布订阅模式:消息广播给所有的消费者。这两种模式都有优缺点,队列的优点是允许多个消费者瓜分处理数据,这样可以扩展处理。但是,队列不像多个订阅者,一旦消息者进程读取后故障了,那么消息就丢了。而发布和订阅允许你广播数据到多个消费者,由于每个订阅者都订阅了消息,所以没办法缩放处理。

    kafka中消费者组有两个概念:队列:消费者组(consumer group)允许同名的消费者组成员瓜分处理。发布订阅:允许你广播消息给多个消费者组(不同名)。

    kafka的每个topic都具有这两种模式。
    kafka有比传统的消息系统更强的顺序保证。
    传统的消息系统按顺序保存数据,如果多个消费者从队列消费,则服务器按存储的顺序发送消息,但是,尽管服务器按顺序发送,消息异步传递到消费者,因此消息可能乱序到达消费者。这意味着消息存在并行消费的情况,顺序就无法保证。消息系统常常通过仅设1个消费者来解决这个问题,但是这意味着没用到并行处理。

    kafka做的更好。通过并行topic的parition —— kafka提供了顺序保证和负载均衡。每个partition仅由同一个消费者组中的一个消费者消费到。并确保消费者是该partition的唯一消费者,并按顺序消费数据。每个topic有多个分区,则需要对多个消费者做负载均衡,但请注意,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。

    kafka作为一个存储系统

    所有发布消息到消息队列和消费分离的系统,实际上都充当了一个存储系统(发布的消息先存储起来)。Kafka比别的系统的优势是它是一个非常高性能的存储系统。

    写入到kafka的数据将写到磁盘并复制到集群中保证容错性。并允许生产者等待消息应答,直到消息完全写入。

    kafka的磁盘结构 - 无论你服务器上有50KB或50TB,执行是相同的。

    client来控制读取数据的位置。你还可以认为kafka是一种专用于高性能,低延迟,提交日志存储,复制,和传播特殊用途的分布式文件系统。

    kafka的流处理

    仅仅读,写和存储是不够的,kafka的目标是实时的流处理。

    在kafka中,流处理持续获取输入topic的数据,进行处理加工,然后写入输出topic。例如,一个零售APP,接收销售和出货的输入流,统计数量或调整价格后输出。

    可以直接使用producer和consumer API进行简单的处理。对于复杂的转换,Kafka提供了更强大的Streams API。可构建聚合计算或连接流到一起的复杂应用程序。

    助于解决此类应用面临的硬性问题:处理无序的数据,代码更改的再处理,执行状态计算等。

    Sterams API在Kafka中的核心:使用producer和consumer API作为输入,利用Kafka做状态存储,使用相同的组机制在stream处理器实例之间进行容错保障。

    拼在一起

    消息传递,存储和流处理的组合看似反常,但对于Kafka作为流式处理平台的作用至关重要。

    像HDFS这样的分布式文件系统允许存储静态文件来进行批处理。这样系统可以有效地存储和处理来自过去的历史数据。

    传统企业的消息系统允许在你订阅之后处理未来的消息:在未来数据到达时处理它。

    Kafka结合了这两种能力,这种组合对于kafka作为流处理应用和流数据管道平台是至关重要的。

    批处理以及消息驱动应用程序的流处理的概念:通过组合存储和低延迟订阅,流处理应用可以用相同的方式对待过去和未来的数据。它是一个单一的应用程序,它可以处理历史的存储数据,当它处理到最后一个消息时,它进入等待未来的数据到达,而不是结束。

    同样,对于流数据管道(pipeline),订阅实时事件的组合使得可以将Kafka用于非常低延迟的管道;但是,可靠地存储数据的能力使得它可以将其用于必须保证传递的关键数据,或与仅定期加载数据或长时间维护的离线系统集成在一起。流处理可以在数据到达时转换它。

    二、kafka的使用场景

    消息

    kafka更好的替换传统的消息系统,消息系统被用于各种场景(解耦数据生产者,缓存未处理的消息,等),与大多数消息系统比较,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息。

    根据我们的经验,消息往往用于较低的吞吐量,但需要低的端到端延迟,并需要提供强大的耐用性的保证。

    在这一领域的kafka比得上传统的消息系统,如ActiveMQ或RabbitMQ。

    网站活动追踪

    kafka原本的使用场景:用户的活动追踪,网站的活动(网页游览,搜索或其他用户的操作信息)发布到不同的主题中心,这些消息可实时处理,实时监测,也可加载到Hadoop或离线处理数据仓库。

    每个用户页面视图都会产生非常高的量。

    指标

    kafka也常常用于监测数据。分布式应用程序生成的统计数据集中聚合。

    日志聚合

    许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器中收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理并更容易支持多个数据源和分布式数据消费。

    流处理

    kafka中消息处理一般包含多个阶段。其中原始输入数据是从kafka主题消费的,然后汇总,丰富,或者以其他的方式处理转化为新主题,例如,一个推荐新闻文章,文章内容可能从“articles”主题获取;然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流。从0.10.0.0开始,轻量,但功能强大的流处理,就可以这样进行数据处理了。

    除了Kafka Streams,还有Apache Storm和Apache Samza可选择。

    事件采集

    事件采集是一种应用程序的设计风格,其中状态的变化根据时间的顺序记录下来,kafka支持这种非常大的存储日志数据的场景。

    提交日志

    kafka可以作为一种分布式的外部日志,可帮助节点之间复制数据,并作为失败的节点来恢复数据重新同步,kafka的日志压缩功能很好的支持这种用法,这种用法类似于Apacha BookKeeper项目。

    三、kafka安装和启动

    下载代码

    下载并且解压它。

    > tar -xzf kafka_2.13-2.8.0.tgz
    > cd kafka_2.13-2.8.0
    

    启动服务

    运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。

    # 注意:很快,Apache Kafka将不再需要ZooKeeper。
    > bin/zookeeper-server-start.sh config/zookeeper.properties
    ...
    

    打开另一个命令终端启动kafka服务:

    > bin/kafka-server-start.sh config/server.properties &
    

    一旦所有服务成功启动,那Kafka已经可以使用了。

    创建一个主题(topic)

    创建一个名为“test”的Topic,只有一个分区和一个备份:

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    

    创建好之后,可以通过运行以下命令,查看已创建的topic信息:

    > bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
    Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    

    或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

    发送消息

    Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

    运行 producer(生产者),然后在控制台输入几条消息到服务器。

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    

    消费消息

    Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    

    如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

    可以使用Ctrl-C停止消费者客户端。

    kafka集群安装搭建

    接上一步,单集群已经搭建好之后,设置多个broker集群。
    到目前,我们只是单一的运行一个broker,没什么意思。对于Kafka,一个broker仅仅只是一个集群的大小,所有让我们多设几个broker。

    首先为每个broker创建一个配置文件:

    > cp config/server.properties config/server-1.properties 
    > cp config/server.properties config/server-2.properties
    

    现在编辑这些新建的文件,设置以下属性:

    config/server-1.properties: 
        broker.id=1 
        listeners=PLAINTEXT://:9093 
        log.dir=/tmp/kafka-logs-1
    
    config/server-2.properties: 
        broker.id=2 
        listeners=PLAINTEXT://:9094 
        log.dir=/tmp/kafka-logs-2
    

    broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。

    我们已经运行了zookeeper和刚才的一个kafka节点,所有我们只需要在启动2个新的kafka节点。

    > bin/kafka-server-start.sh config/server-1.properties &
    ... 
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    

    现在,我们创建一个新topic,把备份设置为:3

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    

    好了,现在我们已经有了一个集群了,我们怎么知道每个集群在做什么呢?运行命令“describe topics”

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    

    输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。

    • "leader":该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
    • "replicas":备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
    • "isr":“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
      我们运行这个命令,看看一开始我们创建的那个节点:
    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    

    这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有一个服务器,所以是“0”。

    让我们来发布一些信息在新的topic上:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
     ...
    my test message 1
    my test message 2
    ^C
    

    现在,消费这些消息。

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
     ...
    my test message 1
    my test message 2
    ^C
    

    我们要测试集群的容错,kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。

    > ps | grep server-1.properties
    7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
    > kill -9 7564
    

    在Windows上使用:

    > wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
    ProcessId
    6016
    > taskkill /pid 6016 /f
    

    备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0
    

    但是,消息仍然没丢:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    使用 Kafka Connect 来 导入/导出 数据

    你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。

    使用Kafka Stream来处理数据

    一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

    为了给你一个初步的体验,这里是如何实现流行的WordCount算法的:

    KStream<String, String> textLines = builder.stream("quickstart-events");
    
    KTable<String, Long> wordCounts = textLines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
                .groupBy((keyIgnored, word) -> word)
                .count();
    
    wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    

    停止Kafka

    现在你已经达到了快速入门的终点,可以随时卸载Kafka环境,或者继续玩下去。

    1.使用Ctrl-C停止生产者和消费者客户端。
    2.使用Ctrl-C停止Kafka broker。
    2.最后,用Ctrl-C停止ZooKeeper。
    如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行命令。

    $ rm -rf /tmp/kafka-logs /tmp/zookeeper
    
    

    四、以系统服务方式启动kafka

    创建服务文件

    创建 /usr/lib/systemd/system/zookeeper.service 并写入

    [Unit]
    Requires=network.target
    After=network.target
    [Service]
    Type=simple
    LimitNOFILE=1048576
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
    Restart=Always
    [Install]
    WantedBy=multi-user.target
    

    创建 /usr/lib/systemd/system/kafka.service 并写入

    [Unit]
    Requires=zookeeper.service
    After=zookeeper.service
    [Service]
    Type=simple
    LimitNOFILE=1048576
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
    Restart=Always
    [Install]
    WantedBy=multi-user.target
    

    注意:示例的kafka安装地址在 /usr/local/kafka

    启动服务

    重载系统服务并启动

    systemctl daemon-reload
    systemctl enable zookeeper && systemctl enable kafka
    systemctl start zookeeper && systemctl start kafka
    systemctl status zookeeper && systemctl status kafka
    

    五、kafka命令大全

    管理

    ## 创建topic(4个分区,2个副本)
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
    
    ## kafka版本 >= 2.2
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    ## 分区扩容
    # kafka版本 < 2.2
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
    
    # kafka版本 >= 2.2
    bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
    
    ## 删除topic
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
    

    查询

    ## 查询集群描述
    bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
    
    ## topic列表查询
    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    
    ## topic列表查询(支持0.9版本+)
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    
    ## 新消费者列表查询(支持0.9版本+)
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
    
    ## 新消费者列表查询(支持0.10版本+)
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    
    ## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
    
    ## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
    
    ## 显示某个消费组的消费详情(0.10.1.0版本+)
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
    

    发送和消费

    ## 生产者
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    ## 消费者
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
    
    ## 新生产者(支持0.9版本+)
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
    
    ## 新消费者(支持0.9版本+)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
    
    ## kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等)
    bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
    
    ## 高级点的用法
    bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10
    

    切换leader

    # kafka版本 <= 2.4
    > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
    
    # kafka新版本
    > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
    

    kafka自带压测命令

    bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
    

    kafka持续发送消息

    持续发送消息到指定的topic中,且每条发送的消息都会有响应信息:

    kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000
    

    zookeeper-shell.sh

    如果kafka集群的zk配置了chroot路径,那么需要加上/path。

    bin/zookeeper-shell.sh localhost:2181[/path]
    ls /brokers/ids
    get /brokers/ids/0
    

    迁移分区

    1.创建规则json

    cat > increase-replication-factor.json <<EOF
    {"version":1, "partitions":[
    {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
    }
    EOF
    

    2.执行

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
    

    3.验证

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
    
  • 相关阅读:
    USACO Milk2 区间合并
    Codeforces 490B Queue【模拟】
    HDU 3974 Assign the task 简单搜索
    HDU 5119 Happy Matt Friends(2014北京区域赛现场赛H题 裸背包DP)
    Cin、Cout 加快效率方法
    POJ 1159 回文LCS滚动数组优化
    POJ 2479 不相交最大子段和
    POJ 1458 最长公共子序列 LCS
    在阿里最深刻的,还是职场之道给我的震撼
    精细化
  • 原文地址:https://www.cnblogs.com/liping0826/p/15184975.html
Copyright © 2011-2022 走看看