zoukankan      html  css  js  c++  java
  • springcloud整合stream解决项目升级的多个消息中间件的收发问题

    cloud stream

    (一)简介
    Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

    (二)快速搭建
    首先,我们通过一个简单的示例对 Spring Cloud Stream 有一个初步的认识。我们中间件使用 RabbitMQ,创建 spring-cloud-stream 模块

    消息的发送者:springboot工程

    POM

    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>


    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    </dependency>

    <!--spring boot热部署插件-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    </dependencies>

    application.yml文件:

    server:
    port: 8801

    spring:
    application:
    name: stream-output
    cloud:
    stream:
    bindings:
    output_channel: #通道
    destination: exchange-stream #交换机名称
    group: queue1 #队列名称
    binder: rabbit_cluster # 连接对象
    binders:
    rabbit_cluster:
    type: rabbit
    environment:
    spring:
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

    eureka:
    client:
    serviceUrl: #注册中心的注册地址
    defaultZone: http://eureka-server1:8761/eureka/
    instance:
    prefer-ip-address: true #在eureka上显示IP地址
    instance-id: stream-output #在eureka上显示的名称

    启动入口:
    @SpringBootApplication
    @EnableEurekaClient
    public class CloudStream8801Application {
    public static void main(String[] args) {

    SpringApplication.run(CloudStream8801Application.class,args);
    }
    创建一个通道:
    public interface IMessageProvide {

    String OUTPUT_CHANNEL = "output_channel";

    //注解@Output表明了它是一个输出类型的通道类,名字output_channel。这一名字与app1中通道名一致,表明注入了
    //一个名字为output_channel的通道
    @Output(IMessageProvide.OUTPUT_CHANNEL)
    MessageChannel logoutput();
    }
    定义发送消息的方法:
    @Service
    @EnableBinding(IMessageProvide.class)
    public class RabbitmqSender {
    @Autowired
    private IMessageProvide messageProvide;

    // 发送消息的方法
    public String sendMessage(Object message, Map<String,Object> properties){

    MessageHeaders headers = new MessageHeaders(properties);
    Message<Object> objectMessage = MessageBuilder.createMessage(message, headers);
    boolean send = messageProvide.logoutput().send(objectMessage);
    if (send)
    return "完成。。";
    return "00000";
    }
    }
    测试发送消息的controller
    @RequestMapping("/stream")
    @RestController
    public class StreamController {

    @Autowired
    private RabbitmqSender rabbitmqSender;

    @RequestMapping("/sane")
    public String saneMessage(){
    String uuid = UUID.randomUUID().toString();
    Map<String, Object> map = new HashMap<>();
    map.put("aa","11");
    map.put("bb","22");
    map.put("cc","33");

    String s = rabbitmqSender.sendMessage(uuid, map);
    return s;
    }
    }
    消息消费的工程:
    POM:
    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>


    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    </dependency>

    <!--spring boot热部署插件-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    </dependencies>
    和发送消息的工程没差别
    application.yml
    server:
    port: 8802

    spring:
    application:
    name: cloud-input
    cloud:
    stream:
    binders:
    rabbit_cluster:
    type: rabbit
    environment:
    spring:
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    bindings:
    input_channel:
    destination: exchange-stream
    group: queue1
    binder: rabbit_cluster
    consumer:
    concurrency: 1 #每次从队列中获取一个消息消费
    eureka:
    client:
    serviceUrl: #注册中心的注册地址
    defaultZone: http://eureka-server1:8761/eureka/
    instance:
    prefer-ip-address: true #在eureka上显示IP地址
    instance-id: cloud-input #在eureka上显示的名称
    启动的方法:
    @SpringBootApplication
    @EnableEurekaClient
    public class CloudStream8802Application {
    public static void main(String[] args) {

    SpringApplication.run(CloudStream8802Application.class,args);
    }
    }
    监听队列的service类:
    监听的通道:
    public interface Imessage {
    String INPUT_CHANNEL = "input_channel";

    //注解@Input声明了它是一个输入类型的通道,名字是input_channel
    @Input(Imessage.INPUT_CHANNEL)
    SubscribableChannel loginput();
    }
    消费消息的类:
    @EnableBinding(Imessage.class) //绑定通道
    public class RabbitmqReceiver {
    @Autowired
    private Imessage imessage;

    @StreamListener(Imessage.INPUT_CHANNEL)
    public void receiver(Message message) throws Exception {
    Channel channel = (Channel)message.getHeaders().get(AmqpHeaders.CHANNEL);
    Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    System.err.println("Input Stream 1 接受数据:" + message);
    System.err.println("消息为 :"+message.getPayload().toString());
    System.err.println("消费完毕---------------");
    // channel.basicAck(deliveryTag, false);
    }
    }
    最基础的stream整合就完成了,后面开发中高级的应用后面再补充
  • 相关阅读:
    20145237 《信息安全系统设计基础》第八周学习总结
    实验二 20145237 20155226 2015234 实验报告 固件程序设计
    实验一(开发环境的熟悉)问题总结
    实验二(固件设计)问题总结
    实验五(简单嵌入式WEB服务器实验)问题总结
    实验三( 实时系统的移植)问题总结
    实验四(外设驱动程序设计)问题总结
    20145235《信息安全系统设计基础》课程总结
    教材配套项目——缓冲区实验
    20145235 《信息安全系统设计基础》第十四周学习总结
  • 原文地址:https://www.cnblogs.com/niCong/p/15426312.html
Copyright © 2011-2022 走看看