zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (2.发送异步消息)

      异步消息 :

      想要快速发送消息,又不想丢失的时候可以使用异步消息。

       消息接收方式:RocketMQ(三)——————javaAPI(1.2.3.4 消息接收方式)

      //官网示例
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 100;
            // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            for (int i = 0; i < messageCount; i++) {
                    final int index = i;
                    // 创建消息,并指定Topic,Tag和消息体
                    Message msg = new Message("TopicTest",//指定Topic
                        "TagA", //标签
                        "OrderID188", //keys
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // SendCallback接收异步返回结果的回调
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
            }
        // 等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    
        // 简单理解
        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            // 异步可靠消息
            // 不会阻塞,等待broker的确认
            //采用事件监听方式接受broker返回的确认
            Message message1 = new Message("myTopic001","rocketMq1 第一次发送".getBytes());
    
            // SendCallback接收异步返回结果的回调
            producer.send(message1, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
    
                    System.out.println("发送成功!");
                    System.out.println("发送结果 :"+sendResult);
                }
    
                public void onException(Throwable throwable) {
    
                    throwable.printStackTrace();
                    System.out.println("发送失败!");
                }
            });
    
            //测试先下线抛链接异常
            //producer.shutdown();
            System.out.println("生产者下线!");
        }
  • 相关阅读:
    android Json解析详解
    Android 用ping的方法判断当前网络是否可用
    Android 监控网络状态
    Android TableLayout 常用的属性介绍及演示
    三星笔记本R428安装xp win7双系统,切换系统重启才能进入系统解决办法。
    解决Win8不能上网攻略第二版!三步秒杀原驱动
    Android三种实现自定义ProgressBar的方式介绍
    Android应用开发中半透明效果实现方案
    FFT算法的物理意义
    网络编程Socket之TCP之close/shutdown具体解释(续)
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597827.html
Copyright © 2011-2022 走看看