一 RabbitMQ+spring 实现
1 消息发送方
在applicationContext.xml中配置rabbitMQ相关配置
<!-- rabbitMQ配置 -->
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="8"/>
<property name="port" value="5672"></property>
<!-- 发布确认必须配置在CachingConnectionFactory上 -->
<property name="publisherConfirms" value="true"></property>
</bean>
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
<!--声明持久化队列-->
<rabbit:queue name="depot_queue" durable="true"/>
<!--声明持久化交换器,交换器类型-->
<rabbit:direct-exchange name="depot-amount-exchange"
xmlns="http://www.springframework.org/schema/rabbit"
durable="true">
<!--绑定队列和路由键-->
<rabbit:bindings>
<rabbit:binding queue="depot_queue" key="amount.depot" ></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="rabbitTemplate"
connection-factory="rabbitConnectionFactory"
mandatory="true"
return-callback="sendReturnCallback"
confirm-callback="confirmCallback">
<!--发送者消息失败回调 -->
<!--发送者消息确认回调 -->
</rabbit:template>
可以看到配置里需要2个bean, sendReturnCallback和confirmCallback分别用于失败确认和发送者确认
失败确认的bean 需要 implements RabbitTemplate.ReturnCallback,实现失败确认方法
package cn.enjoyedu.callback; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component; /** * 类说明:发送者确认的回调 */ @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息发送给mq成功"); }else{ System.out.println("消息发送给mq失败,原因:"+cause); } } }
发送者确认的bean,需要implements RabbitTemplate.ConfirmCallback,实现发送者确认方法
package cn.enjoyedu.callback; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component; /** * 类说明:发送者确认的回调 */ @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息发送给mq成功"); }else{ System.out.println("消息发送给mq失败,原因:"+cause); } } }
实际用法,发送消息到MQ
//路由键 private final static String DEPOT_RK = "amount.depot"; //队列名 private final static String DEPOT_EXCHANGE = "depot-amount-exchange"; @Autowired RabbitTemplate rabbitTemplate; //消息属性 MessageProperties messageProperties = new MessageProperties(); //设置消息持久化 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //发送消息 rabbitTemplate.send(DEPOT_EXCHANGE, DEPOT_RK, new Message(goods.getBytes(), messageProperties));
2 消息接收方
在applicationContext.xml中配置rabbitMQ相关配置
<!-- rabbitMQ配置 -->
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="8"/>
<property name="port" value="5672"></property>
</bean>
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
<!-- 声明持久化队列 -->
<rabbit:queue name="depot_queue" durable="true"/>
<!-- 声明持久化交换器,交换器类型 -->
<rabbit:direct-exchange name="depot-amount-exchange"
xmlns="http://www.springframework.org/schema/rabbit" durable="true">
<!-- 队列和路由键绑定 -->
<rabbit:bindings>
<rabbit:binding queue="depot_queue" key="amount.depot"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 对消息要手动确认 -->
<rabbit:listener-container connection-factory="rabbitConnectionFactory"
acknowledge="manual">
<!-- 消费者消息监听 ref 代表bean名字,method代表bean里面方法-->
<rabbit:listener queues="depot_queue" ref="processDepot"
method="onMessage" />
</rabbit:listener-container>
消费者
package cn.enjoyedu.mq; import cn.enjoyedu.service.DepotManager; import cn.enjoyedu.vo.GoodTransferVo; import com.google.gson.Gson; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** *类说明: 消费者 */ @Service public class ProcessDepot implements ChannelAwareMessageListener { private static Logger logger = LoggerFactory.getLogger(ProcessDepot.class); @Autowired private DepotManager depotManager; private static Gson gson = new Gson(); public void onMessage(Message message, Channel channel) throws Exception { try { String msg = new String(message.getBody()); logger.info(">>>>>>>>>>>>>>接收到消息:"+msg); GoodTransferVo goodTransferVo = gson.fromJson(msg,GoodTransferVo.class); try { depotManager.operDepot(goodTransferVo); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info(">>>>>>>>>>>>>>库存处理完成,应答Mq服务"); } catch (Exception e) { logger.error(e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true); logger.info(">>>>>>>>>>>>>>库存处理失败,拒绝消息,要求Mq重新派发"); throw e; } } catch (Exception e) { logger.error(e.getMessage()); } } }
二 rabbitMQ + Springboot 实现
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
application.properties 配置
spring.application.name=springboot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
生产者
package cn.enjoyedu.hello; import cn.enjoyedu.RmConst; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** *类说明: */ @Component public class DefaultSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg) { String sendMsg = msg +"---"+ System.currentTimeMillis();; System.out.println("Sender : " + sendMsg); this.rabbitTemplate.convertAndSend(RmConst.QUEUE_HELLO, sendMsg); this.rabbitTemplate.convertAndSend(RmConst.QUEUE_USER, sendMsg); } }
消费者
package cn.enjoyedu.hello; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** *类说明: */ @Component @RabbitListener(queues = "sb.hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("HelloReceiver : " + hello); } }