一、RabbitMQ简介
1.1、rabbitMQ的优点(适用范围)
1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
3. 有消息确认机制和持久化机制,可靠性高。
4. 开源
其他MQ的优势:
1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。
1.2、几个概念说明
producer&consumer
producer指的是消息生产者,consumer消息的消费者。
Queue
消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
设置为临时队列,queue中的数据在系统重启之后就会丢失
设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange
Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
Direct
直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
fanout
广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
topic
主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
headers
消息体的header匹配(ignore)
Binding
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
virtual host
在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。
二、Rbbitmq与Spring、springMVC结合使用实例
1. MessageProducer
1 MessageProducerpackage com.hjp.rabbitmq.rabbitmq.samples3; 2 3 import javax.annotation.Resource; 4 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.core.AmqpTemplate; 8 import org.springframework.stereotype.Service; 9 10 @Service 11 public class MessageProducer { 12 13 private Logger logger = LoggerFactory.getLogger(MessageProducer.class); 14 15 @Resource 16 private AmqpTemplate amqpTemplate; 17 18 public void sendMessage(Object message) { 19 20 logger.info("to send message:{}", message); 21 amqpTemplate.convertAndSend("queueTestKey", message); 22 23 } 24 }
2.MessageConsumer
package com.hjp.rabbitmq.rabbitmq.samples3; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class MessageConsumer implements MessageListener { private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @Override public void onMessage(Message message) { logger.info("receive message:{}",message); } }
3.rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!--配置connection-factory,指定连接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="localhost" port="5672" /> <!--定义rabbit template用于数据的接收和发送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" /> <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--定义queue --> <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" /> <!-- 定义direct exchange,绑定queueTest --> <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息接收者 --> <bean id="messageReceiver" class="com.hjp.rabbitmq.rabbitmq.samples3.MessageConsumer"></bean> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queueTest" ref="messageReceiver"/> </rabbit:listener-container> </beans>
4.application.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitmq.xml" /> <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans --> <context:component-scan base-package="com.hjp.rabbitmq.rabbitmq.samples3" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
5.log4j配置
log4j.rootLogger=DEBUG,Console,Stdout
#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
#log4j.appender.Stdout.File = E://logs/log.log
log4j.appender.Stdout.Append = true
log4j.appender.Stdout.Threshold = DEBUG
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
代码下载:https://github.com/jianpingh/rabbitmq-samples