zoukankan      html  css  js  c++  java
  • 【rabbitmq】之业务封装

    1、【rabbitmq】之Exchange

    2、【rabbitmq】之消费端手动ack

    3、【rabbitmq】之confirm和return机制

    4、【rabbitmq】之过期和死信队列

    在前几篇文章中简单介绍了rabbitmq的基本使用,大家应该发现比如,发送端发送消息,消费端手动ack,mq消费异常处理等业务逻辑都是耦合在代码里,这样做的弊端,1、是存在大量重复代码,2、无法统一管理。本文主要介绍了,如何进行封装rabbitmq业务代码。

    发送端:

    MqDto公共消息类

    @Data
    @ToString
    public class MqDto<T> implements Serializable {
    
        /**
         * 消息ID,用于去重,全局唯一,为空则系统自动创建
         */
        private String messageId;
    
        /**
         * 消息体
         */
        private T body;
    
        /**
         * 交换机
         */
        private String exchange;
    
        /**
         * 路由KEY,广播模式可以不传路由KEY
         */
        private String routingKey="";
    
        /**
         * 消息业务
         */
        private String busChannel;
    
        /**
         * 过期,单位毫秒
         */
        private Long expire;
    
    
        public void setMessageId(String messageId) {
            if (StringUtils.isBlank(messageId)) {
                this.messageId = RandomStringUtils.random(10);
            } else {
                this.messageId = messageId;
            }
        }
    }

    发送公共类

    @Service
    @Slf4j
    public class RabbitMQService {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
    
        /**
         * rabbitmq 发送核心方法
         *
         * @param mqDto
         */
        public void send(MqDto mqDto) {
            rabbitTemplate.convertAndSend(mqDto.getExchange(), mqDto.getRoutingKey(), mqDto, message -> {
                message.getMessageProperties().setCorrelationId(mqDto.getMessageId());
                message.getMessageProperties().setMessageId(mqDto.getMessageId());
                return message;
            });
        }
    
    
        /**
         * rabbitmq 发送TTL消息
         *
         * @param mqDto
         */
        public void sendTTl(MqDto mqDto) {
            rabbitTemplate.convertAndSend(mqDto.getExchange(), mqDto.getRoutingKey(), mqDto, message -> {
                message.getMessageProperties().setExpiration(mqDto.getExpire() + "");
                message.getMessageProperties().setCorrelationId(mqDto.getMessageId());
                message.getMessageProperties().setMessageId(mqDto.getMessageId());
                return message;
            });
        }
    
    
    }

    发送创建订单消息

    @RestController
    @RequestMapping("mq")
    @Slf4j
    public class MqController {
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @Autowired
        private RabbitMQService rabbitMQService;
    
    
        /**
         * 创建订单
         * @return
         */
        @GetMapping("createOrder")
        public String createOrder(String title){
            OrderDto orderDto=new OrderDto();
            orderDto.setOrderNo(RandomStringUtils.randomAlphanumeric(10));
            orderDto.setTitle(title);
            orderDto.setBody("黑色的小米手机");
            MqDto<OrderDto> mqDto=new MqDto<>();
            mqDto.setBusChannel("order");
            mqDto.setExchange(RabbitMQExchangeConfig.TOPIC_EXCHANGE);
            mqDto.setRoutingKey("order.ttl");
            mqDto.setExpire(10000L);
            mqDto.setMessageId(orderDto.getOrderNo());
            mqDto.setBody(orderDto);
            rabbitMQService.sendTTl(mqDto);
            log.info("send ok");
            return "ok";
        }
    }

    消费端:

    消费端公共抽象类

    @Slf4j
    public abstract class DefaultConsumer<T> {
    
        /**
         * 获取rabbitmq配置
         *
         * @return
         */
        public abstract RabbitProperties getRabbitProperties();
    
        /**
         * 业务执行
         *
         * @param t
         */
        public abstract void execute(T t) throws Exception;
    
    
        /**
         * 业务执行失败,处理策略
         */
        public void executeFail(Channel channel, Message message, Exception e) throws Exception {
            //获取当前重试的参数
            RetryContext context = RetrySynchronizationManager.getContext();
            //如果有异常,且没有达到最大投递次数,失败执行策略
            if ((getRabbitProperties().getListener().getSimple().getRetry().getMaxAttempts() - 1 == context.getRetryCount())) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                throw e;
            }
        }
    
        public void run(Channel channel, Message message, T t) throws Exception {
            try {
                execute(t);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                log.error("consumer fail:{}", message.toString());
                executeFail(channel, message, e);
            }
        }
    
    }

    订单过期逻辑处理

    @Component
    @Slf4j
    public class OrderExpireConsumer extends DefaultConsumer<MqDto<OrderDto>> {
    
        @Autowired
        RabbitProperties rabbitProperties;
    
        @Override
        public RabbitProperties getRabbitProperties() {
            return rabbitProperties;
        }
    
        /**
         * 具体的业务逻辑
         * @param orderDtoMqDto
         */
        @Override
        public void execute(MqDto<OrderDto> orderDtoMqDto) {
            //模拟订单过期消费
            log.info("OrderExpireConsumer,orderDto:{}", orderDtoMqDto.toString());
        }
    }

    监听order死信队列

    @Component
    @Slf4j
    public class RabbitMQListener {
    
        @Autowired
        private OrderExpireConsumer orderExpireConsumer;
    
        /**
         * 监听订单死信队列
         *
         * @param channel
         * @param mqDto
         * @param message
         * @throws Exception
         */
        @RabbitListener(queues = {"#{dlxTtlQueue.name}"})
        public void orderDlxTtl(@Header(AmqpHeaders.CHANNEL) Channel channel, MqDto<OrderDto> mqDto, Message message) throws Exception {
            orderExpireConsumer.run(channel, message, mqDto);
        }
    
    }

    image

    代码:https://github.com/gyjx/rabbitmq

  • 相关阅读:
    Salesforce LWC学习(三十七) Promise解决progressindicator的小问题
    Salesforce Consumer Goods Cloud 浅谈篇三之 行动计划(Action Plan)相关配置
    python 3.7环境安装并使用csv
    分享数据库优化文章
    php 5.4 var_export的改进
    CentOS7 启动 firewalld 防火墙失败,查看日志提示超时
    使用 SSL 加密的 JDBC 连接 SAP HANA 数据库
    CAS学习笔记一:CAS 授权服务器简易搭建
    202110期自考总结
    自定义 OpenShift s2i 镜像与模板——OracleJDK8
  • 原文地址:https://www.cnblogs.com/gyjx2016/p/13710152.html
Copyright © 2011-2022 走看看