zoukankan      html  css  js  c++  java
  • RocketMQ生产者和消费者

    一.导入依赖

       <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.1.0-incubating</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>1.1.1</version>
            </dependency>

    二:生产者

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import redis.clients.jedis.Jedis;
    
    public class Producer {
        public static void main(String[] args) throws MQClientException {
         
            //创建生产者并指定组
            DefaultMQProducer producer = new DefaultMQProducer("my-group");
            //指定服务地址
            producer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4:9876");
            //创建生产者实例
            producer.setInstanceName("producer");
            //启动生成者
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
    
                    Message msg = new Message("my-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("message-"+i).getBytes()// body 内容
                    );
    
                    SendResult sendResult = producer.send(msg);
                    //打印消息的完整信息
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //发送完所有信息停掉生产者
            producer.shutdown();
        }
    }

    三.消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
            consumer.setNamesrvAddr("192.168.118.3:9876");
            consumer.setInstanceName("consumer");
            consumer.subscribe("my-topic", "TagA");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

    四:解决消息重复消费

    在客户端网络延迟或者报错的情况下导致消息无法成功签收,其他的消费者能继续监听到这个消息,导致重复消费的情况

    我们可以给没一条消息一个独一无二的标识,当作消息的keys,接受到消息之后,查看redis中有没有这个key,如果有就代表重复消费了,反之正常执行业务逻辑,先将keys放到redis中,防止其他消费者进行重复消费

    ,在所有操作执行完之后将keys重redis中删除

    消费者代码

    package com.yjc.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import redis.clients.jedis.Jedis;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            Jedis jedis=new Jedis("192.168.118.3",6379);
            jedis.auth("admin");
            jedis.select(0);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
            consumer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4");
            consumer.setInstanceName("consumer");
            consumer.subscribe("my-topic3", "TagA");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                       System.out.println("监听到消息");
                        //查看此消息的key是否在缓存中
                        if (jedis.exists(msg.getKeys())) {
                            System.out.println("重复消费!");
                            //重复消费的情况签收失败
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                        jedis.set(msg.getKeys(),new String(msg.getBody()));
                        System.out.println("放到缓存中");
                        //模拟出错
                        int a=5/0;
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                        //消费完从缓存中删除
                        jedis.del(msg.getKeys());
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
  • 相关阅读:
    前端开发 Knockout
    一套基于Spring Boot+Vue+Shiro前后端分离的代码生成器
    七个开源的 Spring Boot 前后端分离项目
    Java老司机:把这些主流技术搞懂,拿20K没问题
    svn无法cleanup解决方案
    软件测试-1挡板测试
    电子琴
    myeclipse10激活注册码生成器代码
    LNK2005
    无法打开包含文件:"fstream.h"
  • 原文地址:https://www.cnblogs.com/yjc1605961523/p/12326767.html
Copyright © 2011-2022 走看看