zoukankan      html  css  js  c++  java
  • 使用RocketMQ 发送简单消息

    Apache RocketMQ(官网地址:http://rocketmq.apache.org)是由阿里巴巴集团开源的大型消息队列,现在已经贡献给了Apache开源基金会,同时是一个分布式消息传递和流媒体平台,具有低延迟、高性能、可靠性、万亿级容量和灵活的可扩展性。(Github官网地址:https://github.com/apache/rocketmq)

    1. 加入RocketMQ依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.2.0</version>
    </dependency>

    2. 配置RocketMQ服务信息

    xc:
      rocketmq:
        consumer:
          PushConsumer: PushConsumer
        producer:
          producerGroup: Producer
        namesrvAddr: 172.19.25.168:9876

    3. 编写生产者和消费者

    这里以发送10条简单消息为例,创建一个生产者,这里使用的是默认生产者DefaultMQProducer,在构建生产者的时候使用构造方法设置生产者的组名。使用setNamesrvAddr()方法设置NameServer,如果有多个NameServer,就使用逗号分隔。这里需要注意一点,生产者对象只调用一次start方法即可,不需要每次都调用。在构建消息体时设置topic和tags。

    @Component
    public class RocketMQSender {
        @Value("${xc.rocketmq.producer.producerGroup}")
        private String producerGroup;
        @Value("${xc.rocketmq.namesrvAddr}")
        private String namesrvAddr;
        private static final Logger log = LoggerFactory.getLogger(RocketMQSender.class);
    
        public void defaultMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            producer.setVipChannelEnabled(false);
            producer.setNamesrvAddr(namesrvAddr);
            try {
                producer.start();
                Message message = new Message("TopicTest", "push", "【发送消息】".getBytes());
                StopWatch stop = new StopWatch();
                stop.start();
                for (int i = 0; i < 10; i++) {
                    SendResult result = producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, 1);
                    log.info("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
                }
                stop.stop();
                log.info("----------------发送十条消息耗时:" + stop.getTotalTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    
    }

    接下来编写一个消费者,其中的设置与生产者类似

    @Component
    public class RocketMQReceiver {
        @Value("${xc.rocketmq.consumer.PushConsumer}")
        private String consumerGroup;
        @Value("${xc.rocketmq.namesrvAddr}")
        private String namesrvAddr;
        private static final Logger log = LoggerFactory.getLogger(RocketMQReceiver.class);
    
        //@PostContruct是spring框架的注解,在方法上加该注解会在项目启动的时候执行该方法,也可以理解为在spring容器初始化的时候执行该方法。
        @PostConstruct
        public void defaultMQPushConsumer() {
            //消费者的组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
    
            //指定NameServer地址,多个地址以 ; 隔开
            consumer.setNamesrvAddr(namesrvAddr);
            try {
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("TopicTest", "push");
    
                //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                //如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
                            //输出消息内容
                            log.info("messageExt: " + messageExt);
                            String messageBody = new String(messageExt.getBody());
                            //输出消息内容
                            log.info("【defaultMQPushConsumer消费响应】:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //稍后再试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //@PostContruct是spring框架的注解,在方法上加该注解会在项目启动的时候执行该方法,也可以理解为在spring容器初始化的时候执行该方法。
        @PostConstruct
        public void defaultMQPushConsumer2() {
            //消费者的组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("aaa");
    
            //指定NameServer地址,多个地址以 ; 隔开
            consumer.setNamesrvAddr(namesrvAddr);
            try {
                //订阅PushTopic下Tag为push的消息
                consumer.subscribe("TopicTest", "push");
    
                //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
                //如果非第一次启动,那么按照上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
                            //输出消息内容
                            log.info("---- messageExt: " + messageExt);
                            String messageBody = new String(messageExt.getBody());
                            //输出消息内容
                            log.info("----【defaultMQPushConsumer2消费响应】:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //稍后再试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    最后编写一个RocketMQController类来调用生产者发送消息

    @RestController
    public class RocketMQController {
    
        @Autowired
        private RocketMQSender rocketMQSender;
    
        @GetMapping("testRocketmq")
        public void testRocketmq() {
            rocketMQSender.defaultMQProducer();
        }
    
    }

    启动项目后,在浏览器中访问http://localhost:8080/testRocketmq

    文章来源:Spring Boot 2实战之旅 9.3 RocketMQ消息队列

    源码:https://gitee.com/caoyeoo0/xc-springboot/tree/mq%2FRocketMQ/

  • 相关阅读:
    有用的网站
    RMVANNUAL matlab remove annual cycle of a time series
    [转载]grdcontour命令在GMT4下绘制等值线图
    Filter应用之-自动登录
    Filter应用之-验证用户是否已经登录
    Filter应用之2-设置某些页面缓存或是不缓存
    过虑器应用之1-设置request编码
    过滤器Filter
    java文件下载
    用COS实现文件上传
  • 原文地址:https://www.cnblogs.com/ooo0/p/14109137.html
Copyright © 2011-2022 走看看