在前几篇文章中简单介绍了 RabbitMQ 的基本使用。大家应该已经发现,发送端发送消息、消费端手动 ack、MQ 消费异常处理等逻辑都耦合在业务代码里。这样做有两个弊端:1、存在大量重复代码;2、无法统一管理。本文主要介绍如何对 RabbitMQ 的业务代码进行封装。
发送端:
MqDto公共消息类
/**
 * Generic envelope for a message published to RabbitMQ.
 *
 * <p>Getters, the remaining setters, {@code equals}/{@code hashCode} and
 * {@code toString} are generated by Lombok.
 *
 * @param <T> type of the business payload carried in {@code body}
 */
@Data
@ToString
public class MqDto<T> implements Serializable {

    /**
     * Message ID, used for de-duplication and expected to be globally unique.
     * Pre-populated with a random UUID so that a DTO whose ID is never set
     * explicitly still carries one.
     */
    private String messageId = newMessageId();

    /** Message payload. */
    private T body;

    /** Target exchange. */
    private String exchange;

    /** Routing key; may be left empty when publishing in broadcast mode. */
    private String routingKey = "";

    /** Business channel the message belongs to. */
    private String busChannel;

    /** Expiration, in milliseconds. */
    private Long expire;

    /**
     * Sets the message ID, generating a fresh one when the supplied value is blank.
     *
     * @param messageId caller-supplied ID; {@code null}, empty or whitespace-only
     *                  values are replaced by a generated ID
     */
    public void setMessageId(String messageId) {
        this.messageId = StringUtils.isBlank(messageId) ? newMessageId() : messageId;
    }

    /**
     * Creates a new message ID.
     *
     * <p>A UUID is used instead of {@code RandomStringUtils.random(10)}: the latter
     * draws from the whole character set, so IDs may contain unprintable characters
     * and ten characters give no practical uniqueness guarantee.
     */
    private static String newMessageId() {
        return java.util.UUID.randomUUID().toString();
    }
}
发送公共类
/**
 * Central entry point for publishing {@code MqDto} messages through the
 * injected {@code RabbitTemplate}.
 */
@Service
@Slf4j
public class RabbitMQService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes a message to the exchange and routing key carried by the DTO.
     * The DTO's message ID is copied into both the correlation-id and
     * message-id properties of the outgoing message.
     *
     * @param mqDto message envelope to publish
     */
    public void send(MqDto<?> mqDto) {
        publish(mqDto, null);
    }

    /**
     * Publishes a message with a per-message expiration taken from
     * {@code mqDto.getExpire()} (milliseconds).
     *
     * @param mqDto message envelope to publish; its {@code expire} must be set
     * @throws IllegalArgumentException if {@code expire} is {@code null} or negative
     */
    public void sendTTl(MqDto<?> mqDto) {
        Long expire = mqDto.getExpire();
        // Fail fast: concatenating a null Long would send the literal text "null"
        // as the expiration property instead of a number of milliseconds.
        if (expire == null || expire < 0) {
            throw new IllegalArgumentException(
                    "expire must be a non-negative number of milliseconds, got: " + expire);
        }
        publish(mqDto, String.valueOf(expire));
    }

    /**
     * Shared publishing routine for {@link #send} and {@link #sendTTl}.
     *
     * @param mqDto      message envelope to publish
     * @param expiration expiration to stamp on the message, or {@code null} for none
     */
    private void publish(MqDto<?> mqDto, String expiration) {
        rabbitTemplate.convertAndSend(mqDto.getExchange(), mqDto.getRoutingKey(), mqDto, message -> {
            if (expiration != null) {
                message.getMessageProperties().setExpiration(expiration);
            }
            message.getMessageProperties().setCorrelationId(mqDto.getMessageId());
            message.getMessageProperties().setMessageId(mqDto.getMessageId());
            return message;
        });
    }
}
发送创建订单消息
/**
 * Demo endpoint that publishes an "order created" message with a TTL.
 */
@RestController
@RequestMapping("mq")
@Slf4j
public class MqController {

    /** Business channel stamped on order messages. */
    private static final String ORDER_BUS_CHANNEL = "order";

    /** Routing key used for order messages that carry a TTL. */
    private static final String ORDER_TTL_ROUTING_KEY = "order.ttl";

    /** Expiration applied to the order message, in milliseconds. */
    private static final long ORDER_TTL_MILLIS = 10000L;

    /** Length of the generated alphanumeric order number. */
    private static final int ORDER_NO_LENGTH = 10;

    // The RabbitTemplate that used to be injected here was never used;
    // all publishing goes through RabbitMQService.
    @Autowired
    private RabbitMQService rabbitMQService;

    /**
     * Creates an order and publishes it as a TTL message.
     *
     * @param title order title taken from the request
     * @return the literal string {@code "ok"} once the message has been handed to the sender
     */
    @GetMapping("createOrder")
    public String createOrder(String title) {
        OrderDto orderDto = new OrderDto();
        orderDto.setOrderNo(RandomStringUtils.randomAlphanumeric(ORDER_NO_LENGTH));
        orderDto.setTitle(title);
        orderDto.setBody("黑色的小米手机");

        MqDto<OrderDto> mqDto = new MqDto<>();
        mqDto.setBusChannel(ORDER_BUS_CHANNEL);
        mqDto.setExchange(RabbitMQExchangeConfig.TOPIC_EXCHANGE);
        mqDto.setRoutingKey(ORDER_TTL_ROUTING_KEY);
        mqDto.setExpire(ORDER_TTL_MILLIS);
        // The order number doubles as the message ID.
        mqDto.setMessageId(orderDto.getOrderNo());
        mqDto.setBody(orderDto);

        rabbitMQService.sendTTl(mqDto);
        log.info("send ok");
        return "ok";
    }
}
消费端:
消费端公共抽象类
@Slf4j public abstract class DefaultConsumer<T> { /** * 获取rabbitmq配置 * * @return */ public abstract RabbitProperties getRabbitProperties(); /** * 业务执行 * * @param t */ public abstract void execute(T t) throws Exception; /** * 业务执行失败,处理策略 */ public void executeFail(Channel channel, Message message, Exception e) throws Exception { //获取当前重试的参数 RetryContext context = RetrySynchronizationManager.getContext(); //如果有异常,且没有达到最大投递次数,失败执行策略 if ((getRabbitProperties().getListener().getSimple().getRetry().getMaxAttempts() - 1 == context.getRetryCount())) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } else { throw e; } } public void run(Channel channel, Message message, T t) throws Exception { try { execute(t); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error("consumer fail:{}", message.toString()); executeFail(channel, message, e); } } }
订单过期逻辑处理
@Component @Slf4j public class OrderExpireConsumer extends DefaultConsumer<MqDto<OrderDto>> { @Autowired RabbitProperties rabbitProperties; @Override public RabbitProperties getRabbitProperties() { return rabbitProperties; } /** * 具体的业务逻辑 * @param orderDtoMqDto */ @Override public void execute(MqDto<OrderDto> orderDtoMqDto) { //模拟订单过期消费 log.info("OrderExpireConsumer,orderDto:{}", orderDtoMqDto.toString()); } }
监听order死信队列
/**
 * Declares the RabbitMQ listener methods and hands each delivery to its consumer.
 */
@Component
@Slf4j
public class RabbitMQListener {

    @Autowired
    private OrderExpireConsumer orderExpireConsumer;

    /**
     * Listens on the order dead-letter queue (resolved from the {@code dlxTtlQueue}
     * bean's name) and delegates to {@code OrderExpireConsumer}.
     *
     * @param channel channel the message was delivered on
     * @param mqDto   converted message payload
     * @param message raw AMQP message
     * @throws Exception if the consumer propagates a failure
     */
    @RabbitListener(queues = "#{dlxTtlQueue.name}")
    public void orderDlxTtl(
            @Header(AmqpHeaders.CHANNEL) Channel channel,
            MqDto<OrderDto> mqDto,
            Message message) throws Exception {
        this.orderExpireConsumer.run(channel, message, mqDto);
    }
}