zoukankan      html  css  js  c++  java
  • Spring Cloud Stream消息驱动之RocketMQ入门(一)

    SpringCloudStream目前支持的中间件有RabbitMQ、Kafka,还有我最近在学习的RocketMQ,以下是我学习的笔记
    学习Spring cloud Stream 可以先学习一下了解 Spring Messaging 和 Spring Integration,

    先看看Spring Message 消息的模型

    file

    Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header

    file

    消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中,直接撸demo吧

    pom.xml 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>cloud-stream-rocketmq-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>cloud-stream-rocketmq-demo</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
                <version>0.2.1.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Hoxton.SR1</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    @EnableBinding:该注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定
    @StreamListener:该注解主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名

    @Component
    @EnableBinding(StreamInput.class)
    @Slf4j
    public class ReceiveClient {
    
        @StreamListener(StreamInput.input)
        public void receive01(String message){
            log.info("接收消息:"+message);
        }
    
    
    }
    
    

    @Input注解绑定了一个名为input的通道

    public interface StreamInput {
    
        String input = "input";
    
        @Input(StreamInput.input)
        SubscribableChannel input();
    }
    

    @Output注解绑定了一个名为Output的通道

    public interface StreamInput {
    
        String input = "input";
    
        @Input(StreamInput.input)
        SubscribableChannel input();
    }
    

    测试一下
    启动类加上刚刚添加的两个接口
    @EnableBinding({StreamInput.class, StreamOutput.class})

    @Autowired
        private StreamOutput streamOutput;
    
        @GetMapping("/send")
        public String send(){
            MessageBuilder builder = MessageBuilder.withPayload("测试消息".getBytes());
            streamOutput.output().send(builder.build());
            return "ok";
        }
    

    不要忘记@EnableBinding注解绑定

    个人联系方式QQ:944484545,欢迎大家的加入,分享学习是一件开心事
  • 相关阅读:
    EIGRP-16-其他和高级的EIGRP特性-2-非等价负载分担
    EIGRP-15-其他和高级的EIGRP特性-1-路由器ID
    EIGRP-14-EIGRP的命名模式
    EIGRP-13-弥散更新算法-停滞在活动状态
    EIGRP-12-弥散更新算法-DUAL的FSM(*没写完)
    EIGRP-11-弥散更新算法-EIGRP中的本地计算和弥散计算
    EIGRP-10-弥散更新算法-计算距离,报告距离,可行距离和可行性条件
    EIGRP-9-弥散更新算法-拓扑表
    EIGRP-8-路由器的邻接关系
    EIGRP-7-可靠传输协议
  • 原文地址:https://www.cnblogs.com/hy-xiaobin/p/12173233.html
Copyright © 2011-2022 走看看