1、新建 Maven 项目 stream
2、 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this demo project -->
    <groupId>com.java</groupId>
    <artifactId>stream</artifactId>
    <version>1.0.0</version>

    <!-- Inherit dependency and plugin management from Spring Boot -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    <!-- Version constants -->
    <properties>
        <jdk.version>1.8</jdk.version>
    </properties>

    <dependencies>
        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Cloud Stream binder for RabbitMQ -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

        <!-- Hot reload during development -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>springloaded</artifactId>
            <version>1.2.8.RELEASE</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- Lombok annotations (used for @Slf4j loggers) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Name the built artifact after the artifactId, without the version suffix -->
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <!-- Compile with the JDK level declared in the properties section, UTF-8 sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Repackage the build output with the Spring Boot plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3、 StreamStarter.java
package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context for the stream demo.
 *
 * @author Logan
 * @since 2020-04-22
 * @version 1.0.0
 */
@SpringBootApplication
public class StreamStarter {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(StreamStarter.class, args);
    }
}
4、 DemoListener.java
package com.java.stream.listener;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import lombok.extern.slf4j.Slf4j;

/**
 * Message consumer bound to the {@link Sink} input channel.
 *
 * @author Logan
 * @since 2020-04-22
 * @version 1.0.0
 */
@Slf4j
@EnableBinding(Sink.class)
public class DemoListener {

    /**
     * Handles one message delivered on {@link Sink#INPUT} by writing it to the log.
     *
     * @param msg the received message payload
     */
    @StreamListener(Sink.INPUT)
    public void listener(String msg) {
        log.info("收到消息: {}", msg);
    }
}
5、 DemoPublisher.java
package com.java.stream.publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import lombok.extern.slf4j.Slf4j; /** * 发送消息 * * @author Logan * @since 2020-04-22 * @version 1.0.0 */ @Slf4j @EnableBinding(Source.class) public class DemoPublisher { @Autowired private Source source; public void output(Object message) { source.output().send(MessageBuilder.withPayload(message).build()); log.info("发送消息成功,{}", message); } }
6、 DemoController.java
package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.java.stream.publisher.DemoPublisher;

/**
 * Demo REST endpoint that triggers publishing of a message.
 *
 * @author Logan
 * @since 2020-04-22
 * @version 1.0.0
 */
@RestController
public class DemoController {

    @Autowired
    private DemoPublisher publisher;

    /**
     * Publishes the value of the {@code param} query parameter through
     * {@link DemoPublisher#output(Object)}.
     *
     * <p>The parameter is declared explicitly as a required request parameter
     * so that a request without it is rejected by Spring MVC as a client error
     * instead of a {@code null} payload being forwarded to the publisher.
     *
     * @param param the text to publish, taken from the {@code param} query parameter
     * @return a fixed confirmation text
     */
    @GetMapping("/stream")
    public String demo(@RequestParam("param") String param) {
        publisher.output(param);
        return "发送消息成功";
    }
}
7、 application.yml
spring:
  cloud:
    stream:
      bindings:
        # Outbound binding: messages are published to the "demo" destination
        output:
          destination: demo
          binder: mq-demo
        # Inbound binding: messages are consumed from the "demo" destination
        input:
          destination: demo
          binder: mq-demo
          group: demo
          consumer:
            # "concurrency" is a core consumer property, not a RabbitMQ-specific
            # one, so it is configured on the binding itself.
            concurrency: 4
      rabbit:
        bindings:
          input:
            # RabbitMQ-specific consumer settings for the inbound binding
            consumer:
              max-concurrency: 8
              prefetch: 10
      # Binder used by bindings that do not name one explicitly
      defaultBinder: mq-demo
      binders:
        mq-demo:
          type: rabbit
          # Connection settings that apply to this binder only
          environment:
            spring:
              rabbitmq:
                host: 192.168.1.212
                port: 5672
                # NOTE(review): RabbitMQ restricts the built-in "guest" account to
                # localhost connections by default - confirm the broker permits
                # remote guest logins or switch to a dedicated account.
                username: guest
                password: guest
                virtual-host: /
8、 运行 StreamStarter.java 启动项目
浏览器输入:
http://localhost:8080/stream?param=test
测试发送消息和接收消息:页面返回“发送消息成功”,同时控制台应能看到“发送消息成功,test”和“收到消息: test”两条日志。
.