zoukankan      html  css  js  c++  java
  • Java 搭建 RabbitMq 消息中间件

    前言

    当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

    名词

    • exchange: 交换机
    • routingkey: 路由key
    • queue:队列
    • 控制台端口:15672

       exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

    使用场景

    • 1.技能订单3分钟自动取消,改变状态
    • 2.直播开始前15分钟提醒
    • 3.直播状态自动结束

    参考链接

      https://juejin.im/entry/5a17909a518825329314397d
      https://www.jianshu.com/p/ea953f633466

    流程

      生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

      —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

    第一步:在pom文件中添加

    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    第二步:在application.properties文件中添加

    spring.rabbitmq.host=172.xx.xx.xxx
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=rabbit
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

     第三步:配置 OrderQueueConfig 

      1 package com.tuohang.platform.config;
      2 
      3 import org.springframework.amqp.core.Binding;
      4 import org.springframework.amqp.core.BindingBuilder;
      5 import org.springframework.amqp.core.DirectExchange;
      6 import org.springframework.amqp.core.Queue;
      7 import org.springframework.amqp.core.QueueBuilder;
      8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      9 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
     10 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
     11 import org.springframework.context.annotation.Bean;
     12 import org.springframework.context.annotation.Configuration;
     13 
     14 
     15 /**
     16  * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
     17  * 
     18  * 
     19  * @author Administrator
     20  * @version 1.0
     21  * @Date 2018年9月18日
     22  */
     23 @Configuration
     24 public class OrderQueueConfig {
     25 
     26     /**
     27      * 订单缓冲交换机名称
     28      */
     29     public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";
     30 
     31     /**
     32      * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
     33      */
     34     public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";
     35 
     36     /**
     37      * 订单的交换机DLX 名字
     38      */
     39     final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";
     40 
     41     /**
     42      * 订单message时间过期后进入的队列,也就是订单实际的消费队列
     43      */
     44     public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";
     45 
     46     /**
     47      * 订单在缓冲队列过期时间(毫秒)30分钟
     48      */
     49     public final static int ORDER_QUEUE_EXPIRATION = 1800000;
     50 
     51     /**
     52      * 订单缓冲交换机
     53      * 
     54      * @return
     55      */
     56     @Bean
     57     public DirectExchange preOrderExange() {
     58         return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
     59     }
     60 
     61     /**
     62      * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
     63      * 
     64      * @return
     65      */
     66     @Bean
     67     public Queue delayQueuePerOrderTTLQueue() {
     68         return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
     69                 .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
     70                 .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
     71                 .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
     72                 .build();
     73     }
     74 
     75     /**
     76      * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
     77      *
     78      * @param delayQueuePerOrderTTLQueue
     79      * @param preOrderExange
     80      * @return
     81      */
     82     @Bean
     83     public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
     84         return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
     85     }
     86 
     87     /**
     88      * 创建订单的DLX exchange
     89      *
     90      * @return
     91      */
     92     @Bean
     93     public DirectExchange delayOrderExchange() {
     94         return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
     95     }
     96 
     97     /**
     98      * 创建order_delay_process_queue队列,也就是订单实际消费队列
     99      *
    100      * @return
    101      */
    102     @Bean
    103     public Queue delayProcessOrderQueue() {
    104         return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
    105     }
    106 
    107     /**
    108      * 将DLX绑定到实际消费队列
    109      *
    110      * @param delayProcessOrderQueue
    111      * @param delayExchange
    112      * @return
    113      */
    114     @Bean
    115     public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
    116         return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
    117     }
    118 
    119     /**
    120      * 监听订单实际消费者队列order_delay_process_queue
    121      * 
    122      * @param connectionFactory
    123      * @param processReceiver
    124      * @return
    125      */
    126     @Bean
    127     public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
    128             OrderProcessReceiver processReceiver) {
    129         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    130         container.setConnectionFactory(connectionFactory);
    131         container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
    132         container.setMessageListener(new MessageListenerAdapter(processReceiver));
    133         return container;
    134     }
    135 }

    消费者 OrderProcessReceiver :

     1 package com.tuohang.platform.config;
     2 
     3 import java.util.Objects;
     4 
     5 import org.apache.tools.ant.types.resources.selectors.Date;
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 import org.springframework.amqp.core.Message;
     9 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    10 import org.springframework.stereotype.Component;
    11 import com.rabbitmq.client.Channel;
    12 
    13 /**
    14  * 订单延迟处理消费者
    15  * 
    16  * 
    17  * @author Administrator
    18  * @version 1.0
    19  * @Date 2018年9月18日
    20  */
    21 @Component
    22 public class OrderProcessReceiver implements ChannelAwareMessageListener {
    23 
    24     private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);
    25 
    26     String msg = "The failed message will auto retry after a certain delay";
    27 
    28     @Override
    29     public void onMessage(Message message, Channel channel) throws Exception {
    30         try {
    31             processMessage(message);
    32         } catch (Exception e) {
    33             // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
    34             channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
    35                     msg.getBytes());
    36         }
    37     }
    38     
    39     /**
    40      * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
    41      *
    42      * @param message
    43      * @throws Exception
    44      */
    45     public void processMessage(Message message) throws Exception {
    46         String realMessage = new String(message.getBody());
    47         logger.info("Received <" + realMessage + ">");
    48         // 取消订单
    49         if(!Objects.equals(realMessage, msg)) {
    50 //            SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
    51             System.out.println("测试111111-----------"+new Date());
    52             System.out.println(message);
    53         }
    54     }
    55 }

        或者

     1 /**
     2  * 测试 rabbit 消费者
     3  * 
     4  * 
     5  * @author Administrator
     6  * @version 1.0
     7  * @Date 2018年9月25日
     8  */
     9 @Component
    10 @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
    11 public class TestProcessReceiver {
    12 
    13     private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);
    14 
    15     String msg = "The failed message will auto retry after a certain delay";
    16 
    17     @RabbitHandler
    18     public void onMessage(Message message, Channel channel) throws Exception {
    19         try {
    20             processMessage(message);
    21             //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
    22             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    23         } catch (Exception e) {
    24             // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
    25             channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
    26                     msg.getBytes());
    27         }
    28     }
    29     
    30     /**
    31      * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
    32      *
    33      * @param message
    34      * @throws Exception
    35      */
    36     public void processMessage(Message message) throws Exception {
    37         String realMessage = new String(message.getBody());
    38         logger.info("Received < " + realMessage + " >");
    39         // 取消订单
    40         if(!Objects.equals(realMessage, msg)) {
    41             System.out.println("测试111111-----------"+new Date());
    42         }else {
    43             System.out.println("rabbit  else...");
    44         }
    45     }
    46 }

    生产者

     1 /**
     2      * 测试rabbitmq
     3      * 
     4      * @return
     5      */
     6     @RequestMapping(value = "/testrab")
     7     public String testraa() {
     8         GenericResult gr = null;
     9         try {
    10             String name = "test_pre_ttl_delay_queue";
    11    long expiration = 10000;//10s 过期时间
    12             rabbitTemplate.convertAndSend(name,String.valueOf(123456));
    13  // 在单个消息上设置过期时间
    14  //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));
    15 
    16 
    17         } catch (ServiceException e) {
    18             e.printStackTrace();
    19             gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
    20         }
    21         
    22         return getWrite(gr);
    23     }
  • 相关阅读:
    leetcode33. Search in Rotated Sorted Array
    pycharm 设置sublime text3 monokai主题
    django class Meta
    leetcode30, Substring With Concatenation Of All Words
    Sublime text3修改tab键为缩进为四个空格,
    sublime text3 python打开图像的问题
    安装上imesupport输入法依然不跟随的解决办法,
    sublime text3 的插件冲突弃用问题,
    sublime text3 BracketHighlighter括号匹配的设置
    windows 下wget的使用
  • 原文地址:https://www.cnblogs.com/xhq1024/p/10684519.html
Copyright © 2011-2022 走看看