- Environment setup
  - RabbitMQ is running on port 5672; the management web console is on port 15672, and the username/password is guest/guest.
- Add the Spring Cloud Stream dependency (Gradle)

      compile('org.springframework.cloud:spring-cloud-starter-stream-rabbit')
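- Create the `StreamClient` interface

      import org.springframework.cloud.stream.annotation.Input;
      import org.springframework.cloud.stream.annotation.Output;
      import org.springframework.messaging.MessageChannel;
      import org.springframework.messaging.SubscribableChannel;

      public interface StreamClient {

          // Binding names; they must match the keys under spring.cloud.stream.bindings
          String INPUT = "input";
          String OUTPUT = "output";

          // Inbound channel that messages are consumed from
          @Input(StreamClient.INPUT)
          SubscribableChannel input();

          // Outbound channel that messages are published to
          @Output(StreamClient.OUTPUT)
          MessageChannel output();
      }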
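- Create the `StreamReceiver` listener class

      import java.nio.charset.StandardCharsets;

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.cloud.stream.annotation.EnableBinding;
      import org.springframework.cloud.stream.annotation.StreamListener;
      import org.springframework.stereotype.Component;

      @Slf4j
      @Component
      @EnableBinding(StreamClient.class)
      public class StreamReceiver {

          // Alternative: listen on the queue directly
          // @RabbitListener(queues = "error_log")
          // public void listenerObject(byte[] message) throws UnsupportedEncodingException {
          //     String errorLog = new String(message, "utf-8");
          //     log.info("Stream Receiver Object: {}", errorLog);
          // }

          // Consume from the "input" binding; the payload is cast to byte[] and decoded as UTF-8
          @StreamListener(StreamClient.INPUT)
          public void processInput(Object message) {
              String errorLog = new String((byte[]) message, StandardCharsets.UTF_8);
              log.info("StreamInput Receiver Object: {}", errorLog);
          }
      }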
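The `processInput` handler above casts the payload straight to `byte[]`, which would throw a `ClassCastException` if a message converter had already turned the payload into another type. A minimal sketch of a more defensive decode step follows; `PayloadDecoder` and `decode` are hypothetical names introduced for illustration and are not part of Spring Cloud Stream.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Spring Cloud Stream.
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    // Decodes a message payload into text. A byte[] is read as UTF-8;
    // any other payload type falls back to its string form.
    public static String decode(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }

    public static void main(String[] args) {
        // Raw bytes, as the listener above expects to receive them
        byte[] raw = "error: disk full".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw));

        // A payload that is already a String passes through unchanged
        System.out.println(decode("error: already text"));
    }
}
```

Inside the listener this could be used as `log.info("StreamInput Receiver Object: {}", PayloadDecoder.decode(message));`, assuming the helper is on the classpath.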
- Configure `application.yaml`

      spring:
        rabbitmq:
          host: x.x.x.x
          port: 5672
          username: guest
          password: guest
        cloud:
          stream:
            bindings:
              input:
                destination: LogExchanger # Exchange name
                group: error_log # Queue name
              # output:
              #   destination: LogExchanger
              #   group: error_log
            rabbit:
              bindings:
                input:
                  consumer:
                    bindQueue: false
                    declareExchange: false
                    queueNameGroupOnly: true
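The three consumer properties in the configuration let the binder attach to RabbitMQ infrastructure that already exists: `declareExchange: false` and `bindQueue: false` tell it not to declare the exchange or declare and bind the queue, and `queueNameGroupOnly: true` makes it consume from a queue named after the group alone rather than `destination.group`. The sketch below only illustrates that naming rule in plain Java. `QueueNameSketch` and `queueName` are hypothetical names, and the sketch assumes a named group and no binder `prefix` setting.

```java
// Hypothetical illustration of the Rabbit binder's consumer queue naming rule.
// Assumes a named group and no "prefix" property; not the binder's actual code.
final class QueueNameSketch {

    private QueueNameSketch() {
    }

    // With queueNameGroupOnly=true the queue name is just the group;
    // otherwise it is "<destination>.<group>".
    public static String queueName(String destination, String group, boolean queueNameGroupOnly) {
        return queueNameGroupOnly ? group : destination + "." + group;
    }

    public static void main(String[] args) {
        // Values taken from the configuration above
        System.out.println(queueName("LogExchanger", "error_log", true));  // prints "error_log"
        System.out.println(queueName("LogExchanger", "error_log", false)); // prints "LogExchanger.error_log"
    }
}
```

With the settings shown in the configuration, the listener would therefore read from the existing `error_log` queue, which matches the queue name used in the commented-out `@RabbitListener` variant.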