一、概述:
大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。
1.消息服务中两个重要概念:
消息代理(message broker)和目的地(destination)当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
2.消息队列主要有两种形式的目的地:
①.队列(queue):点对点消息通信(point-to-point)
②.主题(topic):发布(publish)/订阅(subscribe)消息通信
3.点对点式:
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列消息只有唯一的发送者和接受者,但并不是说只能有一个接收者;
4.发布订阅式:
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息;
5.jms(Java Message Service)JAVA消息服务:
基于M消息代理的规范。 ActiveMQ、 HornetMQ是MS实现;
6. AMQP (Advanced Message Queuing Protocol):
①高级消息队列协议,也是一个消息代理的规范,兼容JMS;②RabbitMQ是AMQP的实现;
7. Spring支持
-spring-jms提供了对MS的支持
- spring-rabbit-提供了对AMQP的支持
-需要 Connection Factory的实现来连接消息代理
-提供 JmsTemplate、 RabbitTemplate来发送消息
-@JmsListenerJMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
-@EnableJms、@ EnableRabbit开启支持
8. Spring Boot自动配置
①JmsAutoConfiguration
②RabbitAutoConfiguration
9、市面的MQ产品
ActiveMQ. RabbitMQ. RocketMQ. Kafka
二、RabbitMQ概念图
- Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
- Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
- Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
- Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
- Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
- Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据
三、3种Exchange模式
1、Direct Exchange
消息中的路由键( routing key)如果和Binding中的 bindingkey一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog,则只转发 routing key标记为“dog的消息,不会转发“dogpuppy",也不会转发“dogguard等等。它是完全匹配、单播的模式。
2、Fanout Exchange
每个发到 fanout类型交换器的消息都会分到所有绑定的队列上去。 fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。 fanout类型转发消息是最快的。
3、Topic Exchange
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#和符号“*”。#匹配0个或多个单词,*匹配一个单词。
四、消息确认机制:
1、可靠抵达-ConfirmCallback
•spring.rabbitmqpublisher-confirms=true
一在创建connectionFactory的时候设置PubIisherConfirms(true)选项,开启confirmcallback。
一CorrelationData.用来表示当前消息唯一性。
一消息只要被broker接收到就会执行confirmCallback,如果是cluster(手动)模式,需要所有broker接收到才会调用confirmCallback0
一被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里。所以需要用到接下来的returnCallback。
2、可靠抵达-ConfirmCaIIback
•spring.rabbitmqpublisher-confirms=true
一在创建connectionFactory的时候设置PubIisherConfirms(true)选项,开启confirmcallback。
一CorrelationData:用来表示当前消息唯一性。
一消息只要被broker接收到就会执行confirmCallback,如果是cluster模式,需要所有broker接收到才会调用confirmCallback;
一被broker接收到只能表示message已经到达服务器,并不能保证消息一定会被投递到目标queue里。所以需要用到接下来的returnCallback。
3、可靠抵达一Ack消息确认机制
•消费者获取到消息,成功处理,可以回复Ack给Broker
一basic.ack用于皆定确认;broker将移除此消息
—basic.nack#于否定确认;可以指定broker是否丢弃此消息,可以批量
-basic.reject用于否定确认.同上,但不能批量
•默认,消息被消费者收到,就会从broker的queue中移除
•queue无消费者,消息依然会被存储,直到消费者消费
•消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
一消息处理成功ack(),接受下一个消息此消息broker就会移除;
一消息处理失败,nack()/reject(),重新发送给具他人进行处理,或者容惜处理后ack;
一消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人;
此时客户端断开,消息不会被broke除,会投涕给别人
五、代码演示
1、主启动类
/** * 使用 RabbitMQ * 1、引入amp场景; RabbitAutoConfiguration就会自动生效 * 2、给容器自动配置了RabbitTemplate、AmqpAdmin、CachingConnecteionFactory、RabbitMessagingTemplate所有的属性都是 spring.rabbitmq * ConfigurationProperties(prefix ="spring. rabbitmq") * public class RabbitProperties * 3、给配置文件中配置 spring、rabbitmq 信息 * 4、EnableRabbit:EnableXxxxx开启功能 * 5、监听消息:使用 eRabbitListener;必须有 EnableRabbit * Rabbitlistener:类+方法上(监听哪些队列即可) * RabbitHandLer:标在方法上(重载区分不同的消息) */ @EnableRedisHttpSession @EnableRabbit @EnableDiscoveryClient @SpringBootApplication public class GulimallOrderApplication { public static void main(String[] args) { SpringApplication.run(GulimallOrderApplication.class, args); } }
2、配置类
@Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; /** * 使用JSON序列化机制,进行消息转移 */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate */ @PostConstruct//MyRabbitConfig对象创建以后执行这个方法 public void initRabbitTemplate(){ //设置服务端收到消息确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** *只要消息服务器broker,不论消息是否被消费,b都为true * @param correlationData 发送消息是当前消息唯一关联数据(id) * @param b 消失成功还是失败 * @param s 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { } }); //设置消息抵达队列确认回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有都递给指定队列,就出发这个失败回调 * @param message 投递失败消息的详细信息 * @param i 回复的状态码 * @param s 回复文本的内容 * @param s1 当时这个消息发给哪个交换机 * @param s2 当时这个消息用哪个路由键 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { } }); //客户端确认:1、默认是自动确认,只要消息接收到,客户端会自动确认,服务器会自动移除这个消息 /** * 问题:一旦接受多个消息,自动回复给服务器,只要一个消息处理成功,此时突然宕机服务器仍会删除所有消息 * 解决办法:手动确认 * spring.rabbitmq.listener.direct.acknowledge-mode=manual * 只要没有明确通知mq,消息被确认签收,则消息一直是unacked状态,即使服务器突然宕机,消息也不会丢失,会重置为Ready。 * 利用channel.basicAck()确认消息被签收 */ } }
其余代码
@Controller public class RabbitController { @Autowired RabbitTemplate template; @GetMapping("/sendMq") public String sendMq( @RequestParam(vlaue = "num", devaultValue = "10") Integer num) { for (int i = 0; i < 10; i++) { if ((i % 2) == 0) { OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity(); reasonEntity.setId(1L); reasonEntity.setCreateTime(new Date()); reasonEntity.setName("哈哈-" + i); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity); } else { OrderEntity entity = new OrderEntity(); entity.setOrderSn(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("helo-java-exchange", "hello.java", entity); } } return "ok"; } }
/** * * queues声明需要监听的所有队列 * org. springframework. amap. core. Message * * 参数可以写一下类型 * 1、 Message message:原生消息详细信息。头+体 * 2、T<发送的消息的类型> OrderReturnReasonE content * 3、 Channel channeL:当前传输数据的通道 * * Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息场景: * 1)、订单服务启动多个;同一个消息,只能有一个客户端收到 * 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息 * */ @RabbitListener(queues = {"hello-java-queue"}) @Service public class OrderItemServiceImpl { // Rabbitl istener(queues {" heLlo-javo-queue")) @RabbitHandler public void recieveMessage(Message message,OrderReturnReasonEntity content, Channel channel)throws InterruptedException { //{"id": 1, "name": "" "sort": nult"status": nuLL, "createTime": 1581144531744) System.out.println("接收到消息" + content); byte[] body = message.getBody(); //消息头属性信息 MessageProperties properties = message.getMessageProperties(); Thread.sleep(3000); System.out.println("消息处理完成=>" + content.getName()); } @RabbitHandler public void recieveMessage2(OrderEntity content) throws InterruptedException { //{"id":1,"ngme":"哈哈","sort":null,"status":null,"createTime":1581144531744 } System.out.println("接收到消息" + content); } }
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步方式优先回调我们这个renturnconfirm确认
spring.rabbitmq.template.mandatory=true
#手动确认消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual