RabbitMQ学习笔记
1、为什么会产生消息队列(MQ)?
不同进程之间相互通信时,两者如果耦合度过高,改动一个进程则另一个进程也必须改动。为了降低两个进程的耦合度,于是抽离出来一个模块,用来管理两个进程相互通信的消息。
更多详细介绍请参阅
2、设置虚拟主机
1、新建一个虚拟主机(每个虚拟主机相当于MySql中的每个数据库)
2、将登录用户添加到虚拟主机(下图中,guest用户没有添加,则guest用户没有访问/lxy这个虚拟主机的权限)
测试:
新建一个Test类
@Test
public void test1(){
rabbitTemplate.convertAndSend("exchange.direct","lixingyu1",new Person(1,"Rlxy93",21,"重庆"));
System.out.println("发送消息成功!");
}
如果是guest用户(报错,因为guest用户没有访问指定虚拟路径的权限):
org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:510)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2092)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2065)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1004)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1070)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1063)
at cn.lixingyu.springadvanced.RabbitPractice.test1(RabbitPractice.java:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:403)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1115)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:526)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:473)
... 38 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/lxy' refused for user 'guest', class-id=10, method-id=40)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 43 more
3、RabbitMQ的5种模式
生产者/消费者模式
新建一个Test类
package cn.lixingyu.springadvanced;
import cn.lixingyu.springadvanced.entity.Person;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Rlxy93
* @time 2019/10/21 18:02
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitPractice {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
//发送
rabbitTemplate.convertAndSend("exchange.direct","lixingyu1",new Person(1,"Rlxy93",21,"重庆"));
System.out.println("发送成功!");
//接收
Object lixingyu1 = rabbitTemplate.receiveAndConvert("lixingyu1");
System.out.println(lixingyu1.getClass());
System.out.println((Person)lixingyu1);
System.out.println("接收成功!");
}
}
运行效果:
Work模式
private final String QUEUE_NAME = "Rlxy93";
@Test
public void test1(){
for (int i = 0;i<10;++i){
rabbitTemplate.convertAndSend("exchange.direct",QUEUE_NAME,new Person(i,"Rlxy93",21,"重庆"));
System.out.println("发送成功!"+i);
}
}
@RabbitListener(queues = QUEUE_NAME)
public void rabbitListener1(Person p){
System.out.println("rabbitListener1接收成功!");
System.out.println(p);
}
@RabbitListener(queues = QUEUE_NAME)
public void rabbitListener2(Person p){
System.out.println("rabbitListener2接收成功!");
System.out.println(p);
}
运行结果:
ps:
1、两个listener接收到的消息是不同的。
2、两个listener貌似是有规律的接收的。
3、两个listener接收到的消息数量是相同的。
其中就牵扯到轮询(round-robin)分发消息的概念。
轮询分发:在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
轮询分发暴露出来的弊端:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。
于是有了公平分发:
在application.properties中加
#设置同一时刻服务器只会发1条消息给消费者
spring.rabbitmq.listener.simple.prefetch=1
测试代码:
package cn.lixingyu.springadvanced;
import cn.lixingyu.springadvanced.entity.Person;
import com.rabbitmq.client.Channel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
/**
* @author Rlxy93
* @time 2019/10/21 18:02
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitPractice {
@Autowired
private RabbitTemplate rabbitTemplate;
private final String QUEUE_NAME = "Rlxy93";
@Test
public void test1() {
for (int i = 0; i < 16; ++i) {
rabbitTemplate.convertAndSend("Rlxy93", QUEUE_NAME, new Person(i, "Rlxy93", 21, "重庆"));
}
}
@RabbitListener(queues = QUEUE_NAME)
@RabbitHandler
public void rabbitListener1(Person p, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
Thread.sleep(500);
System.out.println("RabbitMQListener1接收成功!" + p);
}
@RabbitListener(queues = QUEUE_NAME)
@RabbitHandler
public void rabbitListener2(Person p, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
System.out.println("RabbitMQListener2接收成功!" + p);
}
}
运行效果:
订阅模式
学习地址:springboot整合rabbitmq的五种模式示例
广播模式
1、首先新建一个exchange,名叫Rlxy93.fanout,把模式改成fanout。
2、再在Queue中,新建两个和@RabbitListener里@Queue的名字一样的两个Queue,分别把Rlxy93.fanout给绑定上。
代码
package cn.lixingyu.springadvanced;
import cn.lixingyu.springadvanced.entity.Person;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Rlxy93
* @time 2019/10/22 22:54
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutPractice {
@Autowired
private RabbitTemplate rabbitTemplate;
private final String EXCHANGE = "Rlxy93.fanout";
@Test
public void test() {
for (int i = 0; i < 1; ++i) {
rabbitTemplate.convertAndSend(EXCHANGE, "", new Person(i, "Rlxy93", 21, "重庆"));
}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy93", durable = "true"),
exchange = @Exchange(value = EXCHANGE,type = ExchangeTypes.FANOUT)))
public void rabbitListener1(Person p) {
System.out.println("RabbitMQListener1..." + p);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy931", durable = "true"),
exchange = @Exchange(value = EXCHANGE,type = ExchangeTypes.FANOUT)))
public void rabbitListener2(Person p) {
System.out.println("RabbitMQListener2..." + p);
}
}
运行效果:
路由模式
新建一个类:
package cn.lixingyu.springadvanced;
import cn.lixingyu.springadvanced.entity.Person;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Rlxy93
* @time 2019/10/22 23:29
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class DirectPractice {
@Autowired
private RabbitTemplate rabbitTemplate;
private final String EXCHANGE = "Rlxy93.direct";
@Test
public void test() {
for (int i = 0; i < 1; ++i) {
rabbitTemplate.convertAndSend(EXCHANGE, "lxxxxxxy", new Person(i, "Rlxy93", 21, "重庆"));
}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy93", durable = "true"),
exchange = @Exchange(value = EXCHANGE,type = ExchangeTypes.DIRECT), key = {"lxxxxxxy"}))
public void rabbitListener1(Person p) {
System.out.println("RabbitMQListener1..." + p);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy93"),
exchange = @Exchange(value = EXCHANGE,type = ExchangeTypes.DIRECT), key = {"direct"}))
public void rabbitListener2(Person p) {
System.out.println("RabbitMQListener2..." + p);
}
}
运行效果:
ps:
1、如果监听的两个Queue名称相同,则寻找key值对应的,如果没有找到对应的key值,则返回空。
Queue相同,找到对应key值。
Queue相同,没有对应key值,返回空。
2、如果监听的两个Queue名称不同,则直接找存在key值的Queue,不再找对应的key值,如果不存在带有相同key值的Queue,则返回空。
Queue不同,找存在的对应的key值。
Queue不同,找不存在的key值,返回空。
3、如果监听的两个Queue名称相同,在RabbitMQ中两个都有对应的key,则把两个都返回。
Queue相同,key也相同,两个都能找到。
Topic(主题模式)
首先在RabbitMQ中绑定两个Queue
新建一个类
package cn.lixingyu.springadvanced;
import cn.lixingyu.springadvanced.entity.Person;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Rlxy93
* @time 2019/10/23 08:23
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicPractice {
@Autowired
private RabbitTemplate rabbitTemplate;
private final String TOPIC = "Rlxy93.topic";
@Test
public void test() {
for (int i = 0; i < 1; ++i) {
rabbitTemplate.convertAndSend(TOPIC, "Insert.Rlxy93", new Person(i, "Insert.Rlxy93", 21, "重庆"));
rabbitTemplate.convertAndSend(TOPIC, "Delete.Rlxy93", new Person(i, "Delete.Rlxy93", 21, "重庆"));
rabbitTemplate.convertAndSend(TOPIC, "Delete.Rlxy93", new Person(i, "Delete.Rlxy93", 21, "重庆"));
}
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy93", durable = "true"),
exchange = @Exchange(value = TOPIC, type = ExchangeTypes.TOPIC), key = {"Insert.*"}))
public void rabbitListener1(Person p) {
System.out.println("RabbitMQListener1..." + p);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "Rlxy931", durable = "true"),
exchange = @Exchange(value = TOPIC, type = ExchangeTypes.TOPIC), key = {"Delete.*"}))
public void rabbitListener2(Person p) {
System.out.println("RabbitMQListener2..." + p);
}
}
运行结果:
ps:Topic主题模式可以根据通配符来绑定不同的消息。