zoukankan      html  css  js  c++  java
  • rocketmq发送普通消息(二)

    先创建Topic,创建命令是如下(在bin目录下执行)

    sh mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876 -b localhost:10911

    导入pom包

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.0</version>
            </dependency>

     

    /**
     * 普通消息消费者
     */
    public class Consumer {
    
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws MQClientException {
            // 1. 创建消费者(Push)对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            consumer.setNamesrvAddr(NAME_SERVER_ADDR);
            consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次
            // 3. 订阅对应的主题和Tag
            consumer.subscribe("TopicTest", "*");
    
            // 4. 注册消息接收到Broker消息后的处理接口
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        MessageExt messageExt = list.get(0);
                        System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
    
            // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
            consumer.start();
    
            System.out.println("已启动消费者");
        }
    }
    /**
     * 普通消息消费者
     */
    public class PullConsumer {
    
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws Exception {
            // 1. 创建消费者(Pull)对象
            DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            consumer.setNamesrvAddr(NAME_SERVER_ADDR);
            consumer.start();
            // 3. 获取到对于topic的queue列表
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest");
            Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
                try {
                    // 4. 循环遍历
                    for (MessageQueue messageQueue : messageQueues) {
                        // 5. 获取读取位置
                        long offset = consumer.fetchConsumeOffset(messageQueue, true);
                        // 6. 从指定位置取queue中的消息,每次最多10条。  如果没有则阻塞等待
                        PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, offset, 10);
                        // 7. 存储Offset,客户端每隔5s会定时刷新到Broker()
                        System.out.println(pullResult.getNextBeginOffset());
                        consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
                        // 8. 遍历结果
                        if (pullResult.getPullStatus() == PullStatus.FOUND) {
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt messageExt : messageExtList) {
                                System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 1000L, 1000L, TimeUnit.MILLISECONDS);
        }
    }

    一、异步发送消息

    Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。Producer 通过回调接口接收 MQ 响应,并处理响应结果。该方式的消息可靠性可以得到保障,消息发送效率也可以。

    一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

    /**
     * 异步消息
     * 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回
     * 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果
     */
    public class AsyncProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1:创建生产者对象,并指定组名
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2:指定NameServer地址
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3:启动生产者
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2
    
            int count = 10;
            CountDownLatch cd = new CountDownLatch(count);
            // 4:循环发送消息
            for (int i = 0; i < count; i++) {
                final int index = i;
    
                // ID110:业务数据的ID,比如用户ID、订单编号等等
                Message msg = new Message("TopicTest", "TagB", "ID110", ("Hello World " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送异步消息
                producer.send(msg, new SendCallback() {
                    /**
                     * 发送成功的回调函数
                     * 但会结果有多种状态,在SendStatus枚举中定义
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK MSG_ID:%s %n", index, sendResult.getMsgId());
                        cd.countDown();
                    }
    
                    /**
                     * 发送失败的回调函数
                     * @param e
                     */
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                        cd.countDown();
                    }
                });
            }
    
            // 确保消息都发送出去了
            cd.await();
            // 5:关闭生产者
            producer.shutdown();
        }
    }

    二、单向发送消息

    Producer 仅负责发送消息,不等待。该发送方式时 MQ 不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。此方式发送消息的过程耗时非常短,一般在微秒级别。

    一般用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

    /**
     * 单向模式
     * 一般用来对可靠性有一定要求的消息发送,例如日志系统
     * 不同于同步的唯一之处在于:调用的是sendOneway方法,且方法不返回任何值,即调用者不需要关心成功或失败
     */
    public class OnewayProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1:创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2:指定NameServer地址
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3:启动生产者
            producer.start();
    
            // 4:发送消息
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("TopicTest", "TagC", ("Hello OneWay :" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            }
            System.out.println("消息已发送");
    
            producer.shutdown();
        }
    }

    三、同步发送消息

    Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。同步发送应用场景广泛,例如通知邮件、报名短信通知等。

    /**
     * 发送同步消息
     * 可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。
     */
    public class SyncProducer {
        public static final String NAME_SERVER_ADDR = "192.168.32.128:9876";
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1. 创建生产者对象
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
    
            // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
            producer.setNamesrvAddr(NAME_SERVER_ADDR);
    
            // 3. 启动生产者
            producer.start();
    
            // 4. 生产者发送消息
            for (int i = 0; i < 10; i++) {
                Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    
                SendResult result = producer.send(message);
    
                System.out.printf("发送结果:%s%n", result);
            }
    
            // 5. 停止生产者
            producer.shutdown();
        }
    }

     四、对比

    这短短的一生我们最终都会失去,不妨大胆一点,爱一个人,攀一座山,追一个梦
  • 相关阅读:
    Asp.NET 4.0 ajax实例DataView 模板编程1
    ASP.NET 4.0 Ajax 实例DataView模板编程 DEMO 下载
    部分东北话、北京话
    .NET 培训课程解析(一)
    ASP.NET 4.0 Ajax 实例DataView模板编程2
    ASP.NET Web Game 架构设计1服务器基本结构
    ASP.NET Web Game 构架设计2数据库设计
    TFS2008 基本安装
    Linux上Oracle 11g安装步骤图解
    plsql developer远程连接oracle数据库
  • 原文地址:https://www.cnblogs.com/xing1/p/15489954.html
Copyright © 2011-2022 走看看