zoukankan      html  css  js  c++  java
  • RabbitMQ实战

    2.1 创建表结构

    CREATE TABLE `infrastructure_mq_producer` (
      `message_id` bigint(20) NOT NULL COMMENT '消息标识',
      `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
      `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型',
      `status` int(2) NOT NULL DEFAULT '0' COMMENT '发送状态: 0:下发中;1:下发失败;2下发成功',
      `try_count` int(3) NOT NULL DEFAULT '0' COMMENT '重试次数',
      `next_retry_time` datetime DEFAULT NULL COMMENT '下一次执行时间',
      `error_message_id` bigint(20) DEFAULT NULL COMMENT '错误消息内容id,关联 infrastructure_message表',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      PRIMARY KEY (`message_id`),
      UNIQUE KEY `unq_message_id` (`message_id`) USING BTREE,
      KEY `idx_query` (`status`,`create_time`,`type`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ生产者记录';
    
    CREATE TABLE `infrastructure_mq_consumer` (
      `message_id` bigint(20) NOT NULL  COMMENT '消息标识',
      `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
      `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型', 
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      PRIMARY KEY (`message_id`),
      UNIQUE KEY `unq_message_id` (`message_id`) USING BTREE,  
       KEY `idx_query` (`create_time`,`type`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ消费者记录';
    
    CREATE TABLE `infrastructure_message` (
      `id` bigint(20) NOT NULL COMMENT '消息标识',
      `key1` nvarchar(20) DEFAULT NULL COMMENT '相关主键',
      `content` text COMMENT '消息内容',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      PRIMARY KEY (`id`),
      UNIQUE KEY `unq_id` (`id`) USING BTREE,
      KEY `idx_key` (`key`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='报文数据';
    

    2.2 引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.1.RELEASE</version>
        </dependency>
    </dependencies>
    

    2.3 相关配置

    # 数据库配置
    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
    spring.datasource.username=root
    spring.datasource.password=sa000
    
    #  mybatis 相关配置
    mybatis.mapper-locations=classpath:mapper/*/*Mapper.xml
    
    # rabbitmq 配置
    ## 主机地址
    spring.rabbitmq.host=111.231.83.100
    ## 端口号
    spring.rabbitmq.port=5672
    ## 虚拟主机路径
    spring.rabbitmq.virtual-host=/
    ## 连接超时时间
    spring.rabbitmq.connection-timeout=15000
    ## 消费者设置手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    ## 消费者每次消费数量
    spring.rabbitmq.listener.simple.concurrency=1
    ## 最大消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=5
    ## 开启 confirm 确认机制
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.publisher-confirm-type=correlated
    ## 开启 Return 确认机制
    spring.rabbitmq.template.mandatory=true
    

    2.4 实现MQ消息数据库持久化

    @Repository
    public class MQRepository {
    
        private final MqProducerMapper producerMapper;
        private final MqConsumerMapper consumerMapper;
        private final MessageMapper messageMapper;
        private final MQFactory mqFactory;
    
        public MQRepository(MqProducerMapper producerMapper, MqConsumerMapper consumerMapper, MessageMapper messageMapper, MQFactory mqFactory) {
            this.producerMapper = producerMapper;
            this.consumerMapper = consumerMapper;
            this.messageMapper = messageMapper;
            this.mqFactory = mqFactory;
        }
    
    
        public void saveMessage(Long messageId, String content, String type) {
            Date currentTime = new Date();
            Message message = mqFactory.createMessage(content, currentTime);
            messageMapper.insertSelective(message);
    
            final MqProducer producer = mqFactory.createProducer(messageId, message.getId(), type, currentTime);
            producerMapper.insertSelective(producer);
        }
    
        public void sendSuccess(Long messageId) {
            MqProducer producer = new MqProducer();
            producer.setMessageId(messageId);
            producer.setStatus(2);
            producer.setUpdateTime(new Date());
    
            MqProducerExample updateExample = new MqProducerExample();
            updateExample.createCriteria()
                    .andMessageIdEqualTo(messageId)
                    .andStatusEqualTo(0);
            producerMapper.updateByExampleSelective(producer, updateExample);
        }
    
        public void sendFailure(Long messageId, String errorInfo) {
            Message errorMessage = mqFactory.createMessage(errorInfo, new Date());
            messageMapper.insertSelective(errorMessage);
    
            MqProducer producer = new MqProducer();
            producer.setMessageId(messageId);
            producer.setStatus(1);
            producer.setUpdateTime(new Date());
            producer.setErrorMessageId(errorMessage.getId());
            producerMapper.updateByPrimaryKeySelective(producer);
        }
    
    
        public void consumeSuccess(Long messageId, String content){
            Date currentTime = new Date();
            Message message = mqFactory.createMessage(content, currentTime);
            messageMapper.insertSelective(message);
    
            final MqConsumer consumer = mqFactory.createConsumer (messageId, message.getId(), currentTime);
            consumerMapper.insertSelective(consumer);
        }
    }
    
    @Component
    public class MQFactory {
    
    
        public MqProducer createProducer(Long messageId, Long contentId, String type, Date currentTime) {
            MqProducer producer = new MqProducer();
            producer.setMessageId(messageId);
            producer.setContentId(contentId);
            producer.setType(type);
            producer.setStatus(0);
            producer.setTryCount(0);
            producer.setNextRetryTime(currentTime);
            producer.setCreateTime(currentTime);
            producer.setUpdateTime(currentTime);
            return producer;
        }
    
        public Message createMessage(String content, Date currentTime) {
            Message message = new Message();
            message.setId(new Date().getTime());
            message.setCreateTime(currentTime);
            message.setContent(content);
            return message;
        }
    
        public MqConsumer createConsumer(Long messageId, Long contentId, Date currentTime) {
            MqConsumer consumer = new MqConsumer();
            consumer.setMessageId(messageId);
            consumer.setContentId(contentId);
            consumer.setType("");
            consumer.setCreateTime(currentTime);
            consumer.setUpdateTime(currentTime);
            return consumer;
        }
    }
    

    2.5 新建回调事件实现接口

    @Component
    public class ProducerSendConfirmCallback implements RabbitTemplate.ConfirmCallback {
    
        private final MQRepository mqRepository;
    
        public ProducerSendConfirmCallback(MQRepository mqRepository) {
            this.mqRepository = mqRepository;
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            Long messageId = Long.valueOf(correlationData.getId());
            if (ack) {
                mqRepository.sendSuccess(messageId);
            } else {
                mqRepository.sendFailure(messageId, cause);
            }
        }
    }
    
    @Component
    public class ProducerSendReturnedCallback implements RabbitTemplate.ReturnCallback {
    
        private final MQRepository mqRepository;
    
        public ProducerSendReturnedCallback(MQRepository mqRepository) {
            this.mqRepository = mqRepository;
        }
    
    
        /**
         * @param message    消息对象
         * @param replyCode  错误码
         * @param replyText  错误文本
         * @param exchange
         * @param routingKey
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            Long messageId = Long.valueOf(message.getMessageProperties()
                    .getHeader("spring_returned_message_correlation"));
            mqRepository.sendFailure(messageId, replyText);
        }
    }
    

    2.6 新建 RabbitMQConfig 配置

    @Configuration
    public class RabbitMQConfig {
    
        private final RabbitTemplate rabbitTemplate;
        private final ProducerSendReturnedCallback producerSendReturnedCallback;
        private final ProducerSendConfirmCallback producerSendConfirmCallback;
    
        public RabbitMQConfig(RabbitTemplate rabbitTemplate, ProducerSendReturnedCallback producerSendReturnedCallback,
                              ProducerSendConfirmCallback producerSendConfirmCallback) {
            this.rabbitTemplate = rabbitTemplate;
            this.producerSendReturnedCallback = producerSendReturnedCallback;
            this.producerSendConfirmCallback = producerSendConfirmCallback;
        }
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(producerSendConfirmCallback);
            rabbitTemplate.setReturnCallback(producerSendReturnedCallback);
        }
    
    
        @Bean
        public TopicExchange orderExchange() {
            return new TopicExchange("orderExchange");
        }
    
        @Bean
        public Queue orderQueue() {
            return new Queue("orderQueue");
        }
    
        @Bean
        public Binding orderBinding(TopicExchange orderExchange, Queue orderQueue) {
            return BindingBuilder
                    // 创建队列
                    .bind(orderQueue)
                    // 创建交换机
                    .to(orderExchange)
                    // 指定路由 Key
                    .with("order.#");
        }
    }
    

    2.7 创建生产者

    @RestController
    public class ProducerController {
    
    
        private final RabbitTemplate rabbitTemplate;
        private final MQRepository mqRepository;
    
        public TestController(RabbitTemplate rabbitTemplate, MQRepository mqRepository) {
            this.rabbitTemplate = rabbitTemplate;
            this.mqRepository = mqRepository;
        }
    
    
        @GetMapping("/sendNotExistExchangeAndNotExistRoutingKeyMessage")
        public void sendNotExistExchangeAndNotExistRoutingKeyMessage() {
            Long messageId = new Date().getTime();
            String type = "test";
            String content = "123";
            CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
            mqRepository.saveMessage(messageId, content, type);
            rabbitTemplate.convertAndSend("NotExistExchange", "NotExistRoutingKey", content, correlationData);
        }
    
        @GetMapping("/sendNotExistRoutingKeyMessage")
        public void sendNotExistRoutingKeyMessage() {
            Long messageId = new Date().getTime();
            String type = "test";
            String content = "123";
            CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
            mqRepository.saveMessage(messageId, content, type);
            rabbitTemplate.convertAndSend("orderExchange", "NotExistRoutingKey", "123", correlationData);
    
        }
    
        @GetMapping("/sendMessage")
        public void sendMessage() {
            Long messageId = new Date().getTime();
            String type = "test";
            CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
            Order order = new Order("10001", BigDecimal.valueOf(150));
            mqRepository.saveMessage(messageId, order.toString(), type);
            rabbitTemplate.convertAndSend("orderExchange", "order.create", order, correlationData);
    
        }
    
    }
    

    2.8 创建消费者

    @Component
    public class OrderConsumer {
    
        private final MQRepository mqRepository;
    
        public OrderConsumer(MQRepository mqRepository) {
            this.mqRepository = mqRepository;
        }
    
        @RabbitListener(queues = "orderQueue")
        public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            Long messageId = Long.valueOf((String)headers.get("spring_returned_message_correlation"));
            mqRepository.consumeSuccess(messageId, order.toString());
            System.err.println("消费端消费:" + order.toString());
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    
    }
    
  • 相关阅读:
    VirtualBox 安装增强工具Guest Additions 【转载】
    全面认识计算机启动过程【转载】
    Linux下使用mke2fsk格式化虚拟磁盘分区的方法
    什么是posix
    bochs默认没有pci的
    剑指offer中二进制中1的个数
    原码、补码、反码的概念
    初识hadoop
    chord原理的解读
    深度学习概述:从感知机到深度网络
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13305464.html
Copyright © 2011-2022 走看看