现在来构建生产者模块,发送消息到 rabbitmq
1. 创建模块
2. 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- 主要引入stream的rabbit依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.aib.springcloud</groupId>
<artifactId>springclud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
3. 改配置
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: #代表多个binder defaultRabbit: #binder的名称,这是固定的 type: rabbit # 使用什么消息中间件 environment: # 消息中间件的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # output是一个通道的名称,表示输出通道,也意味着本模块是生产者 destination: studyExchang # 这是目的地对象的名称,默认是topic,但在rabbitmq这叫exchange;这个名称是可以任意的。 content-type: application/json # 设置消息类型,本次为json binder: defaultRabbit #使用哪个binder eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8801.com prefer-ip-address: true
4. 主启动
@SpringBootApplication public class StreamProviderApplication { public static void main(String[] args) { SpringApplication.run(StreamProviderApplication.class, args); } }
5. 写业务
定义一个接口:
public interface IMessageProvider { public void send(); }
实现类(该实现类不需要加@Component,@EnableBinding会把该类加入到IOC中):
@EnableBinding(Source.class) //开启binder,同时指定source通道并将binder进行绑定 public class IMessageProviderImpl implements IMessageProvider { @Resource(name = "output") //注意这个名称必须是output,不然注入不进来。IOC中的输出通道对象名称是output private MessageChannel channel; //内部是使用我们指定的通道 @Override public void send() { String uuid = IdUtil.simpleUUID(); channel.send(MessageBuilder.withPayload(uuid).build()); //Message对象不能直接new,得通过withPayLoad装载消息,build构建Message System.out.println("消息发送完毕:"+uuid); } }
controller:
@RestController public class ProviderController { @Resource private IMessageProvider provider; @GetMapping("/sendMessage") public void sendMessage(){ provider.send(); } }
6. 测试;当生产者模块启动后,在rabbitmq上就有 名称是studyExchang 的 exchange了。现在访问 http://localhost:8801/sendMessage,会向rabbitmq发送消息了,不过由于没人去消费,不会保存下来。