一、linux下安装rabbitmq
1、安装erlang环境
wget http://erlang.org/download/otp_src_18.2.1.tar.gz tar xvfz otp_src_18.2.1.tar.gz cd otp_src_18.2.1 ./configure make install
2、安装RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/vx.x.x/rabbitmq-server-generic-unix-x.x.x.tar.xz //xy文件压缩工具 yum install xz //解压 xz -d rabbitmq-server-generic-unix-x.x.x.tar.xz tar -xvf rabbitmq-server-generic-unix-x.x.x.tar //将其移动至/usr/local/下 按自己习惯 cp -r rabbitmq_server-x.x.x /usr/local/rabbitmq //改变环境变量 vi /etc/profile export PATH=/usr/local/rabbitmq/sbin:$PATH source /etc/profile //启用MQ管理方式 rabbitmq-plugins enable rabbitmq_management #启动后台管理 rabbitmq-server -detached #后台运行rabbitmq //设置端口号 可供外部使用 iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
3、添加用户和权限
//添加用户 rabbitmqctl add_user admin admin //添加权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" //添加用户角色 rabbitmqctl set_user_tags admin administrator
二、Spring mvc整合RabbitMQ
1、添加pom.xml依赖jar包
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.5.RELEASE</version> </dependency>
2、添加配置applicationContext.xml
<!--配置rabbitmq开始--> <bean id="connectionFactoryMq" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="192.168.181.201"/> <property name="username" value="admin"/> <property name="password" value="admin"/> <property name="host" value="192.168.181.201"/> <property name="port" value="5672"/> </bean> <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> <constructor-arg ref="connectionFactoryMq"/> </bean> <!--创建rabbitTemplate消息模板类--> <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> <constructor-arg ref="connectionFactoryMq"/> </bean> <!--创建消息转换器为SimpleMessageConverter--> <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"> </bean> <!--创建持久化的队列--> <bean id="queue" class="org.springframework.amqp.core.Queue"> <constructor-arg index="0" value="testQueue"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> <constructor-arg index="3" value="true"></constructor-arg> </bean> <!--创建交换器的类型 并持久化--> <bean id="topicExchange" class="org.springframework.amqp.core.TopicExchange"> <constructor-arg index="0" value="testExchange"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> </bean> <util:map id="arguments"> </util:map> <!--绑定交换器 队列--> <bean id="binding" class="org.springframework.amqp.core.Binding"> <constructor-arg index="0" value="testQueue"></constructor-arg> <constructor-arg index="1" value="QUEUE"></constructor-arg> <constructor-arg index="2" value="testExchange"></constructor-arg> <constructor-arg index="3" value="testQueue"></constructor-arg> <constructor-arg index="4" value="#{arguments}"></constructor-arg> </bean> <!--用于接收消息的处理类--> <bean id="rqmConsumer" class="com.slp.mq.RmqConsumer"></bean> <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="rqmConsumer" /> <property name="defaultListenerMethod" value="rmqProducerMessage"></property> <property name="messageConverter" ref="serializerMessageConverter"></property> </bean> <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,监听队列 queues可以传多个--> <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="queues" ref="queue"></property> <property name="connectionFactory" ref="connectionFactoryMq"></property> <property name="messageListener" ref="messageListenerAdapter"></property> </bean> <bean id="rmqProducer" class="com.slp.mq.RmqProducer"></bean> <!--配置rabbitmq结束-->
3、消息实体类
package com.slp.mq; import java.io.*; /** * @author sanglp * @create 2018-02-06 14:00 * @desc rabbit消息类 **/ public class RabbitMessage implements Serializable { /** * 参数类型 */ private Class<?>[] paramTypes ; /** * 交换器 */ private String exchange; private Object[] params; /** * 路由key */ private String routekey; public RabbitMessage() { } public RabbitMessage(String exchange, String routekey,Object...params) { this.exchange = exchange; this.params = params; this.routekey = routekey; } @SuppressWarnings("rawtypes") public RabbitMessage(String exchange,String routeKey,String methodName,Object...params) { this.params=params; this.exchange=exchange; this.routekey=routeKey; int len=params.length; Class[] clazzArray=new Class[len]; for(int i=0;i<len;i++) { clazzArray[i] = params[i].getClass(); } this.paramTypes=clazzArray; } public byte[] getSerialBytes(){ byte[] res = new byte[0]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutput oos ; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res = baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public Class<?>[] getParamTypes() { return paramTypes; } public void setParamTypes(Class<?>[] paramTypes) { this.paramTypes = paramTypes; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } public String getRoutekey() { return routekey; } public void setRoutekey(String routekey) { this.routekey = routekey; } }
4、生产者
package com.slp.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import javax.annotation.Resource; /** * @author sanglp * @create 2018-02-06 14:19 * @desc 生产者 **/ public class RmqProducer { @Resource private RabbitTemplate rabbitTemplate; /** * 发送信息 * @param msg */ public void sendMessage(RabbitMessage msg){ System.out.println(rabbitTemplate.getConnectionFactory().getHost()); System.out.println(rabbitTemplate.getConnectionFactory().getPort()); System.out.println("msg"+msg); rabbitTemplate.convertAndSend(msg.getExchange(),msg.getRoutekey(),msg); System.out.println("发送完成"); } }
5、消费者
package com.slp.mq; /** * @author sanglp * @create 2018-02-06 14:23 * @desc 消费者 **/ public class RmqConsumer { public void rmqProducerMessage(Object object){ System.out.println("消费前"); RabbitMessage rabbitMessage = (RabbitMessage) object; System.out.println(rabbitMessage.getExchange()); System.out.println(rabbitMessage.getRoutekey()); System.out.println(rabbitMessage.getParams().toString()); } }
6、测试类
package com.slp; import com.slp.mq.RabbitMessage; import com.slp.mq.RmqConsumer; import com.slp.mq.RmqProducer; import org.junit.Before; import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.FileSystemXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * @author sanglp * @create 2018-02-06 14:36 * @desc mq测试类 **/ public class MqTest { private RmqProducer rmqProducer ; private RmqConsumer rqmConsumer ; @Before public void setUp() throws Exception { //ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("D:/web-back/web-back/myweb/web/WEB-INF/applicationContext.xml"); //context.start(); String path="web/WEB-INF/applicationContext.xml"; ApplicationContext context = new FileSystemXmlApplicationContext(path); rmqProducer = (RmqProducer) context.getBean("rmqProducer"); rqmConsumer = (RmqConsumer)context.getBean("rqmConsumer"); } @Test public void test(){ String exchange = "testExchange"; String routeKey ="testQueue"; String methodName = "test"; //参数 for (int i=0;i<10;i++){ Map<String,Object> param=new HashMap<String, Object>(); param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param); //发送消息 rmqProducer.sendMessage(msg); } // rqmConsumer.rmqProducerMessage(msg); } }
运行结果:
没有开启消费者之前: