场景:
kafka先批量拉取消息,完了将拉下来的消息逐条消费,假如此次共拉取40条消息,但在处理第31条时,线程被停掉,就会导致已消费消息offet不会被提交,接下来已经被消费的30条记录还会被重复消费,这就是kafka重复消费的另一场景;
解决思路:
解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+"_"+topic+"_"+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;
代码如下:
1 private void handleRecord(String recordValue,long offset,int partition,String topic) { 2 3 StringBuilder redisKey = new StringBuilder(); 4 redisKey.append(consumergroup).append("_").append(topic).append("_").append(partition).append("_").append(offset); 5 logger.info("redisKey:{}",redisKey.toString()); 6 String consumeDate = (String) redisCacheUtil.get(redisKey.toString()); 7 if (StringUtils.isNotEmpty(consumeDate)) { 8 logger.info("重复消费,跳过!redisKey:{},recordValue:{},消费时间:{}",redisKey,recordValue,consumeDate); 9 return; 10 } 11 redisCacheUtil.set(redisKey.toString(),DateUtils.dateToString()); 12 //设置过期时间,要稍大于kafka消息保存时间 13 redisCacheUtil.expire(redisKey.toString(),ComConstant.REDIS_EXPIRE_TIME_8_DAYS); 14 15 //核心业务逻辑 16 } 17 18 private void consumeMessage() { 19 logger.debug("consumeMessage() -- BEGEN"); 20 ConsumerRecords<String, String> records = consumer.poll(10000); 21 22 logger.info("获取未消费记录数:{}", records.count()); 23 for (ConsumerRecord<String, String> record : records) { 24 logger.info("消费消息:{}", record); 25 try { 26 handleRecord(record.value(),record.offset(),record.partition(),record.topic()); 27 }catch (Exception e){ 28 LoggerUtils.error(logger, e, "kafka处理消息失败!"); 29 //失败时的处理逻辑 30 } 31 } 32 if (records.count() > 0) { 33 logger.info("更新offset"); 34 consumer.commitSync(); 35 } 36 logger.debug("consumeMessage() -- END"); 37 }