zoukankan      html  css  js  c++  java
  • 多个rabbitmq的消费问题

    最近碰到一个项目中需要使用多个rabbitmq,连接信息很好配置,配置多连接工厂就可以了,消费者的demo好像不多,做下简单记录
    1.队列信息都是需要指定AmqpAdmin 进行关联,交换机,队列,Binder

         @Autowired
        @Qualifier("customAmqpAdmin1")
        private AmqpAdmin marketAdmin;
    
        public static final String QUEUE_NAME = "market.luck.award.push.message.queue.ttl";
    
        /**
         * 消息中心延迟消费交换配置
         *
         * @return
         */
        @Bean
        CustomExchange messagePushTtlDirect() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            CustomExchange exchange = new CustomExchange(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getExchange()
                    , "x-delayed-message", true, false, args);
            exchange.setAdminsThatShouldDeclare(marketAdmin);
            return exchange;
        }
        /**
         * 消息中心实际消费队列配置
         *
         * @return
         */
        @Bean(QUEUE_NAME)
        public Queue messagePushQueue() {
            Map<String, Object> arguments = new HashMap<>(4);
            Queue queue = new Queue(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getQueueName()
                    , true, false, false, arguments);
            queue.setAdminsThatShouldDeclare(marketAdmin);
            return queue;
        }
    
    
        /**
         * 消息中心TTL绑定实际消息中心实际消费交换机
         *
         * @return
         */
        @Bean
        public Binding messageTtlBinding(@Qualifier(QUEUE_NAME) Queue queue) {
            Binding binding = BindingBuilder
                    .bind(queue)
                    .to(messagePushTtlDirect())
                    .with(MarketQueueEnum.PUSH_MESSAGE_TTL_QUEUE.getRouteKey()).noargs();
            binding.setAdminsThatShouldDeclare(marketAdmin);
            return binding;
        }
    2.消费监听需要指定连接工厂即可
    ~~~java
     @RabbitListener(queues = PushAddressRabbitConfig.QUEUE_NAME, containerFactory = "customListenerContainerFactory1")
        public void handPushObject(String messageContent, Channel channel, @Headers Map<String, Object> headers) throws IOException {
            log.info("market.luck.award.push.message.queue.ttl收到消息:{}", messageContent);
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            try {
                String key = MessageConstant.REDIS_PUSH_KEY_PRE + messageContent;
                String id = (String) cacheCommonService.get(key);
                if (org.apache.commons.lang3.StringUtils.isNotBlank(id)) {
                    AwardRecord awardRecord = awardRecordService.getById(messageContent);
                    //地址已经填写 则无需推送消息
                    if (!Objects.isNull(awardRecord) &&
                            org.apache.commons.lang3.StringUtils.isBlank(awardRecord.getReceiveAddress())) {
                        PushMessageRequest pushMessageRequest = new PushMessageRequest();
                        pushMessageRequest.setPushMessageContentTypeEnum(PushMessageContentTypeEnum.FILL_IN_ADDRESS);
                        pushMessageRequest.setAwardRecordId(Long.valueOf(messageContent));
                        pushMessageFacade.pushMessage(pushMessageRequest);
                    } else {
                        log.info("中奖地址已经填写 无需推送填写地址消息");
                    }
                    cacheCommonService.remove(key);
    
                }
                //false不支持批量签收
                channel.basicAck(deliveryTag, false);
                log.info("market.luck.award.push.message.queue.ttl消费完成:" + 
      LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss")));
            } catch (IOException e) {
                log.error("消费券记录的消息异常", e);
                // multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息
                //requeue:被拒绝的是否重新入队列 注意:如果设置为true ,则会添加在队列的末端
                channel.basicNack(deliveryTag, false, true);
            }
        }
  • 相关阅读:
    一些常用的库[转载]
    《三国演义》很给力演绎60条职场真理
    保证你现在和未来不失业的10种关键技【转载】
    百度面试题
    百度的一到算法i题
    FindMaxDeep
    csinglelink
    FindLongArray
    byte转hex,hex转byte
    获取异常信息
  • 原文地址:https://www.cnblogs.com/jinjian91/p/13892754.html
Copyright © 2011-2022 走看看