一 rabbitmq 介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced MessageQueue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,
消息队列在分布式系统开发中应用非常广泛
RabbitMQ官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景:
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ呢?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
6 可以避免分布式事务问题,从而解决了分布式事务导致的速度慢的问题
二 消息协议
Amqp:
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。elang中的实现有Rabbitmq等。
JMS:

组成部分说明如下:
Broker :消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息
问题一:消息会丢失吗?
producer到broker中可以设置一个确认模式,当确认了之后,才确认已经发送,因为开启事务是一个比较慢的操作,所以采用确认模式。
cousumer到broker中,可以设置一个回签模式,当消息确认被消费之后,才停止发送,可以采用重试机制。
当然也可以采用这用,在发送的时候,放入broker中,同事存入redis数据中,然后进行对账。
问题二·:万一挂了怎么办?
首先在目前市场上,不是所有的公司都有1秒内100万订单,导致他挂了,或者更多的订单,像我同事以前公司,某旅游网,一天的订单说只有几十万,而且是全球性事务,所以不是所有的都必须考虑
只是在某些时候,面试时候,那些蠢得面试官过于纠结这些方面。只是我们需要会用就可以了,并不需要过多于在这个方面纠结,设置回签机制,确认机制,在某种程度上,牺牲了速度,那么不使用消息中间件,可以使用disruptor这个内存中间队列。或者使用阿里得rocketmq,毕竟他是经过双十一洗礼得。
问题三:幂等性问题
就是如何防止重复消息,可以采用一个唯一标识来避免这个问题
四 hello-world案例:
连接rabbitmq就和连接mysql的形式差不多,首先采用最简单的方法进行操作:
producer代码:
package com.cxy.rabbitmq.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Hellowolrd { private static final String QUEUE ="HELLO-WORLD"; public static void main(String[] args) throws Exception{ //首先建立连接 Connection connection=null; Channel channel=null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); //通过连接工厂获取tcp连接 connection = connectionFactory.newConnection(); //创建通道 channel = connection.createChannel(); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * * param1:队列名称 * * param2:是否持久化 * * param3:队列是否独占此连接 * * param4:队列不再使用时是否自动删除此队列 * * param5:队列参数 */ channel.queueDeclare(QUEUE,true,false,false,null); String message ="hello -world"; /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ /** * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定 * 默认的交换机,routingKey等于队列名称 */ channel.basicPublish("",QUEUE,null,message.getBytes()); System.out.println(message); }catch (Exception e){ e.printStackTrace(); }finally { if(channel!=null) { channel.close(); } if(connection != null) { connection.close(); } } } }
步骤:发送端操作流程
1)创建连接
2)创建通道
3)声明队列
4)发送消息
consumer代码:
package com.cxy.rabbitmq.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class HelloWorldConsumer { //队列 private static final String QUEUE = "HELLO-WORLD"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); //监听队列 //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE,true,false,false,null); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE,true,defaultConsumer); } }
接收端
1)创建连接
2)创建通道
3)声明队列
4)监听队列
5)接收消息
6)发送ack
pom依赖:
<dependencies> <!-- <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.3</version><!–此版本与spring boot 1.5.9版本匹配–> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies>
再看控制就收消息:
receive message:hello -world
这个是rabbimq的控制台
五 rabbitmq的工作模式有多少种呢:
RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
workQueue的模式演示图:
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
Publish/Subscribe 发布订阅模式:
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
3使用场景:就是当用户注册时候,需要发送邮件和短信,两个是同时进行的。那么这种非常符合要求的,例如在用netty做群聊的时候,也是可以的,当消费者监听到
自己队列的消息,拉去就可以了,
代码:这个地方会涉及到两个消费者的代码,一个是发送短信,一个是发送邮件
package com.cxy.rabbitmq.ps; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PsDemo { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ for(int i=0;i<5;i++){ //消息内容 String message = "send inform message to user"; channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费者一:
package com.cxy.rabbitmq.ps; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PsDemoConsumer { //队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, ""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
消费者二:
package com.cxy.rabbitmq.ps; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PsDemocoun2 { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, ""); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
这两种比较:
1、publish/subscribe与work queues有什么区别。
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认
交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑
定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实质工作用什么 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换
机。
routing工作模式
1 消息发送到交换机的时候,必须要绑定一个routingkey,然后交换机会根据routingkey来判断该发送给哪个队列
2 上图表示的意思就是,生产者,将消息发送给交换机,指定路由key
3 消费者 消费者监听自己的队列,然后会根据指定的rouetingkey,去消费消息
生产者代码示例:
package com.cxy.rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RoutingP { private static final String QUEUE_INFORM_EMAIL="queue_inform_email"; private static final String QUEUE_INFORM_SMS="queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing"; private static final String ROUTINGKEY_EMAIL="inform_email"; private static final String ROUTINGKEY_SMS="inform_sms"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新连接 connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 channel = connection.createChannel(); //声明队列,如果队列在mq 中没有则要创建 //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform"); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform"); //发送消息 //参数:String exchange, String routingKey, BasicProperties props, byte[] body /** * 参数明细: * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"") * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * 3、props,消息的属性 * 4、body,消息内容 */ /* for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes()); System.out.println("send to mq "+message); }*/ for(int i=0;i<5;i++){ //发送消息的时候指定routingKey String message = "send inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭连接 //先关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
看控制台:
消费者代码:
短信:
package com.cxy.rabbitmq.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RoutingEmail { //队列名称 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL="inform_email"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer); } }
email:
package com.cxy.rabbitmq.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RouSms { //队列名称 private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform"; private static final String ROUTINGKEY_SMS="inform_sms"; public static void main(String[] args) throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.5.131"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); //建立新连接 Connection connection = connectionFactory.newConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); /** * 参数明细 * 1、queue 队列名称 * 2、durable 是否持久化,如果持久化,mq重启后队列还在 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除) * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间 */ channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //声明一个交换机 //参数:String exchange, String type /** * 参数明细: * 1、交换机的名称 * 2、交换机的类型 * fanout:对应的rabbitmq的工作模式是 publish/subscribe * direct:对应的Routing 工作模式 * topic:对应的Topics工作模式 * headers: 对应的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //进行交换机和队列绑定 //参数:String queue, String exchange, String routingKey /** * 参数明细: * 1、queue 队列名称 * 2、exchange 交换机名称 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串 */ channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); //实现消费方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /** * 当接收到消息后此方法将被调用 * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume * @param envelope 信封,通过envelope * @param properties 消息属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message= new String(body,"utf-8"); System.out.println("receive message:"+message); } }; //监听队列 //参数:String queue, boolean autoAck, Consumer callback /** * 参数明细: * 1、queue 队列名称 * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 * 3、callback,消费方法,当消费者接收到消息要执行的方法 */ channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer); } }
通配符模式:
private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#";
# 可以代表空,也可以代表字符,所以当发送 inform.sms.emial 时候两个都可以接受到
五:springboot整合rabbitmq
首先引入pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.2.RELEASE</version> <relativePath/> </parent> <artifactId>test-rabbitmq-producer</artifactId> <dependencies> <!-- <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.3</version><!–此版本与spring boot 1.5.9版本匹配–> </dependency>--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> </project>
配置类:
package com.cxy.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_EMAIL队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
。yml文件:
server: port: 44000 spring: application: name: test-rabbitmq-producer rabbitmq: host: 192.168.5.131 port: 5672 username: guest password: guest virtualHost: /
不需要额外的配置
测试代码:
package com.cxy.rabbitmq; import com.alibaba.fastjson.JSON; import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @SpringBootTest @RunWith(SpringRunner.class) public class Producer05_topics_springboot { @Autowired RabbitTemplate rabbitTemplate; //使用rabbitTemplate发送消息 @Test public void testSendEmail(){ String message = "send email message to user"; /** * 参数: * 1、交换机名称 * 2、routingKey * 3、消息内容 */ rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.email",message); } //使用rabbitTemplate发送消息 @Test public void testSendPostPage(){ Map message = new HashMap<>(); message.put("pageId","5a795ac7dd573c04508f3a56"); //将消息对象转成json串 String messageString = JSON.toJSONString(message); //路由key,就是站点ID String routingKey = "5a751fab6abb5044e0d19ea1"; /** * 参数: * 1、交换机名称 * 2、routingKey * 3、消息内容 */ rabbitTemplate.convertAndSend("ex_routing_cms_postpage",routingKey,messageString); } }
消费者代码:
package com.cxy.rabbitmq.mq; import com.rabbitmq.client.Channel; import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ReceiveHandler { @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) public void send_email(String msg,Message message,Channel channel){ System.out.println("receive message is:"+msg); } }