zoukankan      html  css  js  c++  java
  • SpringBoot RabbitMQ 实战

    RabbitMQ

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。

    RabbitMQ 七种队列模式

    • 简单模式(Hello World):
      做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。
    • 工作队列模式(Work queues):
      在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理。
    • 订阅模式(Publish/Subscribe):
      一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。
    • 路由模式(Routing):
      有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。
    • 主题模式(Topics):
      根据主题(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。
    • 远程过程调用(RPC):客户端发送一个请求消息,服务端以一个响应消息回应。
    • 发布者确认(Publisher Confirms):与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

    四种交换机

    • 直连交换机(Direct exchange):
      具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。
    • 扇形交换机(Fanout exchange):
      广播消息到所有队列,没有任何处理,速度最快。
    • 主题交换机(Topic exchange):
      在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词。
    • 首部交换机(Headers exchange):
      忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则。

    安装环境(centos7)

    安装包下载

    Erlang和RabbitMQ官网下载速度特别慢,可用 erlang-solutions.com 下载,速度特别快。

    https://www.erlang-solutions.com/downloads/

    安装版本
    • Erlang:esl-erlang_23.0-1_centos_7_amd64.rpm

    • RabbitMQ:rabbitmq-server-generic-unix-3.7.7.tar.xz

    一、安装Erlang

    1.编译依赖

    yum install esl-erlang_23.0-1_centos_7_amd64.rpm
    

    2.查看Erlang版

    erl -version
    

    二、安装RabbitMq

    1.解压包

    xz -d rabbitmq-server-generic-unix-3.7.7.tar.xz
    
    tar -xvf rabbitmq-server-generic-unix-3.7.7.tar
    

    2.设置RabbitMq的环境变量

    进入到rabbit文件内,其命令文件存在于sbin文件夹下,因此需要将sbin文件夹的路径添加到PATH中:修改/etc/profile

    cd rabbitmq_server-3.7.7/sbin
    
    export PATH=/usr/local/rabbitmq_server-3.7.7/sbin:$PATH
    

    3.PATH路径更新

    source /etc/profile
    

    4.开启 web 管理插件

    rabbitmq-plugins enable rabbitmq_management
    

    5.启动

    #启动后台管理
    ./rabbitmq-plugins enable rabbitmq_management   
    
    #后台运行rabbitmq
    ./rabbitmq-server -detached   
    

    6.新建用户

    由于guest用户被限制,只能通过localhost访问,因此我们需要新建一个用户,并授予管理员权限。

    新建一个用户名为admin,密码为admin的用户,并授予管理员(administrator)权限

    ./rabbitmqctl add_user admin admin
    
    ./rabbitmqctl set_user_tags admin administrator
    

    7.浏览器访问

    默认端口为:15672

    http://127.0.0.1:15672

    效果

    在这里插入图片描述

    SpringBoot整合

    以下贴出的代码只是关键代码,涉及业务相关的已删除,请自行处理。

    1、说明

    使用是主题模式 + 主题交换机 + 消息确认机制

    2、添加依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    3、核心配置文件
    spring:
      # 配置rabbitMq 服务器
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # 开启confirm确认机制
        publisher-confirms: true 
        # 开启return确认机制
        publisher-returns: true 
        template:
          # 设置为true后,消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
          mandatory: true
        listener:
          simple:
            # 设置消费端手动 ack
            acknowledge-mode: manual
            # 消费者最小数量
            concurrency: 1
            # 消费之最大数量
            max-concurrency: 20
            # 每次只处理一个消息
            prefetch: 1
            retry:
              # 是否支持重试
              enabled: true
        
    
    4、常量配置(AmqpConstant.java)
    public class AmqpConstant {
    
    	/**
    	 * 支付路由key
    	 * *:匹配不多不少一个词
    	 * #:匹配一个或多个词
    	 */
    	public static final String PAY_QUEUE = "pay_queue";
    	public static final String PAY_ROUTING_KEY = "pay.order";
    	public static final String _PAY_ROUTING_KEY = "pay.#";
    
    	/**
    	 * 查询路由key
    	 */
    	public static final String QUERY_QUEUE = "query_queue";
    	public static final String QUERY_ROUTING_KEY = "query.order";
    	public static final String _QUERY_ROUTING_KEY = "query.#";
    
    	/**
    	 * 交换机名称
    	 */
    	public static final String PAY_EXCHANGE_NAME = "PAY-EXCHANGE";
    
    	//===================================================================================
    
    	/**
    	 * 使用Redis保证消息幂等性
    	 */
    	public static final String PAY_QUEUE_REDIS_KEY = "pay-queue:";
    
    	/**
    	 * Redis消息ID key 的过期时间,单位:秒
    	 */
    	public static final Integer PAY_QUEUE_REDIS_KEY_TIMEOUT = 7200;
    
    	/**
    	 * 轮询查询订单状态的睡眠时间,单位:秒
    	 */
    	public static final Integer POLLING_STATUS_TIME = 2000;
    
    }
    
    
    5、配置队列、路由交换机,TopicQueueConfig.java
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    @Configuration
    public class TopicQueueConfig {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    
    	/**
    	 * 支付队列
    	 *
    	 * @return
    	 */
    	@Bean
    	public Queue payTopicQueue() {
    		return new Queue(AmqpConstant.PAY_QUEUE, true);
    	}
    
    	/**
    	 * 查询队列
    	 *
    	 * @return
    	 */
    	@Bean
    	public Queue queryTopicQueue() {
    		return new Queue(AmqpConstant.QUERY_QUEUE, true);
    	}
    
    	/**
    	 * 路由交换机
    	 *
    	 * @return
    	 */
    	@Bean
    	public TopicExchange topicExchange() {
    		return new TopicExchange(AmqpConstant.PAY_EXCHANGE_NAME);
    	}
    
    	/**
    	 * 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定
    	 *
    	 * @return
    	 */
    	@Bean
    	Binding payBindingTopicExchange(Queue payTopicQueue, TopicExchange exchange) {
    		return BindingBuilder.bind(payTopicQueue).to(exchange).with(AmqpConstant._PAY_ROUTING_KEY);
    	}
    
    	/**
    	 * 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定
    	 *
    	 * @return
    	 */
    	@Bean
    	Binding queryBindingTopicExchange(Queue queryTopicQueue, TopicExchange exchange) {
    		return BindingBuilder.bind(queryTopicQueue).to(exchange).with(AmqpConstant._QUERY_ROUTING_KEY);
    	}
    
    	@PostConstruct
    	public void initRabbitTemplate() {
    		// 设置生产者消息确认
    		rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
    		rabbitTemplate.setReturnCallback(new RabbitConfirmReturnCallBack());
    	}
    
    }
    
    
    6、生产端 Confirm 消息确认机制

    消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!

    RabbitConfirmCallback.java

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    /**
     * 消息发送确认类
     */
    @Slf4j
    public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
    
    	@Override
    	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    		log.info("=======ConfirmCallback===correlationData:{}=====ack:{}=====cause:{}========", correlationData, ack, cause);
    	}
    }
    
    
    7、Return 消息机制

    Return Listener 用于处理一-些不可路 由的消息!
    当exchange不存在或者指定的路由 key 路由不到时,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !

    RabbitConfirmReturnCallBack.java

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    /**
     * 消息发送交换机返回机制
     */
    @Slf4j
    public class RabbitConfirmReturnCallBack implements RabbitTemplate.ReturnCallback{
    
    	@Override
    	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    		log.info("===RabbitConfirmReturnCallBack====exchange: {}======routingKey:{}=====replyCode:{}====replyText: {}======", exchange, routingKey, replyCode, replyText);
    	}
    }
    
    

    在基础API中有一个关键的配置项:Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息!

    8、消息生产者相关

    一个发送支付生产者、一个查询订单转态的生产者。

    8.1、生产者 OrderProvider.java
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Objects;
    
    /**
     * 消息生产者
     */
    @Slf4j
    @Service
    public class OrderProvider {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    
    	/**
    	 * 支付消息生产者方法
    	 *
    	 * @param messages
    	 */
    	public void sendObject(List<Message> messages) {
    		try {
    			if (!Objects.isNull(messages) && messages.size() > 0) {
    				log.info("========支付生产消息开始=====总消息个数为:{}==========", messages.size());
    				messages.forEach(message -> {
    					// 全局唯一
    					CorrelationData correlationData = new CorrelationData(message.getMessageId());
    					rabbitTemplate.convertAndSend(AmqpConstant.PAY_EXCHANGE_NAME, AmqpConstant.PAY_ROUTING_KEY, message, correlationData);
    				});
    				log.info("========支付生产消息结束=====总消息个数为:{}==========", messages.size());
    			}
    		} catch (Exception e) {
    			log.error("====支付生产消息异常=======:{}", e.getMessage());
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 查询消息生产者方法
    	 *
    	 * @param message
    	 */
    	public void sendQuery(Message message) {
    		try {
    			if (Objects.nonNull(message)) {
    				log.info("========查询消息生产者开始=====");
    				// 全局唯一
    				CorrelationData correlationData = new CorrelationData(message.getMessageId());
    				rabbitTemplate.convertAndSend(AmqpConstant.PAY_EXCHANGE_NAME, AmqpConstant.QUERY_ROUTING_KEY, message, correlationData);
    				log.info("========查询消息生产者结束=======消息ID:{}", correlationData);
    			}
    		} catch (Exception e) {
    			log.error("====查询消息生产者异常=======:{}", e.getMessage());
    			e.printStackTrace();
    		}
    	}
    
    
    }
    
    
    8.2、消息实体 Message.java
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.experimental.SuperBuilder;
    
    import java.io.Serializable;
    import java.time.LocalDateTime;
    
    @Data
    @SuperBuilder
    @AllArgsConstructor
    @NoArgsConstructor
    public class Message implements Serializable {
    
    	/**
    	 * 消息Id
    	 */
    	private String messageId;
    
    	/**
    	 * 消息内容
    	 */
    	private Object messageData;
    
    	/**
    	 * 消息创建时间
    	 */
    	private LocalDateTime createTime;
    
    }
    
    
    8.3、生产者Controller OrderProviderController.java
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.List;
    
    @Slf4j
    @RestController
    @RequestMapping(value = "/mqprovider")
    @AllArgsConstructor
    public class OrderProviderController {
    
    	@Autowired
    	private OrderProvider orderProvider;
    
    	/**
        *支付消息生产者
        */
    	@PostMapping("/sendobject")
    	public String sendobject(@RequestBody List<Message> messageList) {
    		orderProvider.sendObject(messageList);
    		return "success";
    	}
    
        /**
        *查询消息生产者
        */
    	@PostMapping("/sendquery")
    	public String sendQuery(@RequestBody Message message) {
    		orderProvider.sendQuery(message);
    		return "success";
    	}
    
    }
    
    9、消费者相关
    9.1、支付消息接收监听 OrderPayReceiver.java
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 支付消息接收监听
     */
    @Slf4j
    @Component
    public class OrderPayReceiver {
    
    	@Autowired
    	private OrderProvider orderProvider;
    
    	@Autowired
    	private RedisTemplate redisTemplate;
    
    	@RabbitListener(queues = AmqpConstant.PAY_QUEUE)
    	public void payMessage(@Payload Message message, Channel channel, @Headers Map<String, Object> headers) {
    		try {
    			// 保证消息幂等性
    			Object redisMessageId = redisTemplate.opsForValue().get(AmqpConstant.PAY_QUEUE_REDIS_KEY + message.getMessageId());
    			if (!Objects.isNull(redisMessageId) && message.getMessageId().equals(String.valueOf(redisMessageId))) {
    				// 手工ack
    				channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), true);
    				return;
    			}
    			log.info("==============支付消费者开始======消息ID:{}====消息创建时间:{}", message.getMessageId(), message.getCreateTime());
    
    			// 调用支付接口  TODO
    			 
    
    			// 消息ID存入Redis
    			redisTemplate.opsForValue().set(AmqpConstant.PAY_QUEUE_REDIS_KEY + message.getMessageId(), message.getMessageId(), AmqpConstant.PAY_QUEUE_REDIS_KEY_TIMEOUT, TimeUnit.SECONDS);
    
    			// 将支付调用成功的消息放入查询队列中 TODO
    			// orderProvider.sendQuery();
    
    			// 手工ack
    			channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), true);
    			log.info("==============支付消费者结束==========");
    		} catch (Exception e) {
    			log.error("==============支付消费者异常,异常信息:{}", e.getMessage());
    			e.printStackTrace();
    		}
    	}
    
    }
    
    9.2、查询消息接收监听 OrderQueryReceiver.java
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    import java.util.Objects;
    
    /**
     * 查询消息接收监听
     */
    @Slf4j
    @Component
    public class OrderQueryReceiver {
    
    	@RabbitListener(queues = AmqpConstant.QUERY_QUEUE)
    	public void onUserMessage(@Payload Message message, Channel channel, @Headers Map<String, Object> headers) {
    		try {
    			log.info("==============查询消费者开始======消息ID:{}====消息创建时间:{}", message.getMessageId(), message.getCreateTime());
    			// 调用查询订单状态接口 TODO
    			 
    
    
    			log.info("=====查询支付接口返回======支付状态:{}====支付状态描述:{}=====", result.getCode(), result.getMessage());
    			// 手工ack
    			long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    			channel.basicAck(deliveryTag, true);
    			log.info("==============查询消费者结束======deliveryTag:{}====", deliveryTag);
    		} catch (Exception e) {
    			log.error("==============查询消费者异常,异常信息:{}", e.getMessage());
    			e.printStackTrace();
    		}
    	}
    
    }
    
    

    业务架构

    需求:每天按照需求批量生产订单数据,进行调用三方支付接口进行支付操作。有实时放回支付状态的,也有异步返回状态的,具需求要求本业务不提供回调更新状态机制,需自身请求三方接口更新订单转态(业务中有批量支付操作)。

    简易业务架构图如下:

    在这里插入图片描述

    以上是结合项目需求设计的业务实现。

    如何保证消息不被重复消费?
    生产时消息重复

    为什么会出现消息重复?消息重复的原因有两个:

    • 1.生产时消息重复,

    • 2.消费时消息重复。

    由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。

    生产者中如果消息未被确认,或确认失败,可以使用定时任务+(redis/db)来进行消息重试。

    消费时消息重复

    以订单ID作为消息ID,即可保证消息的幂等性,消费过程为:

    • 消费者获取到消息后先根据id去查询redis/db是否存在该消息;

    • 如果不存在,则正常消费,消费完毕后写入redis/db;

    • 如果存在,则证明消息被消费过,直接丢弃。

  • 相关阅读:
    DNS服务器原理简述、搭建主/从DNS服务器并实现智能解析
    JQuery02
    JQuery01
    python05
    python04
    python03
    Liunx命令
    Python运算符及注释
    python01
    原生JDBC+mybatis
  • 原文地址:https://www.cnblogs.com/typ1805/p/14648552.html
Copyright © 2011-2022 走看看