  • Kafka: pausing consumption

    1. Implementation

    KafkaListener

    The listener must be given an explicit id so that its container can be looked up later; here the id is full-part-id.

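    // The id registers this listener's container under "full-part-id".
    // Note: containerGroup names a collection bean of containers; it is not the Kafka consumer group.
    @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group")
    public void listenFullPart(ConsumerRecord<String, String> record) {
        // Guava Optional: skip records whose value is null.
        Optional<String> recordOptional = Optional.fromNullable(record.value());
        if (recordOptional.isPresent()) {
            // Parse the JSON array payload (JSONObject here appears to be fastjson's) and update the index.
            List<PartStockInfoVo> partStockInfoVos = JSONObject.parseArray(recordOptional.get(), PartStockInfoVo.class);
            esPartInfoClient.updateFullIndex(partStockInfoVos);
        }
    }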

    Consumption switch

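    @RestController
    public class KafkaManageController {

        // Must match the id set on the @KafkaListener.
        private static final String LISTENER_ID = "full-part-id";

        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

        // Stops the listener container, so the consumer stops polling the topic.
        @RequestMapping("/stop")
        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(LISTENER_ID);
            // The registry returns null if no container is registered under this id.
            if (listenerContainer != null) {
                listenerContainer.stop();
            }
        }

        // Starts the listener container again, so consumption continues.
        @RequestMapping("/start")
        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(LISTENER_ID);
            if (listenerContainer != null) {
                listenerContainer.start();
            }
        }
    }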
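
    The controller above looks up the container by id and calls stop() or start() on it. As far as I know, getListenerContainer returns null when no container is registered under the given id, and MessageListenerContainer also offers pause() and resume() (Spring Kafka 2.1.3 and later), which keep the consumer in its group instead of closing it. The sketch below shows one way to wrap those calls in a null-safe switch. To stay runnable without Spring or a broker, ListenerContainer, ContainerRegistry, ConsumptionSwitch and FakeContainer are hypothetical stand-ins, not Spring types; in a real application the registry would be KafkaListenerEndpointRegistry.

```java
// Hypothetical stand-in for the subset of Spring Kafka's
// MessageListenerContainer used in this sketch.
interface ListenerContainer {
    void start();
    void stop();
    void pause();
    void resume();
    boolean isRunning();
}

// Hypothetical stand-in for KafkaListenerEndpointRegistry:
// returns null when the id is unknown.
interface ContainerRegistry {
    ListenerContainer getListenerContainer(String id);
}

// Null-safe switch: every method returns false if the id is unknown.
class ConsumptionSwitch {
    private final ContainerRegistry registry;

    ConsumptionSwitch(ContainerRegistry registry) {
        this.registry = registry;
    }

    // Stops the container only if it is currently running.
    boolean stop(String id) {
        ListenerContainer c = registry.getListenerContainer(id);
        if (c == null) return false;
        if (c.isRunning()) c.stop();
        return true;
    }

    // Starts the container only if it is not already running.
    boolean start(String id) {
        ListenerContainer c = registry.getListenerContainer(id);
        if (c == null) return false;
        if (!c.isRunning()) c.start();
        return true;
    }

    // Requests a pause; the container itself keeps running.
    boolean pause(String id) {
        ListenerContainer c = registry.getListenerContainer(id);
        if (c == null) return false;
        c.pause();
        return true;
    }

    // Resumes a paused container.
    boolean resume(String id) {
        ListenerContainer c = registry.getListenerContainer(id);
        if (c == null) return false;
        c.resume();
        return true;
    }
}

// In-memory fake, used only to exercise the switch without a broker.
class FakeContainer implements ListenerContainer {
    boolean running = true;
    boolean paused = false;

    public void start() { running = true; }
    public void stop() { running = false; }
    public void pause() { paused = true; }
    public void resume() { paused = false; }
    public boolean isRunning() { return running; }
}
```

    Returning a boolean lets an endpoint report an unknown id (for example with a 404) instead of failing with a NullPointerException.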

    Reference:

    1. How can I stop consumers from consuming?

  • Original article: https://www.cnblogs.com/huangfox/p/9798446.html