zoukankan      html  css  js  c++  java
  • rocketmq(三 java操作rocket API, rocketmq 幂等性)

    • JAVA操作rocketmq:

    1.导入rocketmq所需要的依赖:

        <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>3.0.10</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba.rocketmq</groupId>
                <artifactId>rocketmq-all</artifactId>
                <version>3.0.10</version>
                <type>pom</type>
            </dependency>

    2.创建生产者

    package com.example.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");
            producer.setInstanceName("producer");
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    // Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("producer-topic", // topic 主题名称
                            "msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费
                            ("pushmsg-" + i).getBytes()// body 内容
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    
    }

    3.创建消费者

    package com.example.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    
            consumer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("producer-topic", "msg");//此处是根据Message对象的参数来获取
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消息id:"+msg.getMsgId() + "---" + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    
    }

    4.运行结果:

     生产者运行结果:

    消费者运行结果:

     

    • rocetmq幂等性问题:

    在Activemq中 jms规范支持两种消息模型:点对点和发布订阅,在rocketmq中 有两种消费模式:广播消费,和集群消费。

    在消费的过程中,如果消费者出现异常或者超时,导致mq没有及时的相应消费的状态,则可能让mq重试,重试机制就有可能导致出现幂等性,而rocketmq的幂等性 只会出现在集群消费(类似activemq中的点对点消息模型)

    生产者:

    package com.example.producer;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
            producer.setInstanceName("producer");
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Message msg = new Message("topic", // topic 主题名称
                            "msg", // pull 临时值 在消费者消费的时候 可以根据msg类型进行消费
                            (i + "条消息").getBytes()// body 内容
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    
    }

    消费者:

    package com.example.consumer;
    
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    
            consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("topic1", "msg");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
                    }
                    // 超时的情况 或者程序异常
                    int i = 2 / 0;
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    
    }

    消费结果:

    消息id:C0A81FB100002A9F00000000000268EC---5条消息
    消息id:C0A81FB100002A9F000000000002686E---4条消息
    消息id:C0A81FA900002A9F0000000000037E6A---1条消息
    消息id:C0A81FB100002A9F000000000002696A---6条消息
    消息id:C0A81FB100002A9F00000000000269E8---7条消息
    消息id:C0A81FA900002A9F0000000000038062---9条消息
    消息id:C0A81FA900002A9F0000000000037EE8---2条消息
    消息id:C0A81FA900002A9F0000000000037FE4---8条消息
    消息id:C0A81FA900002A9F0000000000037F66---3条消息
    消息id:C0A81FA900002A9F0000000000037DEC---0条消息
    消息id:C0A81FA900002A9F0000000000038704---1条消息
    消息id:C0A81FA900002A9F000000000003880C---9条消息
    消息id:C0A81FA900002A9F0000000000038914---2条消息
    消息id:C0A81FA900002A9F0000000000038A1C---0条消息
    消息id:C0A81FA900002A9F0000000000038B24---3条消息
    消息id:C0A81FA900002A9F0000000000038C2C---8条消息
    消息id:C0A81FB100002A9F0000000000026E7E---4条消息
    消息id:C0A81FB100002A9F0000000000026F86---7条消息
    消息id:C0A81FB100002A9F0000000000027196---5条消息
    消息id:C0A81FB100002A9F000000000002708E---6条消息

    在Activimq中,可以通过消息id 来作为全局变量,检测是不是重复消费。但是在rocketmq中消费重试的结果中,任意选出两条相同的消息,可以看出 重试的时候消息id是不同的,此时在用消息id作为全局变量判断是否重复消费显然是不可能的。rocketmq中提供了一个消息的key,可以将业务id作为该key。例如:订单号什么的。可以将消息设置的key 在第一次消费的时候存放到数据库之中

    幂等性消费者:

    package com.example.consumer;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static Map<String, String> map = new HashMap<String, String>();// 模拟内存,实际情况可以将key放在redis之中
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
            consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("topic1", "msg");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        if (!map.containsKey(msg.getKeys())) {
                            // 如果此时的业务逻辑是将收到的消息存放到数据库
                            System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
                            map.put(msg.getKeys(), new String(msg.getBody()));
                        } else {
                            System.out.println("重复消费");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
                    // 超时的情况 或者程序异常
                    int i = 2 / 0;
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    
    }
  • 相关阅读:
    Oracle11g远程连接配置 visual studio 2003
    窗体设计器的使用套件
    Visual Studio 2003“无法启动调试 没有正确安装调试器“请修复调试器的解决办法
    eclipse maven插件安装教程
    遇到错误ERROR 1044 (42000): Access denied for user ''@'localhost'to database 'mysql',的解决办法
    定位
    盒子模型
    CSS3美化网页元素
    CSS随笔
    前端基础随笔
  • 原文地址:https://www.cnblogs.com/920913cheng/p/10730497.html
Copyright © 2011-2022 走看看