zoukankan      html  css  js  c++  java
  • Java 搭建 RabbitMq 消息中间件

    前言

    当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

    名词

    • exchange: 交换机
    • routingkey: 路由key
    • queue:队列
    • 控制台端口:15672

       exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

    使用场景

    • 1.技能订单3分钟自动取消,改变状态
    • 2.直播开始前15分钟提醒
    • 3.直播状态自动结束

    参考链接

      https://juejin.im/entry/5a17909a518825329314397d
      https://www.jianshu.com/p/ea953f633466

    流程

      生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

      —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

    第一步:在pom文件中添加

    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    第二步:在application.properties文件中添加

    spring.rabbitmq.host=172.xx.xx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=rabbit
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

     第三步:配置 OrderQueueConfig 

      1 package com.tuohang.platform.config;
      2 
      3 import org.springframework.amqp.core.Binding;
      4 import org.springframework.amqp.core.BindingBuilder;
      5 import org.springframework.amqp.core.DirectExchange;
      6 import org.springframework.amqp.core.Queue;
      7 import org.springframework.amqp.core.QueueBuilder;
      8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      9 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
     10 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
     11 import org.springframework.context.annotation.Bean;
     12 import org.springframework.context.annotation.Configuration;
     13 
     14 
     15 /**
     16  * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
     17  * 
     18  * 
     19  * @author Administrator
     20  * @version 1.0
     21  * @Date 2018年9月18日
     22  */
     23 @Configuration
     24 public class OrderQueueConfig {
     25 
     26     /**
     27      * 订单缓冲交换机名称
     28      */
     29     public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";
     30 
     31     /**
     32      * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
     33      */
     34     public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";
     35 
     36     /**
     37      * 订单的交换机DLX 名字
     38      */
     39     final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";
     40 
     41     /**
     42      * 订单message时间过期后进入的队列,也就是订单实际的消费队列
     43      */
     44     public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";
     45 
     46     /**
     47      * 订单在缓冲队列过期时间(毫秒)30分钟
     48      */
     49     public final static int ORDER_QUEUE_EXPIRATION = 1800000;
     50 
     51     /**
     52      * 订单缓冲交换机
     53      * 
     54      * @return
     55      */
     56     @Bean
     57     public DirectExchange preOrderExange() {
     58         return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
     59     }
     60 
     61     /**
     62      * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
     63      * 
     64      * @return
     65      */
     66     @Bean
     67     public Queue delayQueuePerOrderTTLQueue() {
     68         return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
     69                 .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
     70                 .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
     71                 .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
     72                 .build();
     73     }
     74 
     75     /**
     76      * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
     77      *
     78      * @param delayQueuePerOrderTTLQueue
     79      * @param preOrderExange
     80      * @return
     81      */
     82     @Bean
     83     public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
     84         return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
     85     }
     86 
     87     /**
     88      * 创建订单的DLX exchange
     89      *
     90      * @return
     91      */
     92     @Bean
     93     public DirectExchange delayOrderExchange() {
     94         return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
     95     }
     96 
     97     /**
     98      * 创建order_delay_process_queue队列,也就是订单实际消费队列
     99      *
    100      * @return
    101      */
    102     @Bean
    103     public Queue delayProcessOrderQueue() {
    104         return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
    105     }
    106 
    107     /**
    108      * 将DLX绑定到实际消费队列
    109      *
    110      * @param delayProcessOrderQueue
    111      * @param delayExchange
    112      * @return
    113      */
    114     @Bean
    115     public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
    116         return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
    117     }
    118 
    119     /**
    120      * 监听订单实际消费者队列order_delay_process_queue
    121      * 
    122      * @param connectionFactory
    123      * @param processReceiver
    124      * @return
    125      */
    126     @Bean
    127     public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
    128             OrderProcessReceiver processReceiver) {
    129         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    130         container.setConnectionFactory(connectionFactory);
    131         container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
    132         container.setMessageListener(new MessageListenerAdapter(processReceiver));
    133         return container;
    134     }
    135 }

    消费者 OrderProcessReceiver :

     1 package com.tuohang.platform.config;
     2 
     3 import java.util.Objects;
     4 
     5 import org.apache.tools.ant.types.resources.selectors.Date;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 import org.springframework.amqp.core.Message;
     9 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    10 import org.springframework.stereotype.Component;
    11 import com.rabbitmq.client.Channel;
    12 
    13 /**
    14  * 订单延迟处理消费者
    15  * 
    16  * 
    17  * @author Administrator
    18  * @version 1.0
    19  * @Date 2018年9月18日
    20  */
    21 @Component
    22 public class OrderProcessReceiver implements ChannelAwareMessageListener {
    23 
    24     private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);
    25 
    26     String msg = "The failed message will auto retry after a certain delay";
    27 
    28     @Override
    29     public void onMessage(Message message, Channel channel) throws Exception {
    30         try {
    31             processMessage(message);
    32         } catch (Exception e) {
    33             // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
    34             channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
    35                     msg.getBytes());
    36         }
    37     }
    38     
    39     /**
    40      * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
    41      *
    42      * @param message
    43      * @throws Exception
    44      */
    45     public void processMessage(Message message) throws Exception {
    46         String realMessage = new String(message.getBody());
    47         logger.info("Received <" + realMessage + ">");
    48         // 取消订单
    49         if(!Objects.equals(realMessage, msg)) {
    50 //            SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
    51             System.out.println("测试111111-----------"+new Date());
    52             System.out.println(message);
    53         }
    54     }
    55 }

        或者

     1 /**
     2  * 测试 rabbit 消费者
     3  * 
     4  * 
     5  * @author Administrator
     6  * @version 1.0
     7  * @Date 2018年9月25日
     8  */
     9 @Component
    10 @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
    11 public class TestProcessReceiver {
    12 
    13     private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);
    14 
    15     String msg = "The failed message will auto retry after a certain delay";
    16 
    17     @RabbitHandler
    18     public void onMessage(Message message, Channel channel) throws Exception {
    19         try {
    20             processMessage(message);
    21             //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
    22             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    23         } catch (Exception e) {
    24             // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
    25             channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
    26                     msg.getBytes());
    27         }
    28     }
    29     
    30     /**
    31      * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
    32      *
    33      * @param message
    34      * @throws Exception
    35      */
    36     public void processMessage(Message message) throws Exception {
    37         String realMessage = new String(message.getBody());
    38         logger.info("Received < " + realMessage + " >");
    39         // 取消订单
    40         if(!Objects.equals(realMessage, msg)) {
    41             System.out.println("测试111111-----------"+new Date());
    42         }else {
    43             System.out.println("rabbit  else...");
    44         }
    45     }
    46 }

    生产者

     1 /**
     2      * 测试rabbitmq
     3      * 
     4      * @return
     5      */
     6     @RequestMapping(value = "/testrab")
     7     public String testraa() {
     8         GenericResult gr = null;
     9         try {
    10             String name = "test_pre_ttl_delay_queue";
    11    long expiration = 10000;//10s 过期时间
    12             rabbitTemplate.convertAndSend(name,String.valueOf(123456));
    13  // 在单个消息上设置过期时间
    14  //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));
    15 
    16 
    17         } catch (ServiceException e) {
    18             e.printStackTrace();
    19             gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
    20         }
    21         
    22         return getWrite(gr);
    23     }
  • 相关阅读:
    c3p0、dbcp、proxool、BoneCP比较
    velocity的一些优化记录
    JUnit-4.11使用报java.lang.NoClassDefFoundError: org/hamcrest/SelfDescribing错误
    Deployment failure on Tomcat 7.x. Could not copy all resources to
    Spring3.2.3+Quartz2.2.1 整合配置
    mysql批量insert速度超慢
    Fine Uploader + Spring3.2.2(Java+html5上传) SpringMVC+jquery-fineuploader 文件上传
    实现工资的按天统计(X:日期 Y:姓名)
    Java发邮件带附件(且重命名附件)
    微信小程序wx.switchTab传参问题
  • 原文地址:https://www.cnblogs.com/xhq1024/p/10684519.html
Copyright © 2011-2022 走看看