zoukankan      html  css  js  c++  java
  • rabbitMq消费死循环

    消费过程发生错误容易造成死循环
    
    
    1.控制重发次数
    2.try+catch+手动ack
    3.try+catch+手动ack+死信队列(重试次数就失效了,因为捕捉确认后被打入了相应的死信队列)

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,(任何channel上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1),deliveryTag在channel范围内是唯一的
    第二个参数multiple:批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; 如果值为false,则只对当前收到的消息进行确认


    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
    第二个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息




    channel.basicNack是 channel.basicReject的补充,提供一次对多条消息进行拒绝的功能

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; 
    第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
    第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
    第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息

     // 每个客户端每次最后获取N个消息
    channel.basicQos(1);



    
    
     







     * (Wmxg)表控制层
    *
    * @author makejava
    * @since 2021-03-27 23:02:34 结合producetransation 看文档的图片理解
    */
    @Component
    public class OrderMqConsumer {
    /**
    * 服务对象
    */
    private int count=1;
    @Autowired
    private DispatcherService dispatcherService;

    @RabbitHandler
    // @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order.fanout.exchange",
    // durable = "true",autoDelete = "false"),
    // exchange = @Exchange(value = "order_fanout_exchange",type = ExchangeTypes.FANOUT)))
    @RabbitListener(queues ="order.fanout.exchange")
    public void messageconsumer(String mesg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException {
    try {
    //1收到消息是
    System.out.println("收到的消息是:" + mesg + ",count=" + count);
    //2获取订单的信息
    Kung kung = JSON.parseObject(mesg, Kung.class);
    //3获取ID
    String orderId = kung.getOrderId();
    String userId = kung.getUserId();
    //4保存运单
    Wmxg wmxg = new Wmxg();
    wmxg.setOrderId(orderId);
    wmxg.setUserId(userId);
              //幂等性的问题,存在则更新,不存在则插入   使用分布式锁也可以解决    避免重试时重复派单
    dispatcherService.insert(wmxg);
    System.out.println(1 / 0);

    //对当前消息进行应答
    //catch进行捕捉
    channel.basicAck(tag,false); //只对当前收到的消息进行确认
    true对消息进行批量确认

    } catch (Exception e) {
    //如果出现异常的情况,根据实际情况去进行重发
    //重发一次后,丢失还是日记,库存根据自己的业务场景去定
    //参数1:消息的tag
    // 参数2false 多条处理
    // 参数3requeue重发 fasle 不会重发,会把消息打入死信队列(自己建立一个死信队列,如下文书所示) true会进入死循环的重发(造成重复消费),建议true的情况下,不使用try catch 否则造成循环
    channel.basicNack(tag,false,false);

    }
    }


    自己建立的死信队列:
    @Bean
    public Queue orderQueue() {
    Map<String,Object> args=new HashMap<>();
    args.put("x-message-ttl",5000);//这里过期时间一定是一个INT类型
    args.put("x-dead-letter-exchange","dead_direct_exchange");//绑定死信队列交换机
    args.put("x-max-length",5);//指定最大接受多少条
    args.put("x-dead-letter-routing-key","dead");//fanout没有key
    return new Queue("order.fanout.exchange", true,false,false,args);
    }
     
     
     
     
    一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
  • 相关阅读:
    ssm集成--多模块
    ssm项目集成--单模块
    shiro的学习以及shiro集成spring
    RecyclerView的使用。
    RecyclerView 显示不全的问题.
    Android 中使用数据库作为存储,并随机发布的Demo。
    Fragment的两种加载方式。
    gradle生成的eclipse的web项目,在发布后,缺少jar包的情况。
    gradle 4,构建java web程序
    android 下载远程服务器文件。
  • 原文地址:https://www.cnblogs.com/wangbiaohistory/p/14590489.html
Copyright © 2011-2022 走看看