zoukankan      html  css  js  c++  java
  • springcloudstream

    BillingMessageListener
    package cn.enjoy.listener;
    
    import cn.enjoy.BillingChannel;
    import cn.enjoy.BillingMessage;
    import cn.enjoy.channel.DefaultProcess;
    import cn.enjoy.vo.Product;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    @Component
    @EnableBinding(BillingChannel.class)
    public class BillingMessageListener {
    
        @StreamListener(BillingChannel.INPUT)
        public void input(Message<BillingMessage> message) {
            System.err.println("【*** 消息接收 ***】" + message.getPayload());
        }
    }
    BillingMessageProvider
    package cn.enjoy.provider;
    
    
    import cn.enjoy.BillingChannel;
    import cn.enjoy.BillingMessage;
    import cn.enjoy.channel.DefaultProcess;
    import cn.enjoy.vo.Product;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
    import javax.annotation.Resource;
    
    @EnableBinding(BillingChannel.class)
    public class BillingMessageProvider {
        @Resource
        @Qualifier("billing_output")
        private MessageChannel output;  // 消息的发送管道
    
        public void send(BillingMessage message) {
            output.send(MessageBuilder.withPayload(message).build());
        }
    }
    BillingChannel
    package cn.enjoy;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface BillingChannel {
        public static final String OUTPUT = "billing_output"; // 输出通道名称
    
        public static final String INPUT = "billing_input"; // 输入通道名称
    
        @Input(BillingChannel.INPUT)
        public SubscribableChannel input();
    
        @Output(BillingChannel.OUTPUT)
        public MessageChannel output();
    }

    application.yml

    server:
      port: 6000
    
    spring:
      cloud:
        stream:
           rabbit:
              bindings:
                billing_input:
                  consumer:
                    bindingRoutingKey: billingKey # 设置一个RoutingKey信息
                billing_output:
                  producer:
                    routingKeyExpression: '''billingKey'''
           binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
                environment: # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                       addresses: localhost
                       port: 5672
                       username: guest
                       password: guest
                       virtual-host: /
           bindings: # 服务的整合处理
              billing_input: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
                destination: BillingExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置
                group:  enjoy_group
              billing_output: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
                destination: BillingExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置
      application:
        name: microcloud-stream-provider-consumer
    TestMessageProvider
    package test;
    
    import cn.enjoy.BillingMessage;
    import cn.enjoy.StreamApp;
    import cn.enjoy.provider.BillingMessageProvider;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    import java.util.Date;
    
    @SpringBootTest(classes = StreamApp.class)
    @RunWith(SpringRunner.class)
    public class TestMessageProvider {
    
        @Resource
        private BillingMessageProvider messageProvider;
    
        @Test
        public void testSend() {
            for (int i = 0; i < 10; i++) {
                BillingMessage message = new BillingMessage();
                message.setAccountId("sss" + i);
                message.setStartTime(new Date());
                message.setEndTime(new Date());
                messageProvider.send(message);
            }
        }
    }
  • 相关阅读:
    解决html中&nbsp;在不同浏览器中占位大小不统一的问题 SUperman
    解决C#调用执行js报检索 COM 类工厂中 CLSID 为 {0E59F1D51FBE11D08FF200A0D10038BC} 组件失败
    面向对象程序设计_tesk1_寒假伊始
    面对对象程序设计_task2_1001.A+B Format (20)
    面向对象程序设计_Task5_Calculator1.5.0
    面向对象程序设计_Task4_Calculator1.1
    面向对象程序设计_课堂作业_01_Circle
    面向对象程序设计__Task3_Calculator
    面对对象程序设计_task2_C++视频教程
    pkgconfig 简述
  • 原文地址:https://www.cnblogs.com/skorzeny/p/12199105.html
Copyright © 2011-2022 走看看