1.添加pom依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.2.RELEASE</version> </dependency>
2.配置spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <rabbit:connection-factory id="mqconnectionFactory" host="${mq.host}" port="${mq.port}" username="${mq.username}" password="${mq.password}"/> <!--<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">--> <!--<property name="host" ref="${mq.host}" />--> <!--<property name="port" value="${mq.port}" />--> <!--<property name="username" value="${mq.username}" />--> <!--<property name="password" value="${mq.password}" ></property>--> <!--</bean >--> <rabbit:admin connection-factory="mqconnectionFactory" /> <!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 --> <rabbit:template id="amqpTemplate" connection-factory="mqconnectionFactory"/> <rabbit:queue name="miaosha.queue" /> <!-- <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" > <rabbit:bindings> <rabbit:binding queue="miaosha.queue" /> </rabbit:bindings> </rabbit:direct-exchange>--> <!-- 配置consumer, 监听的类和queue的对应关系 --> <!--定义监听器,当收到消息时会执行里面的配置--> <rabbit:listener-container connection-factory="mqconnectionFactory" acknowledge="manual" > <rabbit:listener ref="consumer" method="receive" queue-names="miaosha.queue"/> </rabbit:listener-container> <bean id="consumer" class="com.shop.rabbitmq.MQConsumer"/> </beans>
在spring-dao.xml进行引入
<bean class="com.shop.util.EncryptPropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:jdbc.properties</value> <value>classpath:redis.properties</value> <value>classpath:global.properties</value> </list> </property> <property name="fileEncoding" value="utf-8"/> </bean>
3.编写MQProducer接口及其实现类(注意是在Service包下)
package com.shop.service; import com.shop.rabbitmq.MQMessage; /** * Created by Skye on 2018/7/7. */ public interface MQProducer { String MIAOSHA_QUEUE = "miaosha.queue"; String QUEUE = "queue"; String TOPIC_QUEUE1 = "topic.queue"; /** * 发送消息到指定队列 * @param msg */ void sendMessage(MQMessage msg); }
MQProducer实现类
package com.shop.service.serviceImpl; import com.shop.cache.RedisUtil; import com.shop.rabbitmq.MQMessage; import com.shop.service.MQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * Created by Skye on 2018/7/7. */ @Service public class MQProducerImpl implements MQProducer { private static Logger log = LoggerFactory.getLogger(MQProducer.class); @Autowired AmqpTemplate amqpTemplate ; @Override public void sendMessage(MQMessage msg) { String mm = RedisUtil.beanToString(msg); log.info("send message:"+msg); //发送信息 amqpTemplate.convertAndSend(MIAOSHA_QUEUE, mm); } }
4.写消费者
package com.shop.rabbitmq; import com.shop.bean.LocalUser; import com.shop.bean.MiaoshaOrder; import com.shop.bean.MiaoshaProduct; import com.shop.cache.RedisUtil; import com.shop.dto.ProductExecution; import com.shop.service.MQProducer; import com.shop.service.MiaoshaService; import com.shop.service.OrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * Created by Skye on 2018/7/7. */ @Service public class MQConsumer { @Autowired MiaoshaService miaoshaService; @Autowired OrderService orderService; private static Logger log = LoggerFactory.getLogger(MQConsumer.class); public void rmqProducerMessage(Object object){ MQMessage MQMessage =(MQMessage) object; System.out.println(MQMessage.getExchange()); System.out.println(MQMessage.getRouteKey()); System.out.println(MQMessage.getParams().toString()); } @RabbitListener(queues=MQProducer.MIAOSHA_QUEUE) public void receive(String message) { log.info("receive message:"+message); MQMessage mm = RedisUtil.stringToBean(message, MQMessage.class); LocalUser localUser = mm.getUser(); MiaoshaProduct miaoshaProduct = mm.getMiaoshaProduct(); ProductExecution productExecution = miaoshaService.getMiaoshaProduct(miaoshaProduct); miaoshaProduct = productExecution.getMiaoshaProduct(); int stock = miaoshaProduct.getMiaoshaStock(); if(stock <= 0) { return; } //判断是否已经秒杀到了 MiaoshaOrder order = orderService.getMiaoshaOrder(localUser, miaoshaProduct); if(order != null) { return; } //减库存 下订单 写入秒杀订单 miaoshaService.doMiaosha(localUser, miaoshaProduct); } }
MQMessage类
package com.shop.rabbitmq; import com.shop.bean.LocalUser; import com.shop.bean.MiaoshaProduct; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; /** * Created by Skye on 2018/7/6. */ public class MQMessage implements Serializable { private static final long serialVersionUID = 1302562501680404088L; private LocalUser user; private MiaoshaProduct miaoshaProduct; public LocalUser getUser() { return user; } public void setUser(LocalUser user) { this.user = user; } public MiaoshaProduct getMiaoshaProduct() { return miaoshaProduct; } public void setMiaoshaProduct(MiaoshaProduct miaoshaProduct) { this.miaoshaProduct = miaoshaProduct; } private Class<?>[] paramTypes;//参数类型 private String exchange;//交换器 private Object[] params; private String routeKey;//路由key public MQMessage(){} public MQMessage(String exchange, String routeKey, Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; } @SuppressWarnings("rawtypes") public MQMessage(String exchange, String routeKey, String methodName, Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; int len=params.length; Class[] clazzArray=new Class[len]; for(int i=0;i<len;i++) clazzArray[i]=params[i].getClass(); this.paramTypes=clazzArray; } public byte[] getSerialBytes() { byte[] res=new byte[0]; ByteArrayOutputStream baos=new ByteArrayOutputStream(); ObjectOutputStream oos; try { oos = new ObjectOutputStream(baos); oos.writeObject(this); oos.close(); res=baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return res; } public String getRouteKey() { return routeKey; } public String getExchange() { return exchange; } public void setExchange(String exchange) { this.exchange = exchange; } public void setRouteKey(String routeKey) { this.routeKey = routeKey; } public Class<?>[] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; } }
SSM整合rabbitMQ时报错org.springframework.beans.factory.NoSuchBeanDefinitionException