zoukankan      html  css  js  c++  java
  • spring cloud stream整合

    spring cloud stream整体架构核心概念图:

    图一:消息的发送端和接收端可以是不同的中间件

     图二:

    图三:在消息的发送之前和消息的接收端套了一层管道

    • @Output:输出注解,用于定义发送消息接口
    • @Input:输入注解,用于定义消息的消费者接口
    • @StreamListener:用于定义监听方法的注解
    • springcloudstream框架有一个非常大的问题:不能实现可靠性消息投递,会存在少量消息丢失的情况。
      原因是springcloudstream框架为了兼顾kafka,在实际工作中使用它的目的是实现高性能的消息通信,而不是保证消息的可靠投递,
      这就是当前版本的springcloudstream的定位

    Barista接口用于定义通道类型和通道名称(该名称可以自定义),并作为@EnableBinding注解的参数供后面的类使用。
    通道名称用于配置中的绑定,通道类型则决定了app是通过这一通道发送消息还是从中接收消息

    引入pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.dwz</groupId>
      <artifactId>rabbitmq-springcloudstream-producer</artifactId>
      <version>0.0.1-SNAPSHOT</version>

      <!-- Spring Boot parent: supplies the versions of the Spring Boot starters declared below. -->
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
      </parent>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
      </properties>

      <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <!-- Version omitted on purpose: it is inherited from the parent, so it cannot drift from it. -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <!-- NOTE(review): explicit version pin; confirm it is intended to override any version managed by the parent. -->
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.7.3</version>
        </dependency>

        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <!-- NOTE(review): explicit Jackson pin; confirm it stays aligned with the other Jackson modules on the classpath. -->
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
          <version>2.10.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.10.0</version>
        </dependency>

        <!-- Spring Boot hot-reload (devtools) support -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-devtools</artifactId>
          <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
        <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
          <version>2.1.4.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator -->
        <!-- Version omitted on purpose: it is inherited from the parent, so it cannot drift from it. -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
        </plugins>
      </build>
    </project>

    生产者代码

    配置application.properties

    # HTTP port and servlet context path of the producer application.
    server.port=8001
    server.servlet.context-path=/producer
    
    spring.application.name=producer
    # Binding for the channel named "output_channel": target destination, group, and the binder to use.
    spring.cloud.stream.bindings.output_channel.destination=exchange-4
    # NOTE(review): "group" is documented as a consumer-group setting for inbound bindings;
    # confirm whether it has any effect on this output binding.
    spring.cloud.stream.bindings.output_channel.group=queue-4
    spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
    
    # Declares the binder named "rabbit_cluster" as a RabbitMQ binder.
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    # RabbitMQ connection settings for that binder (address, credentials, virtual host).
    # NOTE(review): credentials are stored here in plain text; consider externalizing them.
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=127.0.0.1:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=root_dwz
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_dwz

    Barista接口

    package com.dwz.rabbitmq.stream;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    /**
     * Declares the producer-side message channel that is bound by {@code @EnableBinding}.
     */
    public interface Barista {

        /** Name of the output channel; the same name is used for the binding in the configuration. */
        String OUTPUT_CHANNEL = "output_channel";

        /**
         * Output-type channel registered under the name {@value #OUTPUT_CHANNEL}.
         *
         * @return the channel used to send messages
         */
        @Output(OUTPUT_CHANNEL)
        MessageChannel logoutput();
    }

    发送消息的方法

    package com.dwz.rabbitmq.stream;
    
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    
    /**
     * Sends messages through the output channel declared on {@link Barista}.
     */
    @EnableBinding(Barista.class)
    @Service
    public class RabbitmqSender {
        @Autowired
        private Barista barista;

        /**
         * Builds a message from the given payload and headers and sends it on the output channel.
         *
         * @param message    the payload to send
         * @param properties entries to place into the message headers
         * @return always {@code null}
         * @throws Exception kept in the signature for existing callers; any failure is rethrown
         *                   as a {@link RuntimeException} that carries the original exception as its cause
         */
        public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
            try {
                MessageHeaders headers = new MessageHeaders(properties);
                Message<Object> outbound = MessageBuilder.createMessage(message, headers);
                boolean sendStatus = barista.logoutput().send(outbound);
                System.err.println("-------------------sending---------------------");
                System.err.println("发送数据:" + message + ",sendStatus:" + sendStatus);
                return null;
            } catch (Exception e) {
                System.err.println("---------------------error--------------------------");
                // Chain the original exception so its type and stack trace reach the caller
                // instead of being reduced to a bare message string.
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    测试代码

    package com.dwz.rabbitmq;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.http.client.utils.DateUtils;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.dwz.rabbitmq.stream.RabbitmqSender;
    /**
     * Integration test that sends a few messages through {@link RabbitmqSender}.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Testdd {

        @Autowired
        private RabbitmqSender rabbitmqSender;

        /**
         * Sends five messages, each with the same set of custom headers plus a send timestamp.
         *
         * <p>Exceptions are deliberately not caught here: a failed send must fail the test
         * rather than be printed and ignored.
         *
         * @throws Exception if sending any message fails
         */
        @Test
        public void sendMessageTest1() throws Exception {
            for (int i = 0; i < 5; i++) {
                Map<String, Object> properties = new HashMap<>();
                properties.put("SERIAL_NUMBER", "12345");
                properties.put("BANK_NUMBER", "abc");
                // NOTE(review): DateUtils comes from Apache HttpClient; confirm the time zone it formats in is the one intended.
                properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss:SSS"));
                rabbitmqSender.sendMessage("Hello, I am amqp sender num:" + i, properties);
            }
        }
    }

    消费端代码

    配置application.properties

    # HTTP port and servlet context path of the consumer application.
    server.port=8002
    server.servlet.context-path=/consumer
    
    spring.application.name=consumer
    # Binding for the channel named "input_channel": source destination, consumer group, and the binder to use.
    spring.cloud.stream.bindings.input_channel.destination=exchange-4
    spring.cloud.stream.bindings.input_channel.group=queue-4
    spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
    # Initial consumer concurrency for this binding.
    spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
    # Rejected messages are not put back on the queue.
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
    # Manual acknowledgement: the listener code must ack each message itself.
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=manual
    # Reconnect (recovery) interval after the connection is lost.
    # NOTE(review): presumably milliseconds; confirm against the binder documentation.
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
    # Enable durable subscription.
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
    # Upper limit for the number of concurrent listeners.
    spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
    
    # Declares the binder named "rabbit_cluster" as a RabbitMQ binder and configures its connection.
    # NOTE(review): credentials are stored here in plain text; consider externalizing them.
    spring.cloud.stream.binders.rabbit_cluster.type=rabbit
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=127.0.0.1:5672
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=root_dwz
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456
    spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_dwz

    Barista接口

    package com.dwz.rabbitmq.stream;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Declares the consumer-side message channel that is bound by {@code @EnableBinding}.
     */
    public interface Barista {

        /** Name of the input channel; the same name is used for the binding in the configuration. */
        String INPUT_CHANNEL = "input_channel";

        /**
         * Input-type channel registered under the name {@value #INPUT_CHANNEL}.
         *
         * @return the channel from which messages are received
         */
        @Input(INPUT_CHANNEL)
        SubscribableChannel loginput();
    }

    接收消息的方法

    package com.dwz.rabbitmq.stream;
    
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    /**
     * Consumes messages from the input channel declared on {@link Barista} and
     * acknowledges each one manually.
     */
    @EnableBinding(Barista.class)
    @Component
    public class RabbitmqReceiver {

        /**
         * Handles one inbound message: prints it, then acknowledges it on the AMQP channel
         * carried in the message headers.
         *
         * @param message the inbound message; its headers are expected to contain
         *                {@link AmqpHeaders#CHANNEL} and {@link AmqpHeaders#DELIVERY_TAG}
         * @throws IllegalStateException if either header is missing, so the message cannot be acknowledged
         * @throws Exception             if the acknowledgement itself fails
         */
        @StreamListener(Barista.INPUT_CHANNEL)
        public void receiver(Message<?> message) throws Exception {
            // Typed lookups replace the unchecked casts on the raw header values.
            Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
            if (channel == null || deliveryTag == null) {
                // Fail with an explicit message instead of a bare NullPointerException on unboxing.
                throw new IllegalStateException(
                        "Missing " + AmqpHeaders.CHANNEL + " or " + AmqpHeaders.DELIVERY_TAG
                                + " header; manual acknowledgement is not possible");
            }
            System.err.println("Input Stream 1 接受数据:" + message);
            System.err.println("消费完毕---------------");
            // multiple=false: acknowledge only this delivery.
            channel.basicAck(deliveryTag, false);
        }
    }
  • 相关阅读:
    全站防止SQL注入类
    asp.net 技术网站
    Tekla API 常见问题摘录整理
    C#退出程序结束线程
    C#MetroModernUI库应用实例 WinForm窗体UI的美化
    获取项目物理根目录绝对路径
    计算两个时间月数的差
    centos7安装wps软件
    10 安全运维管理 10.11备份与恢复管理
    10 安全运维管理 10.14外包运维管理
  • 原文地址:https://www.cnblogs.com/zheaven/p/11915570.html
Copyright © 2011-2022 走看看