zoukankan      html  css  js  c++  java
  • Rabbitmq消息接收方通知发送方

    一、需求

    前两篇文章,我们分别介绍了消息发送方的确认消息接收方的消息确认,由此可知,消息的发送方只关注消息有木有到达队列,消息的接收方只关注在什么时候告诉队列这个条消息可以删除了,那么如果有那样的需求,发送方想获取消息的消费情况,例如想修改消息表中消息的状态,也就是得想一个办法,如何在消息到达接收方之后通知发送方。

    二、思路

    消息发送方在发送消息之后,监听一个返回消息队列reply,消息接收方消费完后消息再发送到队列reply,这样根据唯一的messageId原来的发送方就能获取返回的消息了。只不过这个时候发送方和接收方的角色就模糊了,原来的发送方变成了即使发送方又是接收方,原来的接收方同理。

    三、实现

    Spring为我们提供了一个@SendTo("demo.reply-to")注解,跟@RabbitListener一起使用,demo.reply-to 就是一个返回消息队列,这个队列不需要绑定交换器,要返回的消息直接return就可以。

    3.1、yml

    spring:
        rabbitmq:
            host: 192.168.31.70
            port: 5672
            username: guest
            password: guest
            # 发送确认
            publisher-confirms: true
            # 路由失败回调
            publisher-returns: true
            template:
                # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
                mandatory: true
            #消费端
            listener:
                simple:
                    # 每次从RabbitMQ获取的消息数量
                    prefetch: 1
                    default-requeue-rejected: false
                    # 每个队列启动的消费者数量
                    concurrency: 1
                    # 每个队列最大的消费者数量
                    max-concurrency: 1
                    # 签收模式为手动签收-那么需要在代码中手动ACK
                    acknowledge-mode: manual
    #消费者消费之后向生产者的反馈
    reply:
        queue:
            name: demo.reply-to
        exchange:
            name: demoReplyExchange
    sms:
        queue:
            name: demo.sms
    

    3.2、RabbitConfig

    /**
     * @author DUCHONG
     * @since 2020-09-02 21:24
     **/
    @Configuration
    public class RabbitConfig {
    
    
        @Value("${sms.queue.name}")
        private String smsQueue;
        @Value("${reply.queue.name}")
        private String replyQueue;
        @Value("${reply.exchange.name}")
        private String replyExchange;
    
    
        @Bean
        public Queue smsQueue() {
            return new Queue(smsQueue);
        }
        @Bean
        public Queue replyQueue() {
            return new Queue(replyQueue);
        }
    
        @Bean
        TopicExchange replyExchange() {
            return new TopicExchange(replyExchange);
        }
    
    
        @Bean
        Binding bindingReplyQueue() {
            return BindingBuilder.bind(smsQueue()).to(replyExchange()).with(smsQueue+".#");
        }
    
    }
    

    3.3、消息发送方

    /**
     * 生产者
     *
     * @author DUCHONG
     * @since 2020-09-02 21:32
     **/
    @RestController
    @Slf4j
    public class ReplyProviderController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Value("${reply.exchange.name}")
        private String replyExchange;
    
        @GetMapping("/sendReplyMessage")
        public void sendReplyMessage() {
    
            String msgId = UUID.randomUUID().toString().replace("-","").toUpperCase();
            JSONObject reply=new JSONObject();
            reply.put("messageId",msgId);
            reply.put("content","this is a reply demo message");
            CorrelationData correlationData=new CorrelationData(msgId);
            rabbitTemplate.convertAndSend(replyExchange,"demo.sms.x",reply.toJSONString(),correlationData);
            log.info("---provider发送消息---{}",reply);
        }
    
        /**
         * 监听demo.reply-to队列,接收consumer的反馈
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.reply-to")
        @RabbitHandler
        public void handleReplyMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                log.info("---provider接收到reply消息----{}",msg);
                //业务逻辑代码
                //.....
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
            catch (Exception e) {
                log.info("ReplyConsumerController.handleReplyMessage error",e);
            }
        }
    
    }
    

    3.4、消息接收方

    /**
     * 消费者
     *
     * @author DUCHONG
     * @since 2020-09-02 21:33
     **/
    @Component
    @Slf4j
    public class ReplyConsumerController {
    
    
        /**
         * 邮件发送 ack 之后返回消息到demo.reply-to
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.sms")
        @RabbitHandler
        @SendTo("demo.reply-to")
        public String handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                log.info("---consumer接收到消息----{}",msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                return msg;
            }
            catch (Exception e) {
                log.info("ReplyConsumerController.handleEmailMessage error",e);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    
            }
            return null;
        }
    }
    

    3.5、运行结果

    result

  • 相关阅读:
    开启和禁用Wifi热点命令
    C# IE代理操作
    Asp.net QueryString批量插入和更新
    Asp.net 插入或更改查询字符串
    C#如何判断线程池中所有的线程是否已经完成之Demo
    mysql 安装及设置密码
    c# iis回收应用程序池
    判断是否为移动端
    rtmp,m3u8 直播地址
    byte数组转换为字符串
  • 原文地址:https://www.cnblogs.com/geekdc/p/13604995.html
Copyright © 2011-2022 走看看