zoukankan      html  css  js  c++  java
  • RocketMQ(二)

    RocketMQ

    Pom

    <dependencies>

                <dependency>

                      <groupId>com.alibaba.rocketmq</groupId>

                      <artifactId>rocketmq-client</artifactId>

                      <version>3.0.10</version>

                </dependency>

                <dependency>

                      <groupId>com.alibaba.rocketmq</groupId>

                      <artifactId>rocketmq-all</artifactId>

                      <version>3.0.10</version>

                      <type>pom</type>

                </dependency>

                <dependency>

                      <groupId>ch.qos.logback</groupId>

                      <artifactId>logback-classic</artifactId>

                      <version>1.1.1</version>

                </dependency>

                <dependency>

                      <groupId>ch.qos.logback</groupId>

                      <artifactId>logback-core</artifactId>

                      <version>1.1.1</version>

                </dependency>

                <dependency>

                      <groupId>junit</groupId>

                      <artifactId>junit</artifactId>

                      <version>4.10</version>

                      <scope>test</scope>

                </dependency>

          </dependencies>

     

     

    生产者

     

    publicclass Producer {

     

          publicstaticvoid main(String[] args) throws MQClientException {

                DefaultMQProducer producer = new DefaultMQProducer("rmq-group");

                producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

                producer.setInstanceName("producer");

                producer.start();

                try {

                      for (inti = 0; i < 10; i++) {

                            Thread.sleep(1000); // 每秒发送一次MQ

                            Message msg = new Message("itmayiedu-topic", // topic 主题名称

                                        "TagA", // tag 临时值

                                        ("itmayiedu-"+i).getBytes()// body 内容

                            );

                            SendResult sendResult = producer.send(msg);

                            System.out.println(sendResult.toString());

                      }

                } catch (Exception e) {

                      e.printStackTrace();

                }

                producer.shutdown();

          }

     

    }

     

    消费者

     

    publicclass Consumer {

          publicstaticvoid main(String[] args) throws MQClientException {

                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

     

                consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

                consumer.setInstanceName("consumer");

                consumer.subscribe("itmayiedu-topic", "TagA");

     

                consumer.registerMessageListener(new MessageListenerConcurrently() {

                      @Override

                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                            for (MessageExt msg : msgs) {

                                  System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));

                            }

                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                      }

                });

                consumer.start();

                System.out.println("Consumer Started.");

          }

    }

     

    RocketMQ重试机制

    MQ 消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

    MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

    publicclass Consumer {

          publicstaticvoid main(String[] args) throws MQClientException {

                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

     

                consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

                consumer.setInstanceName("consumer");

                consumer.subscribe("itmayiedu-topic", "TagA");

     

                consumer.registerMessageListener(new MessageListenerConcurrently() {

                      @Override

                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                            for (MessageExt msg : msgs) {

                                  System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));

                            }

                            try {

                                  inti = 1 / 0;

                            } catch (Exception e) {

                                  e.printStackTrace();

                                    // 需要重试

                                  return ConsumeConcurrentlyStatus.RECONSUME_LATER;

     

                            }

                             // 不需要重试

                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                      }

                });

                consumer.start();

                System.out.println("Consumer Started.");

          }

    }

     

    注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

     

    RocketMQ如何解决消息幂等

    注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

    解决办法:使用自定义全局ID判断幂等,例如流水ID、订单号

    使用msg.setKeys 进行区分

    生产者:

    publicclass Producer {

     

         publicstaticvoid main(String[] args) throws MQClientException {

               DefaultMQProducer producer = new DefaultMQProducer("rmq-group");

               producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

               producer.setInstanceName("producer");

               producer.start();

               try {

                    for (inti = 0; i < 1; i++) {

                         Thread.sleep(1000); // 每秒发送一次MQ

                         Message msg = new Message("itmayiedu-topic", // topic 主题名称

                                    "TagA", // tag 临时值

                                    ("itmayiedu-6" + i).getBytes()// body 内容

                         );

                         msg.setKeys(System.currentTimeMillis() + "");

                         SendResult sendResult = producer.send(msg);

                         System.out.println(sendResult.toString());

                    }

               } catch (Exception e) {

                    e.printStackTrace();

               }

               producer.shutdown();

         }

     

    }

    消费者:

       staticprivate Map<String, String> logMap = new HashMap<>();

     

          publicstaticvoid main(String[] args) throws MQClientException {

                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

     

                consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

                consumer.setInstanceName("consumer");

                consumer.subscribe("itmayiedu-topic", "TagA");

     

                consumer.registerMessageListener(new MessageListenerConcurrently() {

                      @Override

                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                            String key = null;

                            String msgId = null;

                            try {

                                  for (MessageExt msg : msgs) {

                                        key = msg.getKeys();

                                        if (logMap.containsKey(key)) {

                                              // 无需继续重试。

                                              System.out.println("key:"+key+",无需重试...");

                                              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                                        }

                                        msgId = msg.getMsgId();

                                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));

                                        inti = 1 / 0;

                                  }

     

                            } catch (Exception e) {

                                  e.printStackTrace();

                                  return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                            } finally {

                                  logMap.put(key, msgId);

                            }

                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                      }

                });

                consumer.start();

                System.out.println("Consumer Started.");

          }

     

  • 相关阅读:
    code-breaking picklecode中对signed_cookies引擎分析
    [一道蓝鲸安全打卡Web分析] 文件上传引发的二次注入
    攻防世界Web新手解析
    面向对象相关
    面试第二篇
    函数相关
    python复习目录
    面试第一篇
    Django Rest Framework进阶二
    Django Rest Framework进阶一
  • 原文地址:https://www.cnblogs.com/dingpeng9055/p/10433669.html
Copyright © 2011-2022 走看看