zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (2.发送异步消息)

      异步消息 :

      想要快速发送消息,又不想丢失的时候可以使用异步消息。

       消息接收方式:RocketMQ(三)——————javaAPI(1.2.3.4 消息接收方式)

      //官网示例
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup_name");
            // 设置NameServer的地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 100;
            // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            for (int i = 0; i < messageCount; i++) {
                    final int index = i;
                    // 创建消息,并指定Topic,Tag和消息体
                    Message msg = new Message("TopicTest",//指定Topic
                        "TagA", //标签
                        "OrderID188", //keys
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // SendCallback接收异步返回结果的回调
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
            }
        // 等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    
        // 简单理解
        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            // 异步可靠消息
            // 不会阻塞,等待broker的确认
            //采用事件监听方式接受broker返回的确认
            Message message1 = new Message("myTopic001","rocketMq1 第一次发送".getBytes());
    
            // SendCallback接收异步返回结果的回调
            producer.send(message1, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
    
                    System.out.println("发送成功!");
                    System.out.println("发送结果 :"+sendResult);
                }
    
                public void onException(Throwable throwable) {
    
                    throwable.printStackTrace();
                    System.out.println("发送失败!");
                }
            });
    
            //测试先下线抛链接异常
            //producer.shutdown();
            System.out.println("生产者下线!");
        }
  • 相关阅读:
    《国富论》
    DataGridView
    《ASP.NET Core 3框架揭秘》
    看见
    英语常用词汇
    《未选择的路》
    Redis实战(20)Redis 如何从海量数据中查询出某一个 Key?
    .NET 程序集Assembly使用
    ExtJs基础知识总结:自定义弹窗和ComboBox自动联想加载(四)
    ExtJs基础知识总结:Dom、IFrame和TreePanel、TabPanel(三)
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597827.html
Copyright © 2011-2022 走看看