zoukankan      html  css  js  c++  java
  • springboot使用RocketMQ 发送接受消息

    Apache RocketMQ(官网地址:http://rocketmq.apache.org)是由阿里巴巴集团开源的大型消息队列,现在已经贡献给了Apache开源基金会,同时是一个分布式消息传递和流媒体平台,具有低延迟、高性能、可靠性、万亿级容量和灵活的可扩展性。(Github官网地址:https://github.com/apache/rocketmq)

    1. 加入RocketMQ依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.4</version>
    </dependency>

    2. 配置RocketMQ服务信息

    xc:
      rocketmq2:
        topic-string: topic_string
        topic-user: topic_user
        string-consumer-group: string_consumer_group
        user-consumer-group: user_consumer_group
    rocketmq:
      name-server: 172.19.25.168:9876
      producer:
        group: producer_group_test

    3. 编写生产者和消费者

    @Service
    @RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.string-consumer-group}", topic = "${xc.rocketmq2.topic-string}")
    public class StringConsumer implements RocketMQListener<String> {
    
        private Logger log = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(String message) {
            log.info("消费字符串消息{}", message);
        }
    }
    @Service
    @RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.user-consumer-group}", topic = "${xc.rocketmq2.topic-user}")
    public class UserConsumer implements RocketMQListener<User> {
    
        private Logger log = LoggerFactory.getLogger(this.getClass());
    
        @Override
        public void onMessage(User user) {
            log.info("消费用户消息{}", user.getUsername());
        }
    }
    @RestController
    @RequestMapping("rocketmq")
    public class RocketmqController {
    
        private Logger log = LoggerFactory.getLogger(this.getClass());
    
        @Autowired
        private XcRocketMq2 xcRocketMq2;
    
        @Resource
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 同步发送
         * 页面访问http://localhost:8080/rocketmq/sync
         *
         * @throws Exception
         */
        @GetMapping("sync")
        public void sync() {
            SendResult sendResult = rocketMQTemplate.syncSend(xcRocketMq2.getTopicString(), "Hello world!");
            log.info("同步发送字符串{}, 发送结果{}", xcRocketMq2.getTopicString(), sendResult);
    
            User user = new User();
            user.setId("1");
            user.setUsername("wusq");
            sendResult = rocketMQTemplate.syncSend(xcRocketMq2.getTopicUser(), user);
            log.info("同步发送对象{}, 发送结果{}", xcRocketMq2.getTopicUser(), sendResult);
        }
    
        /**
         * 异步发送
         * 页面访问http://localhost:8080/rocketmq/async
         *
         * @throws Exception
         */
        @GetMapping("async")
        public void async() {
    
            rocketMQTemplate.asyncSend(xcRocketMq2.getTopicString(), "Hello world!", new SendCallback() {
                @Override
                public void onSuccess(SendResult var1) {
                    log.info("异步发送成功{}", var1);
                }
    
                @Override
                public void onException(Throwable var1) {
                    log.info("异步发送失败{}", var1);
                }
            });
        }
    
        /**
         * 单向发送
         * 页面访问http://localhost:8080/rocketmq/oneway
         *
         * @throws Exception
         */
        @GetMapping("oneway")
        public void oneway() {
            rocketMQTemplate.sendOneWay(xcRocketMq2.getTopicString(), "Hello world!");
            log.info("单向发送");
        }
    }

    源码:https://gitee.com/caoyeoo0/xc-springboot/tree/mq%2FRocketMqStart/

  • 相关阅读:
    『转』 PreTranslateMessage作用和使用方法
    either...or...与 neither...nor...
    CSS五類常用選擇器(收藏)
    JQuery选择器(selectors 的xpath语法应用)
    我是怎么看friends练口语的(转贴)
    变量的命名方法【Hungarian】【camelCase】【PascalCase】
    JavaScript继承机制的实现(未完)
    JavaScript面向对象编程(1) 基础
    Javascript:Object.extend
    JavaScript面向对象编程(2) 类的定义
  • 原文地址:https://www.cnblogs.com/ooo0/p/14109569.html
Copyright © 2011-2022 走看看