zoukankan      html  css  js  c++  java
  • RocketMQ源码分析:(二)消息发送的三种方式

    1. 同步传输(可靠,适用于重要的通知消息、短信通知、短信营销系统等)

    package com.miaoying.rocketmq.client;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     *  @Description:
     *  @author miaoying
     *  @date 2019/2/27
     */
    @Slf4j
    @Component
    public class RocketMQClient {
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQProducer() throws InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
    
            int messageCount = 10000;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    
            try {
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(0);
    
                for (int i = 0; i < messageCount; i++) {
                    Message message = new Message("TopicMiaoAsync", "push", "keyTest", ("message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(message);
                    log.info("MsgId = " + sendResult.getMsgId() + " , offsetMsgId = " + sendResult.getOffsetMsgId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            producer.shutdown();
        }
    }

    2. 异步传输(一般用于响应时间敏感的业务场景)

    package com.miaoying.rocketmq.client;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class AsyncRocketMQClient {
    
        public static void main(String[] args) throws InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr("localhost:9876");
    
            int messageCount = 10000;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    
            try {
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(0);
    
                for (int i = 0; i < messageCount; i++) {
                    Message message = new Message("TopicMiaoAsync", "push", "keyTest", ("message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(message, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult result) {
                            countDownLatch.countDown();
                            // 对于客户端来说msgd是由客户端producer生成的,offsetMsgId是由服务端broker生成的
                            log.info("MsgId : " + result.getMsgId() + " , offsetMsgId : " + result.getOffsetMsgId() + " , send status : " + result.getSendStatus());
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            log.error("send message to rocketmq fail. " + e.getMessage());
                        }
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            producer.shutdown();
        }
    }

    3. 单向传输(用于需要中等可靠性的情况,例如日志收集)

    package com.miaoying.rocketmq.client;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    
    import javax.annotation.PostConstruct;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     *  @Description:
     *  @author miaoying
     *  @date 2019/2/27
     */
    @Slf4j
    @Component
    public class RocketMQClient {
        /**
         * 生产者的组名
         */
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        /**
         * NameServer 地址
         */
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        @PostConstruct
        public void defaultMQProducer() throws InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
    
            int messageCount = 10000;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    
            try {
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(0);
    
                for (int i = 0; i < messageCount; i++) {
                    Message message = new Message("TopicMiaoAsync", "push", "keyTest", ("message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            countDownLatch.await(5, TimeUnit.SECONDS);
            producer.shutdown();
        }
    }

  • 相关阅读:
    37 反转一个3位整数
    372 在O(1)时间复杂度删除链表节点
    174 删除链表中倒数第n个节点
    13 字符串查找
    4.Single Number(出现一次的数)
    7.斐波那契数列
    6.旋转数组的最小数字
    5.用两个栈实现队列
    垃圾收集器与内存分配策略---确定对象的存亡状态
    Java内存区域与内存溢出异常---对象的内存布局和对象的访问定位
  • 原文地址:https://www.cnblogs.com/miaoying/p/10446643.html
Copyright © 2011-2022 走看看