zoukankan      html  css  js  c++  java
  • rocketmq发送普通消息(二)

    先创建Topic,创建命令是如下(在bin目录下执行)

    sh mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876 -b localhost:10911

    导入pom包

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.0</version>
            </dependency>

     

    /**
     * 普通消息消费者
     */
    public class Consumer {
    
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws MQClientException {
            // 1. 创建消费者(Push)对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            consumer.setNamesrvAddr(NAME_SERVER_ADDR);
            consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次
            // 3. 订阅对应的主题和Tag
            consumer.subscribe("TopicTest", "*");
    
            // 4. 注册消息接收到Broker消息后的处理接口
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        MessageExt messageExt = list.get(0);
                        System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
    
            // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
            consumer.start();
    
            System.out.println("已启动消费者");
        }
    }
    /**
     * 普通消息消费者
     */
    public class PullConsumer {
    
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws Exception {
            // 1. 创建消费者(Pull)对象
            DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            consumer.setNamesrvAddr(NAME_SERVER_ADDR);
            consumer.start();
            // 3. 获取到对于topic的queue列表
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
                try {
                    // 4. 循环遍历
                    for (MessageQueue messageQueue : messageQueues) {
                        // 5. 获取读取位置
                        long offset = consumer.fetchConsumeOffset(messageQueue, true);
                        // 6. 从指定位置取queue中的消息,每次最多10条。  如果没有则阻塞等待
                        PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, offset, 10);
                        // 7. 存储Offset,客户端每隔5s会定时刷新到Broker()
                        System.out.println(pullResult.getNextBeginOffset());
                        consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                        // 8. 遍历结果
                        if (pullResult.getPullStatus() == PullStatus.FOUND) {
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt messageExt : messageExtList) {
                                System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 1000L, 1000L, TimeUnit.MILLISECONDS);
        }
    }

    一、异步发送消息

    Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。Producer 通过回调接口接收 MQ 响应,并处理响应结果。该方式的消息可靠性可以得到保障,消息发送效率也可以。

    一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

    /**
     * 异步消息
     * 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回
     * 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果
     */
    public class AsyncProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1:创建生产者对象,并指定组名
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2:指定NameServer地址
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3:启动生产者
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2
    
            int count = 10;
            CountDownLatch cd = new CountDownLatch(count);
            // 4:循环发送消息
            for (int i = 0; i < count; i++) {
                final int index = i;
    
                // ID110:业务数据的ID,比如用户ID、订单编号等等
                Message msg = new Message("TopicTest", "TagB", "ID110", ("Hello World " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送异步消息
                producer.send(msg, new SendCallback() {
                    /**
                     * 发送成功的回调函数
                     * 但会结果有多种状态,在SendStatus枚举中定义
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK MSG_ID:%s %n", index, sendResult.getMsgId());
                        cd.countDown();
                    }
    
                    /**
                     * 发送失败的回调函数
                     * @param e
                     */
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                        cd.countDown();
                    }
                });
            }
    
            // 确保消息都发送出去了
            cd.await();
            // 5:关闭生产者
            producer.shutdown();
        }
    }

    二、单向发送消息

    Producer 仅负责发送消息,不等待。该发送方式时 MQ 不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。此方式发送消息的过程耗时非常短,一般在微秒级别。

    一般用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    /**
     * 单向模式
     * 一般用来对可靠性有一定要求的消息发送,例如日志系统
     * 不同于同步的唯一之处在于:调用的是sendOneway方法,且方法不返回任何值,即调用者不需要关心成功或失败
     */
    public class OnewayProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1:创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2:指定NameServer地址
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3:启动生产者
            producer.start();
    
            // 4:发送消息
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("TopicTest", "TagC", ("Hello OneWay :" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            }
            System.out.println("消息已发送");
    
            producer.shutdown();
        }
    }

    三、同步发送消息

    Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。同步发送应用场景广泛,例如通知邮件、报名短信通知等。

    /**
     * 发送同步消息
     * 可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。
     */
    public class SyncProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1. 创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3. 启动生产者
            producer.start();
    
            // 4. 生产者发送消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                SendResult result = producer.send(message);
    
                System.out.printf("发送结果:%s%n", result);
            }
    
            // 5. 停止生产者
            producer.shutdown();
        }
    }

     四、对比

    这短短的一生我们最终都会失去,不妨大胆一点,爱一个人,攀一座山,追一个梦
  • 相关阅读:
    Spring ApplicationListener 理解
    Tomcat 的context.xml说明、Context标签讲解
    IntrospectorCleanupListener作用
    Dubbo 和 Spring Cloud微服务架构 比较及相关差异
    ZooKeeper原理 --------这可能是把ZooKeeper概念讲的最清楚的一篇文章
    Dubbo 入门
    makefile的调试器remake
    linux下的nmap工具能干什么?
    makefile中的patsubst函数有何作用?
    openwrt为何需要refresh新增的补丁?
  • 原文地址:https://www.cnblogs.com/xing1/p/15489954.html
Copyright © 2011-2022 走看看