zoukankan      html  css  js  c++  java
  • 98--Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息

    创建springboot项目

    pom.xml添加rocketmq-spring-boot-starter依赖。

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    

    yml 配置

    application.yml

    rocketmq:
      name-server: 192.168.64.141:9876
    

    application-demo1.yml

    使用 demo1 profile 指定生产者组组名

    rocketmq:
      producer:
        group: producer-demo1
    

    application-demo2.yml

    使用 demo2 profile 指定生产者组组名

    rocketmq:
      producer:
        group: producer-demo2
    

    测试

    demo 1

    • 发送普通消息
    • 发送 Spring 的通用 Message 对象
    • 发送异步消息
    • 发送顺序消息

    生产者

    package cn.tedu.demo2.m1;
    
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Producer {
    
        @Autowired
        private RocketMQTemplate t ;
    
        public  void send(){
            //发送同步消息
            t.convertAndSend("Topic1:TagA", "Hello world! ");
    
            //发送spring的Message
            Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();
            t.send("Topic1:TagA",message);
    
            //发送异步消息
            t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功");
                }
    
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送失败");
                }
            });
    
            //发送顺序消息
            t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
            t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
            t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
        }
    
    
    }
    
    

    消费者

    package cn.tedu.demo2.m1;
    
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")
    public class Consumer  implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("收到"+s);
        }
    }
    
    

    主类

    package cn.tedu.demo2.m1;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class Main {
    
        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    
    }
    

    测试类

    需要放在 test 文件夹

    激活 demo1 profile @ActiveProfiles("demo1")

    package cn.tedu.demo2.m1;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.ActiveProfiles;
    
    @SpringBootTest
    @ActiveProfiles("demo1")
    public class Test1 {
        @Autowired
        private  Producer producer;
        @Test
        public void test1(){
            producer.send();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    demo 2

    发送事务消息

    生产者

    package cn.tedu.demo2.m2;
    
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    @Component
    
    public class Producer {
    
        @Autowired
        private RocketMQTemplate t;
    
        public void send(){
            Message<String> message = MessageBuilder.withPayload("Hello world").build();
            //一旦发送消息,则执行监听器
            t.sendMessageInTransaction("Topic2",message,null);
        }
        @RocketMQTransactionListener
        class Lis implements RocketMQLocalTransactionListener {
            @Override
            public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("执行本地事务");
                return RocketMQLocalTransactionState.UNKNOWN;
            }
    
            @Override
            public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
                System.out.println("执行事务回查");
                return RocketMQLocalTransactionState.COMMIT;
            }
        }
    
    }
    
    

    消费者

    package cn.tedu.demo2.m2;
    
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")
    public class Consumer implements RocketMQListener<String> {
    
    
        @Override
        public void onMessage(String s) {
            System.out.println("收到"+s);
        }
    }
    
    

    主类

    package cn.tedu.demo2.m2;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class Main {
    
        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    
    }
    
    
    

    测试类

    package cn.tedu.demo2.m2;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.ActiveProfiles;
    
    @SpringBootTest
    @ActiveProfiles("demo2")
    public class Test2 {
        @Autowired
        private  Producer producer;
        @Test
        public void  test1(){
            producer.send();
            //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    
  • 相关阅读:
    vscode中使用less写css样式出现红色波浪线
    vue报错error Trailing spaces not allowed no-trailing-spaces
    visual code 报错error Expected space or tab after '//' in comment spaced-comment
    笔记本电脑已经启动却黑屏
    连接MySQL报错The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone.
    IDEA搭建基于maven的springboot工程
    eclipse 下修改Dynamic Web Modulle 的问题
    firefox(火狐)下 js中设置checkbox属性checked="checked"已有,但复选框却不显示勾选的原因
    unbtun python tab补全
    python之路:进阶篇 内置函数
  • 原文地址:https://www.cnblogs.com/liqbk/p/13677137.html
Copyright © 2011-2022 走看看