zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (8.重试机制)

     

    producer:

    **默认超时时间**

    /**
    * Timeout for sending messages.
    */
    private int sendMsgTimeout = 3000;


    // 异步发送时 重试次数,默认 2
    producer.setRetryTimesWhenSendAsyncFailed(1);
    // 同步发送时 重试次数,默认 2
    producer.setRetryTimesWhenSendFailed(1);

    // 是否向其他broker发送请求 默认false
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);

    Consumer:

    消费超时,单位分钟

    `consumer.setConsumeTimeout()`

    发送ack,消费失败

    `RECONSUME_LATER`

    broker投递:

    只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试

    重投使用`messageDelayLevel`

    默认值:

    messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    1、生产者样例

    //添加重试机制
        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("MQ2Group");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
    
            producer.start();
    
            //异步发送 重试次数 系统默认是2
            producer.setRetryTimesWhenSendAsyncFailed(1);
            //同步发送 重试次数 系统默认是2
    //        producer.setRetryTimesWhenSendFailed(1);
            producer.send(new Message("MQ2Topic","回调消息!".getBytes()));
            producer.setRetryAnotherBrokerWhenNotStoreOK(true);
    //        producer.shutdown();
            System.out.println("生产者下线!");
    
        }

    2、消费者样例

        //接收消息
        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MQ2Group");
    
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            consumer.subscribe("MQ2Topic","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for (MessageExt mes: list) {
    
                        System.out.println("mes : "+new String(mes.getBody()));
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
    
            consumer.start();
            System.out.println("Consumer  start...");
        }
  • 相关阅读:
    X Wing 数独Sudoku
    Vue.js—Difference between v-model and v-bind
    How to add dynamically attribute in VueJs
    how many types in javascript
    Why does JavaScript variable declaration at console results in “undefined” being printed?
    What’s the difference between “{}” and “[]” while declaring a JavaScript array?
    What’s the difference between “Array()” and “[]” while declaring a JavaScript array?
    vue-cli 和webpack
    OpenWrt For Support SkyEdge Gilat Modem Configure
    Webots 简介
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14598012.html
Copyright © 2011-2022 走看看