生产者
package com.wangbiao.consumer.config; import org.apache.logging.log4j.message.SimpleMessageFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.SimpleClientHttpRequestFactory; /** * TODO * * @author wangbiao * @Title TODO * @module TODO * @description direct 发布订阅 这个模式多了个路由KEY 这里单独配置了RabbitTemplate * @date 2021/3/24 22:32 */ @Configuration public class DirectRabbitConfig { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) { // log.info("caching factory: {}", factory.getChannelCacheSize()); RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); /** * 当mandatory标志位设置为true时 * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息 * 那么broker会调用basic.return方法将消息返还给生产者 * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃 */ rabbitTemplate.setMandatory(true); // rabbitTemplate.setReturnsCallback(rabbitReturnCallback); //使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞 rabbitTemplate.setUsePublisherConnection(true); //设置消息为json // rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); // 如果消息没有到exchange,则confirm回调,ack=false; 如果消息到达exchange,则confirm回调,ack=true rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息发送成功:correlationData({}),ack({}),cause({})"+correlationData+":"+ack+":"+cause); }else{ System.out.println("消息发送成功:correlationData({}),ack({}),cause({})"+correlationData+":"+ack+":"+cause); } } }); //如果exchange到queue成功,则不回调return;如果exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}"+returnedMessage); } }); return rabbitTemplate; } //序列化方式 // @Bean // public MessageConverter jackson2JsonMessageConverter(){ // Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // return jackson2JsonMessageConverter; // } //交换机注册与声明 这里介绍Direct模式交换机 @Bean public DirectExchange DirectExchange() { return new DirectExchange("direct_order_exchange", true, false); } //声明队列 sms.Direct.queue, email.Direct.queue, duanxin.Direct.queue, //完成绑定(队列与短信绑定关系) @Bean public Queue directemail() { return new Queue("email.direct.queue", true); } @Bean public Binding emailBind() { return BindingBuilder.bind(directemail()).to(DirectExchange()).with("email"); } @Bean public FanoutExchange deadExchange() { return new FanoutExchange("dead_direct_exchange", true, false); } @Bean public Queue dead() { return new Queue("dead.direct.queue", true); } @Bean public Binding deadBinding() { return BindingBuilder.bind(dead()).to(deadExchange()); } }
生产者业务代码:
package com.wangbiao.consumer.service; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.*; import java.util.Calendar; import java.util.Date; import java.util.UUID; /** * TODO * * @author wangbiao * @Title TODO * @module TODO * @description TODO * @date 2021/3/24 22:29 */ @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; public void makeOrder() { //1根据商品id查询库存 //2保存订单 String orderId= UUID.randomUUID().toString(); System.out.println("订单ok"); String exchangeName="direct_order_exchange"; //direct_order_exchange String tpoicKey="email"; rabbitTemplate.convertAndSend(exchangeName,tpoicKey,orderId); //确认下消费的信号 // rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { // @Override // public void confirm(CorrelationData correlationData, boolean ack, String cause) { // if(!ack){ // System.out.println("发送消息失败:"+cause); // throw new RuntimeException("发送异常:"+cause); // } // } // }); } }
消费者:
package com.wangbiao.mq; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import com.wangbiao.Kung; import com.wangbiao.entity.Wmxg; import com.wangbiao.service.impl.DispatcherService; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * *@死信队列的消费 接收那些重复发送的消息还没有被消费的消息 (业务上一般安排人工客服) * */ @Component public class DeadMqConsumer { /** * 服务对象 */ private int count=1; @Autowired private DispatcherService dispatcherService; @RabbitHandler // @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange", // durable = "true",autoDelete = "false"), // exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT))) @RabbitListener(queues ="dead.direct.queue") public void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException { try { //1收到消息是 System.out.println("死信队列收到的消息是:" + mesg + ",count=" + count); //2获取订单的信息 Kung kung = JSON.parseObject(mesg, Kung.class); //3获取ID String orderId = kung.getOrderId(); String userId = kung.getUserId(); //4保存运单 Wmxg wmxg = new Wmxg(); wmxg.setOrderId(orderId); wmxg.setUserId(userId); // dispatcherService.insert(wmxg); //对当前消息进行应答 //用catch进行捕捉 channel.basicAck(tag,false); //只对当前收到的消息进行确认 true对消息进行批量确认 } catch (Exception e) { //如果出现异常的情况,根据实际情况去进行重发 //重发一次后,丢失还是日记,库存根据自己的业务场景去定 //参数1:消息的tag // 参数2:false 多条处理 // 参数3:requeue重发 fasle 不会重发,会把消息打入死信队列 true会进入死循环的重发,建议true的情况下,不使用try catch 否则造成循环 System.out.println("死信队列 发短信,然后移除"); System.out.println("死信队列仍然没有解决就人工客服"); channel.basicNack(tag,false,false); } } }
package com.wangbiao.mq; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import com.rabbitmq.client.impl.AMQImpl; import com.wangbiao.Kung; import com.wangbiao.entity.Wmxg; import com.wangbiao.service.impl.DispatcherService; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; /** * *@description 订单的消费者 *坚挺的队列名称: order.fanout.exchange * */ @Component public class OrderMqConsumer { /** * 服务对象 */ private int count=1; @Autowired private DispatcherService dispatcherService; @RabbitHandler // @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange", // durable = "true",autoDelete = "false"), // exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT))) @RabbitListener(queues ="email.direct.queue") public void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException { try { //1收到消息是 System.out.println("收到的消息是:" + mesg + ",count=" + count); //2获取订单的信息 Kung kung = JSON.parseObject(mesg, Kung.class); //3获取ID String orderId = kung.getOrderId(); String userId = kung.getUserId(); //4保存运单 Wmxg wmxg = new Wmxg(); wmxg.setOrderId(orderId); wmxg.setUserId(userId); //幂等性的问题,存在则更新,不存在则插入 使用分布式锁也可以解决 // dispatcherService.insert(wmxg); // System.out.println(1 / 0); //对当前消息进行应答 //用catch进行捕捉 channel.basicAck(tag,false); //只对当前收到的消息进行确认 true对消息进行批量确认 } catch (Exception e) { //如果出现异常的情况,根据实际情况去进行重发 //重发一次后,丢失还是日记,库存根据自己的业务场景去定 //参数1:消息的tag // 参数2:false 多条处理 // 参数3:requeue重发 fasle 不会重发,会把消息打入死信队列 true会进入死循环的重发,建议true的情况下,不使用try catch 否则造成循环 channel.basicNack(tag,false,false); } } }
查看下管理面板
队列
交换机
交换机通道的解释
路由的通道与解释
可以看出有一条消息未被消费
下面演示程序调用示例:
此时对应的面板关系是:
幂等性问题: 消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,
此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,
那么此时 RabbitMQ 并不会将该条消息删除,
当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。
同时,由于类似的原因,消息在发送的时候,
同一条消息也可能会发送两次 采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下: id-0(正在执行业务) id-1(执行业务成功) 如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。 极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。