zoukankan      html  css  js  c++  java
  • spring boot项目12:Kafka-基础使用

    JAVA 8

    Spring Boot 2.5.3

    kafka_2.13-2.8.0

    apache-zookeeper-3.7.0

    ---

    授人以渔:

    1、Spring Boot Reference Documentation

    This document is also available as Multi-page HTML, Single page HTML and PDF.

    有PDF版本哦,下载下来!

    2、Spring for Apache Kafka

    有PDF版本哦,下载下来!

    目录

    Kafka简介

    Spring Boot操作Kafka

    1个主题1个分区1个消费者组2个消费者

    1个主题2个分区

    消息发送手动确认结果

    消息消费手动确认

    参考文档

    本文介绍Kafka的基本使用,包括 新建主题、发送消息、接收&处理消息、消息发送确认、消息消费确认 等。

    本文基于 单机版的ZooKeeper、Kafka 进行测试。

    Kafka简介

    kafka是一个分布式消息队列(MQ)中间件,支持 生产者/消费者、发布者/订阅者 模式。

    Apache软件基金会开发的一个开源流处理平台。

    依赖 Apache软件基金会 的另一个开源软件:ZooKeeper(致力于开发和维护开源服务器,实现高度可靠的分布式协调)。

    关键概念:

    broker、producer、consumer-group、consumer,

    topic、partition、replica、offset。

    ---

    还有更多关键概念,这些一起构建了 分布式的、高可靠性的、流式处理 的 Kafka。

    概念说明:

    broker:运行kafka服务器的进程;

    producer:生产者,发送消息到topic;

    consumer-group:消息消费者的组名,用于将消费者分组;

    consumer:消息消费者,归属于某个consumer-group,真正用于接收&处理消息的实体;

    一个consumer-group下只有一个 consumer,这个consumer将依次消费 主题下各个partition的消息——要是partition有多个,这个consumer会“比较累”吧;

    一个consumer-group下有多个 consumer,系统(?)会协调各个 consumer 匹配 主题下各个partition,要是数量相等——都是N,则可以实现并发处理主题下的消息——一个consumer对应一个partition;

    consumer的数量 需要和 主题下的partition数量 协调——前者大于后者是没有必要的,会存在consumer浪费的情况;

    topic:消息主题,匹配RabbitMQ中的 交换机,消息发送到主题,再由主题发送到具体的分区(partition);

    partition:主题下的分区,可以1~N个,匹配RabbitMQ中的队列,但不需要和主题绑定,创建Topic的时候就建立好,在kafka的数据文件夹下,每个partition都对应一个文件夹;consumer就是从 partition中消费消息的;

    replica:partition的副本,可以1~N个,但是,N的数量必须小于 集群中 broker的数量,replica会被均匀地分布到不同的broker中;

    offset:partition下的概念,记录partition中消息被消费到那里了,由消费者控制——消费后不更新offset、任意指定offset开始消费(比如,从头开始);和RabbitMQ不同的时,消息被消费后,不会立即被清理,故,可以消费已经被消费过的消息;(疑问:消息会一直保留吗?消息会被清理吗?什么时候清理?策略是什么?TODO

    ZooKeeper、Kafka单机启动&停止:

    单机部署
    ZooKeeper
    
    配置文件(Config file):
    conf/zoo.cfg
    
    启动(start):
    ./zkServer.sh start
    停止(stop):
    ./zkServer.sh stop
    
    Kafka
    
    配置文件:
    server.properties
    
    启动(start):
    bin/kafka-server-start.sh config/server.properties
    停止(stop):
    bin/kafka-server-stop.sh
    

    注,启动前需要修改好配置文件。

    注,上面的Kafka启动后,会占用一个Shell,可以使用 nohup CMD & 实现后台启动。

    Kafka启动后,可以使用 bin 目录下的 kafka-topics.sh 管理主题。

    kafka-topics.sh操作
    # help命令
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --help
    ...省略...
    
    # 展示所有Topic
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    
    # 创建主题 1个分区、1个副本——成功
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --partitions 1 --replication-factor 1 --create
    Created topic topic082901.
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    topic082901
    
    # 创建主题 2个分区、1个副本——成功
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --partitions 2 --replication-factor 1 --create
    Created topic topic082902.
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    topic082901
    topic082902
    
    # 创建主题 1个分区、2个副本——失败!原因是 只有一个 broker——单机版,2个副本没有意义,都在一个服务器上
    # 多副本需要 集群环境才可以演示
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082903 --partitions 1 --replication-factor 2 --create
    Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
    [2021-08-29 14:26:18,293] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
     (kafka.admin.TopicCommand$)
    # 2个副本的主题没有创建成功
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    topic082901
    topic082902
    
    # 展示 主题的信息
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --describe
    Topic: topic082901      TopicId: xKGttQHbSy-Ywc9Mo4e_2w PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: topic082901      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --describe
    Topic: topic082902      TopicId: cEEZFAgCSN2PDkiAArGUTQ PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: topic082902      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
            Topic: topic082902      Partition: 1    Leader: 1       Replicas: 1     Isr: 1
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    
    # 检查kafka服务器的数据目录
    # 多了 3个文件夹,分别对应着各个主题的 分区
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
    drwxrwxr-x 2 ben ben 4096 8月  29 11:45 topic01-0
    drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-0
    drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-1
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082901-0
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-0
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-1
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    
    # 删除主题
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082902 --delete
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic082901 --delete
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$
    # 删除后查看数据目录:多了3个 末尾是 delete的目录,等待若干分钟,这些文件夹会被清理掉
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ls -l ~/kafka/logs-1/ | grep topic
    drwxrwxr-x 2 ben ben 4096 8月  29 11:45 topic01-0
    drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-0
    drwxrwxr-x 2 ben ben 4096 8月  29 11:38 topic02-1
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082901-0.be9a1e49b81a4b9a8421e52e45281f77-delete
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-0.70d6391753e24886a9dcb048db4d5811-delete
    drwxrwxr-x 2 ben ben 4096 8月  29 14:25 topic082902-1.b335d0068eba494d8576937ecb7f9bb6-delete
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$

    Kafka基于ZooKeeper,因此,在Kafka使用过程中,ZooKeeper中的一些节点也会发生变化。

    下面是 使用 ZooKeeper 的 zkCli.sh 查看到的一些信息。

    zkCli.sh部分操作
    ben@ben-VirtualBox:~/apache-zookeeper-3.7.0-bin/bin$ ./zkCli.sh
    /usr/bin/java
    Connecting to localhost:2181
    ...省略...
    
    # 使用 ZooKeeper的ls命令!
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [1]
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/seqid
    []
    [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
    [__consumer_offsets, topic01, topic02]
    [zk: localhost:2181(CONNECTED) 5]
    [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/__consumer_offsets
    [partitions]
    [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions
    [0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 3, 
    30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 4, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 5, 6, 7, 
    8, 9]
    [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/0
    [state]
    [zk: localhost:2181(CONNECTED) 8] ls /brokers/topics/__consumer_offsets/partitions/0/state
    []
    [zk: localhost:2181(CONNECTED) 9]
    [zk: localhost:2181(CONNECTED) 9] ls /brokers/topics/topics/topic01
    Node does not exist: /brokers/topics/topics/topic01
    [zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/topic01
    [partitions]
    [zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/topic01/partitions
    [0]
    [zk: localhost:2181(CONNECTED) 12] ls /brokers/topics/topic01/partitions/0
    [state]
    [zk: localhost:2181(CONNECTED) 13] ls /brokers/topics/topic01/partitions/0/state
    []
    [zk: localhost:2181(CONNECTED) 14]
    
    # help可以看到所有命令
    [zk: localhost:2181(CONNECTED) 14] help
    ZooKeeper -server host:port -client-configuration properties-file cmd args
    ...生路...

    关于Kafka在ZooKeeper上建立了哪些节点?各个节点的意义,需要另文介绍。

    Spring Boot操作Kafka

    在自己7月份的一篇文章中有过介绍,本文再深入一些。

    依赖包:

    Kafka的依赖包
    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka-test</artifactId>
    			<scope>test</scope>
    		</dependency>

    spring-kafka包:

    Spring容器中和 Kafka有关的一些Bean:

    其中的 kafkaTemplate 的 多个 send函数 用来发送消息到主题 或 其下的分区:

    # 部分send函数签名
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
    			@Nullable V data)
    
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record)
    
    public ListenableFuture<SendResult<K, V>> send(Message<?> message)

    除了send,还有sendDefault函数,发送消息到默认分区,默认分区是什么?可以用 KafkaTemplate 的 getDefaultTopic() 获取。

    另外,还有一些 setXXX函数,用来设置 kafkaTemplate 对象。

    配置application.properties:

    # Kafka
    # mylinux 虚拟机地址:配置hosts文件
    spring.kafka.bootstrap-servers=mylinux:9092
    # consumer:全局
    # 也可以在 @KafkaListener 中指定 单独的 groupId
    #spring.kafka.consumer.group-id=myGroup

    Java文件:

    接口 /try3/send 发送消息,1个@KafkaListener——消费者(注意,指定 groupId=tp03,所以,上面的配置文件中的  spring.kafka.consumer.group-id 可以不配置,否则启动不了应用)。

    try3-源码1
    # Try3Config.java
    @Component
    @Slf4j
    public class Try3Config {
    
    	public static final String TOPIC_03 = "topic03";
    	
    	public static final String TOPIC_03_KEY = "topic03_key";
    
    	/**
    	 * 监听器1
    	 * @author ben
    	 * @date 2021-08-29 16:30:46 CST
    	 * @param record
    	 */
    	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
    	public void listen01(ConsumerRecord<?, ?> record) {
    //		log.info("try3-消费-a:record={}", record);
    		log.info("try3-消费-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.value());
    	}
    	
    }
    
    # Try3Controller.java
    @RestController
    @RequestMapping(value="/try3")
    @Slf4j
    public class Try3Controller {
    
    	@Autowired
    	private KafkaTemplate<String, Object> kafkaTemplate;
    	
    	/**
    	 * 调用接口发送消息
    	 * @author ben
    	 * @date 2021-08-29 16:12:07 CST
    	 * @return
    	 */
    	@GetMapping(value="/send")
    	public Boolean sendMsg() {
    		IntStream.rangeClosed(0, 9).forEach(i->{
    			log.info("sendMsg-{}", i);
    			String msg = String.format("Try3Controller send msg-%d @%s", i, new Date());
    			
    			kafkaTemplate.send(Try3Config.TOPIC_03, Try3Config.TOPIC_03_KEY, "key-" + msg);
    		});
    		
    		return true;
    	}
    	
    }

    启动应用,Kafka上 自动出现了 topic03:

    检查K服务器
    ben@ben-VirtualBox:~/kafka_2.13-2.8.0/bin$ ./kafka-topics.sh --bootstrap-server mylinux:9092 --list
    __consumer_offsets
    topic01
    topic02
    topic03
    $ ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic03 --describe
    Topic: topic03  TopicId: km2zL1MeTKiOWlhCPMx6gw PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
            Topic: topic03  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    

    应用启动信息(部分):

    调用/try3/send 接口,发送消息:发送、接收成功。

    注意,offset的变化。

    注意,发送第一条消息时,producer还有一个初始化工作,会输出更多producer相关日志。再次发送时,就没有了。(是否需要 预热?)

    1个主题1个分区1个消费者组2个消费者

    上一章试验了 1个主题1个分区1个消费者组1个消费者 的消息发送接收,本章增加1个消费者——同一个消费者组下。

    Java源码:2个@KafkaListener,都在 tp03 消费者组 下

    	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
    	public void listen01(ConsumerRecord<?, ?> record) {
    		log.info("try3-消费-a:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.value());
    	}
    
    	@KafkaListener(topics = {TOPIC_03}, groupId="tp03")
    	public void listen02(ConsumerRecord<?, ?> record) {
    		log.info("try3-消费-b:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.value());
    	}

    启动时,输出了更多的日志,并提示发生:

    # INFO 日志,不是错误
    org.apache.kafka.common.errors.RebalanceInProgressException: The group is rebalancing, so a rejoin is needed.

    Consumer clientId 出现了两个:consumer-tp03-1、consumer-tp03-2。

    并显示了 相关日志:

    最终,consumer-tp03-1 生效了,用来消费 唯一分区的消息。

    这就是 消费者数量 大于 主题的分区数量时的情况。(注意,这两个 监听器 都在同一个应用,要是在不同应用呢?稍后试验)

    调用接口发送消息:

    进一步试验1:

    使用 另一个端口 启动应用,实现 主题的 消费者 再更加2个,看看会发生什么情况。

    执行结果:

    新启动应用的消费者 有加入消费者组,但是,没有分区分配,故,也不会收到消息处理(调用 发送消息 接口后,新应用没有收到)。

    检查kafka上 消费者组tp03 的信息:

    $ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
    tp03            topic03         0          40              40              0               consumer-tp03-1-0705883f-92ab-47f0-82c3-d7334ad81b4f /192.168.0.112  consumer-tp03-1

    进一步试验2:

    关闭处于消费状态的应用,检查 新启动的应用是否能 作为备用消费者 正确进行消费。

    消费者组 下发生了 rebalancing

    此时,调用新应用的接口发送消息(旧端口的应用 被关闭了哦),发送成功,,新端口的应用也收到并正确处理了消息。

    消费者组tp03 的信息 也发生了变化:CONSUMER-ID 不同了!

    $ ./kafka-consumer-groups.sh --bootstrap-server mylinux:9092 --describe --group tp03
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
    tp03            topic03         0          50              50              0               consumer-tp03-1-a682c694-8018-43a9-8a1e-02a7d5ece6a5 /192.168.0.112  consumer-tp03-1

    小结:

    同一个组中,消费者数量 要小于 主题的分区数,多了的消费者也不会得到分区并发生消费。

    但是,属于不同进程的 消费者 可以实现备用。

    1个主题2个分区

    前面应用启动后,生成了主题,但这个主题只有1个分区。

    下面使用 Kafka命令生成 2个分区 的主题:

     ./kafka-topics.sh --bootstrap-server mylinux:9092 --topic topic02 --partitions 2 --replication-factor 1 --create

    疑问:使用Java程序是否可以生成分区呢?发现一个名为 org.apache.kafka.clients.admin.NewTopic 的类,或许可以实现,后面试验下。

    1组1消(1个消费者组、1个消费者)

    1、发送时 无key

    kafkaTemplate.send(Try2Config.TOPIC_02, "no key-" + msg);

    启动应用:一个监听器 处理 2个分区的数据

    调用接口 /try2/send,结果:

    第一次发送,全部到了 分区0,收到并处理:

    还以为 这种模式下 只会处理一个分区的数据:无key,只转发到某个分区。

    那么,再次测试几次发送

    两个分区都有数据来了!

    疑问

    主题的消息 在 无key的情况下,是按照什么 策略 转发到不同分区的呢?TODO

    2、发送时有1个固定key

    kafkaTemplate.send(Try2Config.TOPIC_02, Try2Config.TOPIC_02_KEY, "fixed key-" + msg);

    试验结果:

    和上面的不同!监听器 只处理了来自 分区0 的消息,连续发送了多次都是如此。

    3、key为 i%4,发送20条消息(前面是10条)

    // 3.发送消息条数改为 20条,key=i%4
    // 注意,第二个参数是 String类型!!
    kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);

    试验结果:

    发送了20条消息,分别均匀地转发到了 两个分区,监听器也处理了所有20消息。

    不过,20条消息 没有按照 发送顺序处理,但是,同一个分区内 是 按照发送顺序处理了,0、2、4、..……,1、3、5……

    4、指定分区编号(从0开始,本试验只有0、1)发送

    // 4、指定分区发送
    // 第三个参数为 key = "" + i%3
    kafkaTemplate.send(Try2Config.TOPIC_02, i%Try2Config.TOPIC_02_P_NUMBER, "" + i%3, "fixed partition key-" + msg);

    试验结果:

    消息按照 指定的分区号 发送到了 不同的分区,消费者也对不同分区的消息进行了消费。

    确定性地分配到不同分区的方式,而不像 按照key 转发时,存在不确定性。

    发送时指定分区号,这需要提前知道 主题有多少分区,否则,指定的分区号大于分区数会怎样呢?程序卡住,最后超时发生异常

    // 错误:指定的 分区号 大于 分区数!
    kafkaTemplate.send(Try2Config.TOPIC_02, i%3, "" + i%3, "fixed partition key-" + msg);

    如下所示:

    Exception thrown when sending a message with key='2' and payload='fixed partition key-...' to topic topic02 and partition 2:
    org.apache.kafka.common.errors.TimeoutException: Topic topic02 not present in metadata after 60000 ms.

    疑问

    这个发送超时时间怎么 更改 呢?Spring boot、kafka的文档中、KafkaTemplate、DefaultKafkaProducerFactory 中都没有找到 发送超时 的设置方法。TODO

    注,还有一个普通的 send函数——指定了 timestamp,就不做试验了。

    小结:

    通过上面的试验,知道了 不同方式发送的消息 会怎么分配到不同的分区了。

    send(String topic, @Nullable V data) key=null 不均匀地发送到 各个分区
    send(String topic, K key, @Nullable V data) key=固定值 只会发送到一个分区
    send(String topic, K key, @Nullable V data) key=随机值

    会均匀地发送到各个分区,

    当然,随机值的数量要多于分区数量

    send(String topic, Integer partition, K key, @Nullable V data) 指定分区数

    当然是发送到指定分区。

    异常:

    指定分区号 大于 分区数,会超时、异常,发送消息失败。

    send(String topic, Integer partition, Long timestamp, K key,
                @Nullable V data)
    指定分区数

    同上一个(本文未验证)。

    这里的 timestamp 应该有其它用处,比如,消息去重等

    1组2消

    发送方式采用:不指定分区号,随机key——大于等于分区数2,,确保两个分区都有消息(均衡)。来自博客园

    每个消费者都分配给了一个分区。

    发送消息:

    kafkaTemplate.send(Try2Config.TOPIC_02, "" + i%4, "i%4 key-" + msg);
    1组2消代码
    @Component
    @Slf4j
    public class Try2Config {
    
    	/**
    	 * 主题02:topic02,拥有2个分区
    	 */
    	public static final String TOPIC_02 = "topic02";
    	
    	/**
    	 * 主题topic02分区数:2
    	 */
    	public static final int TOPIC_02_P_NUMBER = 2;
    	
    	/**
    	 * 主题02-KEY:topic02_key
    	 */
    	public static final String TOPIC_02_KEY = "topic02_key";
    	
    	public static final String TRY2_GROUP_ID_01 = "try2group01";
    	public static final String TRY2_GROUP_ID_02 = "try2group02";
    
    	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    	public void listenTopic1(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}
    	
    	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    	public void listenTopic2(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}
    	
    }

    试验结果:两个消费者,分别处理了 指定分区的消息。来自博客园

    1组3消

    增加一个监听器:

    	// 消费者3:超过分区数量了
    	@KafkaListener(topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    	public void listenTopic3(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-C:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}

    启动应用:来自博客园

    发送消息,试验结果:

    两个分区的消息都得到了处理。

    但是,try2group01-1、try2group01-2 指定了分区,为何 消费的是 try2-消费-C 呢?难道 这个 try2group01-N 和 日志里面的 A、B、C不匹配

    给 @KafkaListener 增加 id属性:

    @KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    @KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    @KafkaListener(id="listenerC", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)

    再次测试:

    果然,listenerA 对应的是 consumer-try2group01-3!

    2组2X2消

    两个消费者组,各有两个消费者:来自博客园

    2组2消监听者
    	// groupId = TRY2_GROUP_ID_01
    	// 消费者1
    	@KafkaListener(id="listenerA", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    	public void listenTopic1(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-A:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}
    	
    	// groupId = TRY2_GROUP_ID_01
    	// 消费者2
    	@KafkaListener(id="listenerB", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_01)
    	public void listenTopic2(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-B:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}
    	
    	// ---------------------
    	
    	// groupId = TRY2_GROUP_ID_02
    	// 消费者4
    	@KafkaListener(id="g2listenerH", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
    	public void listenTopic4(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-H:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}
    	
    	// groupId = TRY2_GROUP_ID_02
    	// 消费者5
    	@KafkaListener(id="g2listenerJ", topics = {TOPIC_02}, groupId = TRY2_GROUP_ID_02)
    	public void listenTopic5(ConsumerRecord<?, ?> record) {
    		log.info("try2-消费-I:topic={}, partition={}, offset={}, key=[{}], timestamp={}, value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.timestamp(), record.value());
    	}

    启动应用:ListenerA、B、H、J 都分别指派给了 主题的两个分区。

    注意:TRY2_GROUP_ID_02 的两个消费者的ID 是以 g2 开头的,但是,日志里面的线程名中 却没有 g2!TODO

    发送消息,消费结果:

    小结:

    两个消费者组 都消费了 所有消息。来自博客园

    注意:上面的 @KafkaListener 指定消费的 topic 使用的是 主题全名,其另一个属性 topicPattern 应该是可以用来 根据 某种模式来配置 监听的主题的

    源码里面:the topic pattern or expression (SpEL)TODO

    SpELl:

    SpEL(Spring Expression Language),即Spring表达式语言,是比JSP的EL更强大的一种表达式语言。来自博客园

    不甚了解,后续再DIG。

    注意:@KafkaListener 的 topicPartitions属性 的使用!TODO

    注意:同一个应用中,@KafkaListener 的 id不能重复,否则,启动异常。

    注意:@KafkaListener 除了上面的用在方法上,还可用在类上——@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE }),未用过

    注意:除了 @KafkaListener ,还有一个包含它的 @KafkaListeners,未用过

    消息发送手动确认结果

    主题:topic01,1个分区

    消费者:1个

    调用send函数,发送了消息,但是,消息是否发送成功了呢?网络、服务器故障等,都可能导致消息丢失,而 发送方却没有对 发送是否成功进行检查。来自博客园

    send函数时有返回值的:

    ListenableFuture<SendResult<K, V>>
    
    // 继承了 Future接口
    public interface ListenableFuture<T> extends Future<T> {
    }

    Future接口 的 函数 get() 可以获取 返回消息 SendResult对象,然后,进行判断(同步方式)。

    			ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
    			
    			// 1、发送同步确认
    			// 发生异常时,不终止,继续执行下一个发送
    			// 发送消息确认:发送失败时怎么处理?停止继续发送、延迟后再发送?
    			boolean sendErr = false;
    			try {
    				SendResult<String, Object> sendRet  = sendRetFuture.get();
    				RecordMetadata rmd = sendRet.getRecordMetadata();
    				if (rmd != null) {
    					log.info("发送成功:rmd-topic={},partition={},offset={},ts={}", 
    							rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
    				} else {
    					log.error("发送失败:topic={}, msg={}", Try1Config.TOPIC_01, msg);
    				}
    			} catch (InterruptedException | ExecutionException e) {
    				log.error("发生异常:发送失败,e={}", e);
    				sendErr = true;
    			}
    			if (sendErr) {
    				// 发生异常,休眠30秒
    				// 发送时,断掉服务器的网络,或停掉kafka,这里的30秒就有操作空间了
    				try {
    					TimeUnit.SECONDS.sleep(30);
    				} catch (InterruptedException e) {
    					log.error("发生异常:sleep-30secs, e={}", e);
    				}
    			}
    			
    			// 2、发送异步确认
    //			sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {

    发送成功时,可以通过 get到的 SendResult对象 获取 消息在服务器上的信息:

    发送失败时,get到的 SendResult对象 为null。

    注,

    应用启动后,把kafka关掉,想测试 发送失败 的,但是,应用一直打印服务器连接的错误日志,没有测试出 发送失败 的情况,并 捕获异常。来自博客园

    那么,怎么测试发送失败的情况呢?TODO

    除了上面的同步方式,还有一种异步确认方式

    send函数的返回值 ListenableFuture 可以添加回调函数。

    			ListenableFuture<SendResult<String, Object>> sendRetFuture = kafkaTemplate.send(Try1Config.TOPIC_01, msg);
    			
    			// 1、发送同步确认
    			// ...省略...
    			
    			// 2、发送异步确认
    			sendRetFuture.addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {
    
    				@Override
    				public void onSuccess(SendResult<String,Object> result) {
    					RecordMetadata rmd = result.getRecordMetadata();
    					log.info("发送成功-回调:rmd-topic={},partition={},offset={},ts={}", 
    							rmd.topic(), rmd.partition(), rmd.offset(), rmd.timestamp());
    				}
    
    				@Override
    				public void onFailure(Throwable ex) {
    					log.info("发送失败-回调:ex={}", ex);
    				}
    			});

    结果:

    消息消费手动确认

    主题:topic01,1个分区

    消费者:1个

    前面的试验,消费消息后都自动确认了,offset也在逐个增加。来自博客园

    怎么实现手动确认?

    全局配置:

    ## 全局 手动确认
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.listener.ack-mode=MANUAL

    确认消息,需要参数 Acknowledgment ack,消费者示例如下:来自博客园

    	@KafkaListener(id="listenerA",topics = {TOPIC_01}, groupId = TOPIC_01_G01)
    	public void listener01(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    		log.info("try1-消费-A:topic={}, partition={}, offset={}, key=[{}], value=[{}]",
    				record.topic(), record.partition(), record.offset(), record.key(), record.value());
    		
    		// 消息确认
    		// 配置手动确认后,若不执行下面的语句,启动后还会消费
    		ack.acknowledge();
    	}

    发送消息,消费消息。执行了 ack.acknowledge() 后,消息被消费&确认了。

    意外情况:

    配置了全局消费者手动确认消息,但是,却没有执行 消息确认,此时,分区的offset是不会改变的,消费没有确认被消费成功。

    再次启动应用,消息会被再次消费

    要是已消费了100万条,但是,没有执行确认,下次启动应用时,这100万条要被重复消费,属于 异常

    注释掉上面的 ack.acknowledge() 可以进行验证。

    org.springframework.kafka.support.Acknowledgment 是一个接口,除了 acknowledge() 函数,还有nack(...)——拒绝确认消费:来自博客园

    上面是 全局消费手动确认 配置,那么,单独配置一个消费者组、一个消费者手动确认要怎么做呢?

    》》》全文完《《《 

    后记:

    花费了太多精时了。

    对kafka也了解的更深入了。

    可是,还有更多细节需要晓得的。

    要知道,本文还没有涉及 KafkaTemplate的定制、Factory的定制,即便是 @KafkaListener 注解也并非全都清楚。

    况且,只是单机版的kafka,集群版的会有什么特别的“坑”呢?

    还有,Kafka架构、原理、动态扩容(增加分区、减少分区),还曾看过一篇大厂迁移kafka消息到新的系统的。

    对了,发送消息时,可以 主动重复消费的。来自博客园

    ……

    先这样吧,技术毕竟是一点一滴积累起来的嘛,

    一天肯定是不行的,搞技术需要日积月累的努力。

    参考文档

    1、Kafka分布式消息系统

    2、kafka生产者消息确认机制和发送方式

    3、Kafka消息保留-清理策略

    4、

  • 相关阅读:
    Sql ----- sqlserver 中的if 判断 case... when
    Bootstrap ---------
    js:定时弹出图片(获取属性、setInterval函数)
    js:轮播图(获取属性、setInterval函数)
    js:表单校验(获取元素、事件)
    js:获取元素的值(id、标签、html5新增、特殊元素的获取)
    js:流程控制(分支结构、顺序结构、循环结构)
    拦截器原理(AOP、责任链模式、拦截器的创建、自定义拦截器)
    Action类
    Struts2配置文件(动态方法调用)
  • 原文地址:https://www.cnblogs.com/luo630/p/15192341.html
Copyright © 2011-2022 走看看