Spring Cloud Stream是Spring Cloud的组件之一,是一个为微服务应用构建消息驱动能力的框架。
1、引入依赖
<!-- Spring Cloud Stream starter for RabbitMQ. No version is given here, so it is
     presumably managed by a Spring Cloud BOM in the parent POM (confirm). -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、定义接口
/**
 * Declares the message channels used by the examples: one inbound channel
 * named {@link #INPUT} and one outbound channel named {@link #OUTPUT}.
 */
public interface StreamClient {

    /** Name of the inbound channel. */
    String INPUT = "input";

    /** Name of the outbound channel. */
    String OUTPUT = "output";

    /**
     * @return the subscribable channel registered under {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * @return the message channel registered under {@link #OUTPUT}
     */
    @Output(OUTPUT)
    MessageChannel output();
}
3、定义消息的接收
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Message receiver bound to the channels declared by {@link StreamClient}.
 *
 * <p>One handler listens on {@code StreamClient.INPUT} and forwards what it
 * receives to {@code StreamClient.OUTPUT}; a second handler listens on
 * {@code StreamClient.OUTPUT} and only logs.
 */
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    /**
     * Logs a message received on the input channel and hands it back so that
     * {@code @SendTo} can pass it on to the output channel.
     *
     * @param message the received payload
     * @return the same payload, unchanged
     */
    @StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.OUTPUT)
    public Object processInput(String message) {
        log.info("Input StreamReceiver:{}", message);
        return message;
    }

    /**
     * Logs a message received on the output channel.
     *
     * @param message the received payload
     */
    @StreamListener(StreamClient.OUTPUT)
    public void processOutPut(String message) {
        log.info("Output StreamReceiver:{}", message);
    }
}
4、定义消息的发送
/**
 * REST controller that publishes a test message through the
 * {@link StreamClient} output channel.
 */
@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

    /**
     * Handles {@code GET /sendMessage} by sending the fixed text
     * {@code "hello world"} to the output channel.
     */
    @GetMapping("/sendMessage")
    public void process() {
        streamClient.output().send(MessageBuilder.withPayload("hello world").build());
    }
}
5、结果
6、发送对象
1)发送者
/**
 * Sends an object payload: handles {@code GET /sendMessage2} by building an
 * {@code OrderDTO} with order id {@code "123"} and sending it to the output
 * channel.
 */
@GetMapping("/sendMessage2")
public void process2() {
    OrderDTO order = new OrderDTO();
    order.setOrderId("123");
    streamClient.output().send(MessageBuilder.withPayload(order).build());
}
2)接收者
/**
 * Logs an {@code OrderDTO} received on the output channel.
 *
 * @param message the received order payload
 */
@StreamListener(StreamClient.OUTPUT)
public void processOutPut(OrderDTO message) {
    // Pass the object itself: the logger renders the "{}" argument only when
    // INFO is enabled, and no explicit dereference of the payload is needed.
    log.info("Output StreamReceiver:{}", message);
}
7、接收到消息后,再回复消息:使用 @SendTo 将方法的返回值发送到指定的通道
@StreamListener(StreamClient.INPUT) @SendTo(StreamClient.OUTPUT) //当Input接收到消息后,回复消息给Output public Object processInput(String message){ log.info("Input StreamReceiver:{}", message ); return message; }