1.RabbitMQ简介
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。
官网:http://www.rabbitmq.com/
2.Spring集成RabbitMQ
2.1pom.xml文件:
<!--rabbitmq依赖 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- 如果没有这段,上面也会将其拉进来 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- 这个必要的 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.6</version> </dependency>
2.2 spring-mq.properties
rmq.host = 192.168.XX.XX //rabbitmq服务器ip地址 rmq.port = 5672 //端口 rmq.producer.num = 20 //发消息生产者的最大数,没有这个需求可以不写 rmq.user = admin //用户名 rmq.password = admin //密码
2.3 spring-mq.xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd"> <rabbit:connection-factory id="connectionFactory" host="${rmq.host}" username="${rmq.user}" password="${rmq.password}" port="${rmq.port}" /> <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!--队列 --> <!-- 说明: durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 --> <rabbit:queue name="my_first_queue" auto-declare="true" durable="true" /> <!-- 任务下发交换机 --> <!-- 说明: rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 rabbit:binding:设置消息queue匹配的key --> <rabbit:direct-exchange name="mq-exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding key="my_first_queue" queue="my_first_queue" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 消息转换器声明 消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 消费者 --> <bean id="myCustomer" class="com.ln.mq.Customer"> <!-- 消费者方法要有相应的set方法 --> <property name="converter" ref="jsonMessageConverter" /> </bean> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="my_first_queue" ref="myCustomer" /> </rabbit:listener-container> <!-- spring template声明 --> <rabbit:template id="amqpTemplate" exchange="mq-exchange" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> </beans>
说明:
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
durable:是否持久化
exclusive: 仅创建者可以使用的私有队列,断开后自动删除
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
<rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="test_queue_key" key="test_queue_key"/> </rabbit:bindings> </rabbit:direct-exchange>
rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。
rabbit:binding:设置消息queue匹配的key
2.4 web.xml
<!-- 加载spring容器 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath*:applicationContext.xml classpath*:spring-mq.xml </param-value> </context-param>
3.测试
3.1 生产者测试类
package com.ln.mq; import javax.annotation.Resource; import org.junit.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import com.ln.web.controller.TestBase; /** * 生产者 * */ public class Producer extends TestBase{ private static final String MY_FITST_QUEUE="my_first_queue"; @Resource private AmqpTemplate amqpTemplate; @Test public void sendMessage(){ System.out.println("*******生产者********"); String message="hello my first queue"; amqpTemplate.convertAndSend(MY_FITST_QUEUE, message); } }
3.2 消费者测试类
package com.ln.mq; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.MessageConverter; import com.rabbitmq.client.Channel; /** * 消费者 * */ public class Customer implements ChannelAwareMessageListener{ protected MessageConverter converter; public void setConverter(MessageConverter converter) { this.converter = converter; } @Override public void onMessage(Message message, Channel channel) throws Exception { Object fromMessage = converter.fromMessage(message); System.out.println("***********消费者********"); System.out.println("***********接收到的Message:"+fromMessage.toString()); } }
3.3 运行
运行生产者Producer测试junit
*******生产者********
*******消费者********
*******接收到的Message:hello my first queue
3.4 rabbitmq视图
可以查看队列消息情况
地址:http://localhost:15672
我的是本地的服务,如果要链接远程服务,localhost换成服务器ip。