zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    • SpringBoot整合RabbitMQ

      公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数据库与接口层添加RabbitMQ作为缓冲层来实现。

      具体为:

      1、ESB将订单数据实时推送至CRM Controller接口
      2、CRM Controller调用RabbitMQ的生产者将数据推送至RabbitMQ服务器
      3、消费者实时消费RabbitMQ的订单并进行处理并存储至数据库中

      这里只是实现了RabbitMQ的生产者和消费者,采用的RabbitMQ Direct模式,代码如下:

      @SpringBootApplication
      @EnableScheduling
      public class RabbitMQApplication {
      
          public static void main(String[] args) {
          
              SpringApplication.run(RabbitMQApplication.class, args);
          
          }
      
      }
      
      @Configuration
      public class ProducerConfig {
      
          private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class);
          
          @Autowired
          private CachingConnectionFactory connectionFactory;
          
          @Bean
          public RabbitTemplate rabbitTemplate() {
              connectionFactory.setPublisherConfirms(true); //Use full publisher confirms, with correlation data and a callback for each message.
              connectionFactory.setPublisherReturns(true);
              connectionFactory.setConnectionTimeout(6000); //超时时间
              RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //采用默认模式创建模板
              rabbitTemplate.setMandatory(true); //强制
              rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                      logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                  }
              });
              rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                  @Override
                  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                      //TODO 数据没有投递成功,可以在这里进行一些操作,比如讲消息保存到数据库中,等待下次进行投递
                      logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
                  }
              });
              return rabbitTemplate;
          }
      
      }
      
      @Component
      public class ProducerProcessing {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
          
          @Scheduled(fixedDelay = 1000, initialDelay = 500)
          public void directSend() {
          
              rabbitTemplate.convertAndSend(ReciverProcessing.DIRECT_EXCHANGE, ReciverProcessing.DIRECT_ROUTINGKEY, "订单信息");
              System.out.println("消息投递完成");
          }
      
      
         // @Scheduled(fixedDelay = 1000, initialDelay = 500)
          public void fanoutSend() {
      
              rabbitTemplate.convertAndSend(ReciverProcessing.FANOUT_EXCHANGE, "", "订单信息");
              System.out.println("消息投递完成");
          }
      
      
      
      }
      
      @Configuration
      public class ReciverProcessing {
      
          //Direct模式
          public static final String DIRECT_EXCHANGE = "direct_exchange";
          public static final String DIRECT_ROUTINGKEY = "direct_rountingkey";
          public static final String DIRECT_QUEUE = "direct_queue";
          
          @Autowired
          private ConnectionFactory connectionFactory;
          
          //发布订阅模式
          public static final String FANOUT_EXCHANGE = "fanout_exchange";
          public static final String FANOUT_QUEUE1 = "fanout_queue1";
          public static final String FANOUT_QUEUE2 = "fanout_queue2";
      
      
          /**
           * 实例化 DirectExchange
           *
           * @return
           */
          @Bean
          public DirectExchange directExchange() {
              /**
               * Exchange名称
               * Exchange持久化
               * Exchange 是否自动删除
               */
              return new DirectExchange(DIRECT_EXCHANGE, true, false);
          }
          
          /**
           * 创建广播模式交换机
           *
           * @return
           */
          @Bean
          public FanoutExchange fanoutExchange() {
              return new FanoutExchange(FANOUT_EXCHANGE, true, false);
          }
      
      
          @Bean
          public Queue directQueue() {
              /**
               * 队列名称
               * 队列是否持久化
               */
              return new Queue(DIRECT_QUEUE, true); //队列持久
          
          }
      
      
      
      
          @Bean
          public Binding directBinding() {
              /**
               * 交换器与队列绑定
               */
              return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
          }
      
      
      
      
          @Bean
          public SimpleMessageListenerContainer DirectMessageContainer() {
          
              //创建队列监听容器
              SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
              //监听的队列,可多个
              simpleMessageListenerContainer.setQueues(directQueue());
              simpleMessageListenerContainer.setExposeListenerChannel(true);
              //最大消费者数量
              simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
              //设置并发数
              simpleMessageListenerContainer.setConcurrentConsumers(3);
              //一次拉取消息的数量
              simpleMessageListenerContainer.setPrefetchCount(1);
              //确认模式
              simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
              simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
      
      
                  @Override
                  public void onMessage(Message message, Channel channel) throws Exception {
                      String body = new String(message.getBody());
                      //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
                      System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
                      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                      //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
                  }
              });
              return simpleMessageListenerContainer;
      
      
          }
          
          /**
           * 队列1
           *
           * @return
           */
          @Bean
          public Queue fanoutQueue1() {
              return new Queue(FANOUT_QUEUE1, true);
          }
          
          /**
           * 队列2
           *
           * @return
           */
          @Bean
          public Queue fanoutQueue2() {
              return new Queue(FANOUT_QUEUE2, true);
          }
          
          /**
           * 队列1绑定
           *
           * @return
           */
          @Bean
          public Binding fanoutBinding1() {
              return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
          }
          
          /**
           * 队列2绑定
           *
           * @return
           */
          @Bean
          public Binding fanoutBinding2() {
              return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
          }
          /**
           * 监听队列进行消费,这里相当于一个消费者监听了两个队列,在实际开发应用中,发布订阅模式监听两个队列,收到的消息是一样的,所以这样做是没有什么意义的
           * 可以设置两个消费者,每个消费者只监听其中的一个队列,收到消息后根据需要来进行不同的处理
           *
           * @return
           */
          
          @Bean
          public SimpleMessageListenerContainer fanoutMessageContainer() {
          
              //创建队列监听容器
              SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
              //监听的队列,可多个
              simpleMessageListenerContainer.setQueues(fanoutQueue1(), fanoutQueue2());
              simpleMessageListenerContainer.setExposeListenerChannel(true);
              //最大消费者数量
              simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
              //设置并发数
              simpleMessageListenerContainer.setConcurrentConsumers(3);
              //一次拉取消息的数量
              simpleMessageListenerContainer.setPrefetchCount(1);
              //确认模式
              simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
              simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
      
      
                  @Override
                  public void onMessage(Message message, Channel channel) throws Exception {
                      String body = new String(message.getBody());
                      //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
                      System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
                      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                      //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
                  }
              });
              return simpleMessageListenerContainer;
      
      
          }
      
      }
      

    相关代码链接: https://github.com/albert-liu435/springmq

  • 相关阅读:
    jquery使用
    网站重构?
    WEB应用从服务器主动推送Data到客户端有那些方式?
    异步加载和延迟加载?
    平时如何管理你的项目?
    对前端界面工程师这个职位是怎么样理解的?它的前景会怎么样?
    约定优于配置(convention over configuration)
    JavaEE的13种核心技术
    The Spring Framework ConfigurableListableBeanFactory.java
    mySQL的boolean类型为tinyint(1)
  • 原文地址:https://www.cnblogs.com/haizhilangzi/p/12301742.html
Copyright © 2011-2022 走看看