zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (2.发送异步消息)

      异步消息 :

      想要快速发送消息,又不想丢失的时候可以使用异步消息。

       消息接收方式:RocketMQ(三)——————javaAPI(1.2.3.4 消息接收方式)

      //官网示例
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 100;
            // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            for (int i = 0; i < messageCount; i++) {
                    final int index = i;
                    // 创建消息,并指定Topic,Tag和消息体
                    Message msg = new Message("TopicTest",//指定Topic
                        "TagA", //标签
                        "OrderID188", //keys
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // SendCallback接收异步返回结果的回调
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
            }
        // 等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    
        // 简单理解
        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            // 异步可靠消息
            // 不会阻塞,等待broker的确认
            //采用事件监听方式接受broker返回的确认
            Message message1 = new Message("myTopic001","rocketMq1 第一次发送".getBytes());
    
            // SendCallback接收异步返回结果的回调
            producer.send(message1, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
    
                    System.out.println("发送成功!");
                    System.out.println("发送结果 :"+sendResult);
                }
    
                public void onException(Throwable throwable) {
    
                    throwable.printStackTrace();
                    System.out.println("发送失败!");
                }
            });
    
            //测试先下线抛链接异常
            //producer.shutdown();
            System.out.println("生产者下线!");
        }
  • 相关阅读:
    Windows XP下 Android开发环境 搭建
    Android程序的入口点
    在eclipse里 新建android项目时 提示找不到proguard.cfg
    64位WIN7系统 下 搭建Android开发环境
    在eclipse里 新建android项目时 提示找不到proguard.cfg
    This Android SDK requires Android Developer Toolkit version 20.0.0 or above
    This Android SDK requires Android Developer Toolkit version 20.0.0 or above
    Android requires compiler compliance level 5.0 or 6.0. Found '1.4' instead
    Windows XP下 Android开发环境 搭建
    Android程序的入口点
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597827.html
Copyright © 2011-2022 走看看