zoukankan      html  css  js  c++  java
  • springBoot + rabbitMQ +手动确认消息 + 控制(接口、定时任务)消费者上下线

    这里只贴消费者的部分代码

    第一部分:手动ack配置

    package com.mybatis.plus.config.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ listener configuration.
     *
     * <p>Registers the listener container factory so that consumers acknowledge
     * messages manually instead of relying on automatic acknowledgement.
     *
     * @author 官昌洪
     * @date 2021/12/17 11:24
     * @version V1.0
     */
    @Configuration
    public class MessageListenerConfig {

        /**
         * Builds the listener container factory bean.
         *
         * @param connectionFactory connection factory handed to the container factory
         * @return a factory configured with a Jackson JSON message converter and
         *         {@link AcknowledgeMode#MANUAL}
         */
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
            // Manual mode: the listener itself must ack or reject every delivery.
            containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
            containerFactory.setConnectionFactory(connectionFactory);
            return containerFactory;
        }

    }

    第二部分:消费消息

    package com.mybatis.plus.config.mq;
    
    import com.alibaba.fastjson.JSONObject;
    import com.mybatis.plus.entity.Log;
    import com.mybatis.plus.utils.EurekaUtils;
    import com.mybatis.plus.utils.hash.ConsistentHash;
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import java.util.Map;
    
    @Slf4j
    @Component
    public class Receiver {
    
        @Value("${server.port}")
        private String port;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue")
        public void consumer(Message message, Channel channel) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                Thread.sleep(500);
    
                if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                    String msg = new String(message.getBody(), "UTF-8");
                    Log parseObject = JSONObject.parseObject(msg, Log.class);
                    log.info("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                    log.info("消息成功消费到  messageId:" + parseObject.getLogUuid() + "  messageData:" + parseObject.getLogTitle() + "  createTime:" + parseObject.getCreateTime());
                    log.info("================================");
                    // 收到来自主机的消息 进行一致性hash分配 发往不同的服务
                    // 获取服务节点 创建一致hash环
                    ConsistentHash consistentHash = InitConfig.consistentHash;
                    List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2");
                    if (!allServiceAddr.isEmpty()) {
                        for (Map<String, String> stringMap : allServiceAddr) {
                            String instanceId = stringMap.get("routeKey");
                            // 新增1个物理节点和150个对应的虚拟节点
    //                        String instanceId = stringMap.get("queueKey");
                            // 如果hash环中没有该节点 才新增
                            ConsistentHashNode node = consistentHash.getAccurateNode(instanceId);
                            if (null == node) {
                                consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId), instanceId), 150);
                            }
                        }
                    } else {
                        //没有服务提供者 将消息返回队列
                        channel.basicReject(deliveryTag, true);
                        return;
                    }
    
                    channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                    // 提取消息中的某个代表来源主机的标识 然后在hash环上分配目标节点
                    String logUuid = parseObject.getLogUuid();
                    ConsistentHashNode node = consistentHash.getNode(logUuid);
                    log.info("主机标识:{},分配节点:{}", logUuid, node.getTarget());
                    //向指定路由发送消息
                    // todo 问题 这里怎么保证队列预先创建初始化好 解决方案 先从配置文件获取队列名称 新增服务时 需要重启服务
                    rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg);
    //                planTwo(parseObject);
                    log.info(">>>>>>>>>>>>消费消息成功!");
                }
            } catch (Exception e) {
                log.info(">>>>>>>>>>>>消费消息失败!失败消息ID:{}, 失败原因:{}", deliveryTag, e.getMessage());
                channel.basicReject(deliveryTag, true);
            }
        }
    }

    第三部分:控制消费者开启,关闭

    // Registry used below to look up a listener container by the id given on its @RabbitListener.
    @Autowired
        private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
        /**
         * Starts the listener container registered under id {@code testDirectQueueId1}.
         *
         * @return {@code R.ok()} once {@code start()} has been invoked on the container
         * @throws IllegalStateException if no container is registered under that id
         */
        @RequestMapping("/startCustomer")
        public R startCustomer(){
            MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1");
            if (consumer == null) {
                // The registry returns null for an unknown id; fail with a clear message instead of an NPE.
                throw new IllegalStateException("No listener container registered with id testDirectQueueId1");
            }
            consumer.start();
            return R.ok();
        }
    
        /**
         * Stops the listener container registered under id {@code testDirectQueueId1}.
         *
         * @return {@code R.ok()} once {@code stop()} has been invoked on the container
         * @throws IllegalStateException if no container is registered under that id
         */
        @RequestMapping("/stopCustomer")
        public R stopCustomer(){
            MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1");
            if (consumer == null) {
                // The registry returns null for an unknown id; fail with a clear message instead of an NPE.
                throw new IllegalStateException("No listener container registered with id testDirectQueueId1");
            }
            consumer.stop();
            return R.ok();
        }

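    主要还是通过 @RabbitListener 注解的 id 属性进行控制:用该 id 从 RabbitListenerEndpointRegistry 中取得对应的监听容器,再调用 start() / stop() 控制消费者上下线(autoStartup = "false" 表示应用启动时不自动开始消费)。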

    ⎛⎝官萧何⎠⎞一只快乐的爪哇程序猿;邮箱:1570608034@qq.com
  • 相关阅读:
    (1)java设计模式之简单工厂模式
    QuartZ Cron表达式在java定时框架中的应用
    java.lang.OutOfMemoryError:GC overhead limit exceeded填坑心得
    https实现安全传输的流程
    linux上运行mybase
    linux之sed用法
    java并发之CyclicBarrier
    java并发之Semaphore
    关于ConcurrentSkipListMap的理解
    java中Iterator和ListIterator的区别与联系
  • 原文地址:https://www.cnblogs.com/guanxiaohe/p/15702728.html
Copyright © 2011-2022 走看看