zoukankan      html  css  js  c++  java
  • Spring-Kafka —— KafkaListener手动启动和停止

    一、KafkaListener消费

        /**
         * Consumes a record from the warning topic using manual offset acknowledgment.
         *
         * <p>The record is parsed into a {@code WarningInfo}. When parsing yields
         * {@code null}, a DingTalk message containing the topic and raw value is sent;
         * otherwise the record is passed to the business handler. In both cases the
         * offset is acknowledged afterwards.
         *
         * <p>If an {@link Exception} is thrown, the offset is not acknowledged, the
         * listener container registered under {@code ConsumerConst.LISTENER_ID_WARNING}
         * is stopped and a DingTalk alert is sent.
         *
         * <p>The offset is acknowledged only after processing completed normally, so a
         * {@link Throwable} that is not an {@link Exception} (for example an
         * {@link Error}) never results in the offset being committed.
         *
         * @param record the consumed record
         * @param ack    handle used to manually acknowledge the offset
         */
        @Override
        @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning")
        public void ackListener(ConsumerRecord record, Acknowledgment ack) {
            LOG.info("###################预警ackListener接收到消息###################");

            // Set to true only once the record has been fully handled. Starting from
            // false guarantees that nothing escaping the try block can be acknowledged.
            boolean processed = false;
            final long beginTime = System.currentTimeMillis();
            try {
                WarningInfo warningInfo = parseConsumerRecord(record);
                if (warningInfo == null) {
                    // Unparseable payload: report it, then let the offset be acknowledged.
                    dingTalkService.sendMessage(
                            MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, record.topic(), record.value()));
                } else {
                    warningBusinessHandle.doHandle(record, warningInfo);
                }
                processed = true;
                // A dedicated BusinessException branch that forwards the record to a retry
                // topic was sketched here by the original author but is disabled; per the
                // original note there is currently no such scenario.
            } catch (Exception e) {
                LOG.error("[" + record.topic() + "]消费发生运行时异常:" + e.getMessage(), e);
                // NOTE(review): this stop is issued from the listener thread itself; a
                // blocking container stop() may wait for the shutdown timeout here.
                // Confirm, and consider an asynchronous stop if that is a problem.
                consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING);
                dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, record.topic()));
            } finally {
                if (processed) {
                    // Manually commit the offset.
                    ack.acknowledge();
                }
                LOG.info("###################预警ackListener处理完消息,耗时" + (System.currentTimeMillis() - beginTime) + "ms ###################");
            }
        }

    二、使用KafkaListenerEndpointRegistry实现启动和停止功能

    下面方法参数中的 listenerId 值,必须与消费端 @KafkaListener 注解中指定的 id 值一致,例如:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, ...)
    package com.macaupass.kafka.consumer.service.impl;
    
    import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.stereotype.Service;
    
    /**
     * Service that starts and stops Kafka listener containers by listener id.
     *
     * <p>Containers are looked up in the {@link KafkaListenerEndpointRegistry}, so the
     * listener id passed to these methods must be the {@code id} declared on the
     * corresponding {@code @KafkaListener} annotation.
     *
     * @author weixiong.cao
     * @date 2019/7/2
     */
    @Service
    public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {

        /**
         * Class logger.
         */
        private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class);

        /**
         * Registry used to look up listener containers by id.
         */
        @Autowired
        private KafkaListenerEndpointRegistry registry;

        /**
         * Starts consumption for the given listener.
         *
         * <p>The container is started if it is not running, and then resumed.
         *
         * @param listenerId listener id
         * @throws IllegalArgumentException if no container is registered under {@code listenerId}
         */
        @Override
        public void startListener(String listenerId) {
            requireRegistered(listenerId);
            // Start the container if it is not running yet.
            if (!registry.getListenerContainer(listenerId).isRunning()) {
                registry.getListenerContainer(listenerId).start();
            }
            // resume() only resumes a container, it does not start one - hence the explicit
            // start() above. The original author notes that the container is not started
            // when the application boots (presumably auto-startup is disabled; confirm
            // against the container factory configuration).
            registry.getListenerContainer(listenerId).resume();
            LOG.info("{}开启监听成功。", listenerId);
        }

        /**
         * Stops consumption for the given listener.
         *
         * @param listenerId listener id
         * @throws IllegalArgumentException if no container is registered under {@code listenerId}
         */
        @Override
        public void stopListener(String listenerId) {
            requireRegistered(listenerId);
            registry.getListenerContainer(listenerId).stop();
            LOG.info("{}停止监听成功。", listenerId);
        }

        /**
         * Fails fast with a descriptive error when the registry has no container for the id,
         * instead of letting a later call on the null lookup result raise a bare
         * NullPointerException.
         *
         * @param listenerId listener id to check
         * @throws IllegalArgumentException if no container is registered under {@code listenerId}
         */
        private void requireRegistered(String listenerId) {
            if (registry.getListenerContainer(listenerId) == null) {
                throw new IllegalArgumentException(
                        "No Kafka listener container registered with id: " + listenerId);
            }
        }

    }

    三、Controller

    package com.macaupass.kafka.consumer.controller;
    
    import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Web controller for manually starting and stopping Kafka listeners.
     *
     * <p>{@code /listener/start} and {@code /listener/stop} return a JSON map with
     * {@code respCode} ("0000" on success, "0001" on failure) and {@code respMsg};
     * {@code /listener/index} returns the view name of the management page.
     *
     * <p>NOTE(review): the mappings do not restrict the HTTP method, so the
     * state-changing endpoints presumably also answer GET requests - confirm whether
     * that is intended.
     *
     * @author weixiong.cao
     * @date 2019/7/2
     */
    @Controller
    @RequestMapping(value = "/listener")
    public class KafkaConsumerListenerController {

        /**
         * Class logger.
         */
        private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class);

        /**
         * Listener service that performs the actual start/stop.
         */
        @Autowired
        private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService;

        /**
         * Starts the listener with the given id.
         *
         * @param listenerId listener id (optional request parameter)
         * @return response map containing {@code respCode} and {@code respMsg}
         */
        @RequestMapping("/start")
        @ResponseBody
        public Map<String, String> startListener(@RequestParam(required=false) String listenerId) {
            if (LOG.isInfoEnabled()) {
                LOG.info("开启监听...listenerId=" + listenerId);
            }
            return runAndReport(
                    () -> kafkaConsumerListenerService.startListener(listenerId),
                    "启动成功。",
                    "启动失败:");
        }

        /**
         * Stops the listener with the given id.
         *
         * @param listenerId listener id (optional request parameter)
         * @return response map containing {@code respCode} and {@code respMsg}
         */
        @RequestMapping("/stop")
        @ResponseBody
        public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) {
            if (LOG.isInfoEnabled()) {
                LOG.info("停止监听...listenerId=" + listenerId);
            }
            return runAndReport(
                    () -> kafkaConsumerListenerService.stopListener(listenerId),
                    "停止成功。",
                    "停止失败:");
        }

        /**
         * Entry point of the management page.
         *
         * @return the view name {@code kafka/listener}
         */
        @RequestMapping("/index")
        public String index() {
            return "kafka/listener";
        }

        /**
         * Runs a listener operation and converts its outcome into the response map.
         *
         * @param operation     the start/stop call to execute
         * @param successMsg    message returned when the operation completes
         * @param failurePrefix prefix placed before the exception message on failure
         * @return map with {@code respCode} "0000" on success or "0001" on failure, plus {@code respMsg}
         */
        private Map<String, String> runAndReport(Runnable operation, String successMsg, String failurePrefix) {
            Map<String, String> result = new HashMap<>();
            try {
                operation.run();
                result.put("respCode", "0000");
                result.put("respMsg", successMsg);
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
                result.put("respCode", "0001");
                result.put("respMsg", failurePrefix + ex.getMessage());
            }
            return result;
        }

    }

    四、JSP界面

    <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
    <html>
    <head>
        <title>消费监听管理</title>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <link rel="stylesheet" href="../css/common.css">
        <script src="../jquery/jquery-3.3.1.min.js"></script>
        <script type="text/javascript">
            var contextPath = "<%=request.getContextPath() %>";
    
            /**
             * Requests the server to start the listener with the given id
             * by posting to the "/listener/start" endpoint.
             *
             * @param listenerId listener ID
             */
            function startListener(listenerId) {
                // Encode the id so reserved characters (&, =, #, spaces, ...) cannot corrupt the query string.
                ajaxPostByJson(contextPath + "/listener/start?listenerId=" + encodeURIComponent(listenerId));
            }
    
            /**
             * Requests the server to stop the listener with the given id
             * by posting to the "/listener/stop" endpoint.
             *
             * @param listenerId listener ID
             */
            function stopListener(listenerId) {
                // Encode the id so reserved characters (&, =, #, spaces, ...) cannot corrupt the query string.
                ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + encodeURIComponent(listenerId));
            }
    
            /**
             * Sends a POST request (no body) to the given URL, expecting a JSON
             * response, and shows the outcome in an alert dialog.
             *
             * @param url request URL
             */
            function ajaxPostByJson(url) {
                // Show the message returned by the server.
                var onSuccess = function(respData) {
                    alert(respData.respMsg);
                };
                // Show the raw response text when the request fails.
                var onError = function(res) {
                    alert("系統異常:" + res.responseText);
                };
                var options = {
                    type: "POST",
                    url: url,
                    dataType: "json",
                    contentType: 'application/json;charset=utf-8',
                    success: onSuccess,
                    error: onError
                };
                $.ajax(options);
            }
        </script>
    </head>
    <body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4>
    <div id="main">
        <div id="head">
            <dl class="alipay_link">
                <a target="_blank" href=""><span>&nbsp;</span></a>
            </dl>
            <span class="title">Kafka消费手动管理</span>
        </div>
        <div class="cashier-nav">
        </div>
        <form name=query method=post>
            <div id="body" style="clear:left">
                <dl class="content">
                    <dd>
                        <span class="new-btn-login-sp">
                                <button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">开启【预警】消费</button>
                        </span>
                        <span class="new-btn-login-sp">
                                <button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【预警】消费</button>
                        </span>
                    </dd>
                </dl>
            </div>
        </form>
    </div>
    </body>
    </html>

    五、功能界面

  • 相关阅读:
    第四章 springboot + swagger
    第三章 springboot + jedisCluster
    第二章 第二个spring-boot程序
    mac下的一些命令
    Redis(二十一):Redis性能问题排查解决手册(转)
    TreeMap升序|降序排列和按照value进行排序
    关于java集合类TreeMap的理解(转)
    Redis(二十):Redis数据过期和淘汰策略详解(转)
    Redis(十九):Redis压力测试工具benchmark
    try、finally代码块有无return时的执行顺序
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/11181386.html
Copyright © 2011-2022 走看看