zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

    Spring Cloud Stream

    • 生产者配置

    设置生产者的输入输出通道

    package net.happyeasygo.hotel.mq.interf;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Channel contract for order messages, for use with Spring Cloud Stream.
     *
     * <p>Declares one inbound and one outbound channel. The two constants hold
     * the channel names passed to the {@code @Input} and {@code @Output}
     * annotations.
     */
    public interface OrderProcessor {

        /** Name of the outbound channel. */
        String OUTPUT_ORDER = "outputOrder";

        /** Name of the inbound channel. */
        String INPUT_ORDER = "inputOrder";

        /**
         * Outbound channel registered under {@link #OUTPUT_ORDER}.
         *
         * @return the channel that messages are sent to
         */
        @Output(value = OUTPUT_ORDER)
        MessageChannel outputOrder();

        /**
         * Inbound channel registered under {@link #INPUT_ORDER}.
         *
         * @return the channel that messages can be subscribed from
         */
        @Input(value = INPUT_ORDER)
        SubscribableChannel inputOrder();
    }
    

    生产者发送消息

    @EnableBinding(OrderProcessor.class)
    public class PayCallBackController{
        @Autowired
    	private OrderProcessor orderProcessor;
    	
    	public void payMethod(){
    	    //该data可以为对象也可以为字符串
    	    String data ="";
            orderProcessor.outputOrder().send(MessageBuilder.withPayload(data).build());	    
    	   }
       }
        //添加监听消息队列output通道
        @StreamListener(OrderProcessor.INPUT_ORDER)
        public void recevieOrder(String obj) {
        	_log.info("Interface order completion, message queue");
        }
    

    设置application.yml文件

    # Producer-side application.yml.
    spring:
      # RabbitMQ connection. Each ${NAME:default} placeholder resolves to the
      # value after the colon when NAME is not set.
      rabbitmq:
        host: ${RMQ_HOST:192.168.1.57}
        port: ${RMQ_PORT:5672}
        username: ${RMQ_USERNAME:guest}
        password: ${RMQ_PASSWORD:guest}
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            # Both channels use the same destination name.
            # NOTE(review): no consumer group is configured for inputOrder —
            # confirm that this is intended.
            outputOrder:
              destination: mqSupplierOrder
              content-type: application/json
            inputOrder:
              destination: mqSupplierOrder
    
    • 消费者配置

    设置消费者的输入输出通道

    package net.happyeasygo.hotel.mq.interf;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Consumer-side copy of the order channel contract, for use with
     * Spring Cloud Stream.
     *
     * <p>Exposes one inbound and one outbound channel. The channel names are
     * kept in constants so that listeners can refer to them.
     */
    public interface OrderProcessor {

        /** Name of the outbound channel. */
        String OUTPUT_ORDER = "outputOrder";

        /** Name of the inbound channel. */
        String INPUT_ORDER = "inputOrder";

        /**
         * Outbound channel registered under {@link #OUTPUT_ORDER}.
         *
         * @return the channel that messages are sent to
         */
        @Output(value = OUTPUT_ORDER)
        MessageChannel outputOrder();

        /**
         * Inbound channel registered under {@link #INPUT_ORDER}.
         *
         * @return the channel that messages can be subscribed from
         */
        @Input(value = INPUT_ORDER)
        SubscribableChannel inputOrder();
    }
    

    消费者消费消息

    
    /**
     * Consumer-side example: receives messages from the {@code inputOrder}
     * channel declared by {@link OrderProcessor}.
     */
    @EnableBinding(OrderProcessor.class)
    public class OrderReceiver {

        /**
         * Passes the payload of each message received on the input channel to
         * {@code strategyDida.book}.
         *
         * @param bookingNo the message payload, forwarded as the second argument
         * @throws Exception declared on the signature; which exceptions can
         *     actually occur is not visible from this snippet
         */
        @StreamListener(OrderProcessor.INPUT_ORDER)
        public void receiveMethod(String bookingNo) throws Exception {
            // NOTE(review): strategyDida is not declared in the visible snippet —
            // presumably an injected collaborator; confirm.
            final String strategyName = "strategyDida";
            strategyDida.book(strategyName, bookingNo);
        }
    }
    

    application.yml配置

    # Consumer-side application.yml.
    spring:
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            # Both channels use the same destination name.
            # NOTE(review): no consumer group is configured for inputOrder —
            # confirm that this is intended.
            outputOrder:
              destination: mqSupplierOrder
            inputOrder:
              destination: mqSupplierOrder
      # RabbitMQ connection. Each ${NAME:default} placeholder resolves to the
      # value after the colon when NAME is not set.
      rabbitmq:
        host: ${RMQ_HOST:192.168.1.57}
        port: ${RMQ_PORT:5672}
        username: ${RMQ_USERNAME:guest}
        password: ${RMQ_PASSWORD:guest}
    
  • 相关阅读:
    项目spring boot 写es hbase 运行内存溢出
    spring boot项目启动报错
    线程的创建启动及线程池的使用
    ajax 跨域问题处理
    spring @Value("${name}")使用
    平时服务正常,突然挂了,怎么重启都起不来,查看日志Insufficient space for shared memory file 内存文件空间不足
    oracle 特殊符号替换删除处理
    Linux——CentOS 7 systemctl和防火墙firewalld命令
    linux 查看并对外开放端口(防火墙拦截处理)
    SpringBoot 使用 Gson 序列化(禁用 Jackson)
  • 原文地址:https://www.cnblogs.com/joinlemon/p/9335038.html
Copyright © 2011-2022 走看看