zoukankan      html  css  js  c++  java
  • 第十一章 SpringCloud之stream

    ####################发送消息(producer)#####################

    1、在pom.xml文件中添加依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <!--            <version>2.0.1.RELEASE</version>-->
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>

    2、创建消息管道

    package com.test.eurekaclientcomsumerstream.config;
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Binding interface that declares the channel used to send messages.
     *
     * <p>NOTE(review): output channels are conventionally typed as
     * {@code MessageChannel}; {@code SubscribableChannel} is kept here as written —
     * confirm whether the wider type is intentional.
     */
    public interface SendMessageInterface {
        /**
         * Declares an output channel named "mymsg" used for sending messages.
         *
         * @return the channel on which outgoing messages are sent
         */
        @Output("mymsg")
        SubscribableChannel sendMsg();
    }

    3、启动类添加注解

    package com.test.eurekaclientcomsumerstream;
    
    import com.test.eurekaclientcomsumerstream.config.SendMessageInterface;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    /**
     * Boot class of the message-producer service.
     *
     * <p>Registers the service as a Eureka client and enables the Spring Cloud
     * Stream binding declared by {@link SendMessageInterface}.
     */
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(SendMessageInterface.class) // Spring Cloud Stream producer binding
    public class EurekaClientComsumerStreamProducerApplication {
    
        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            final Class<?> bootstrapClass = EurekaClientComsumerStreamProducerApplication.class;
            SpringApplication.run(bootstrapClass, args);
        }
    
    }

    4、使用消息管道发送消息

    package com.test.eurekaclientcomsumerstream.controller;
    
    import com.test.eurekaclientcomsumerstream.config.SendMessageInterface;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    
    /**
     * REST controller that publishes a randomly generated message through the
     * output channel declared by {@link SendMessageInterface}.
     */
    @RestController
    public class SendMsgController {
        @Autowired
        private SendMessageInterface sendMessageInterface;
    
        /**
         * Generates a random UUID string, logs it to standard output and sends it
         * as a UTF-8 encoded byte payload on the output channel.
         *
         * @return {@code "success"} when the channel accepted the message,
         *         {@code "fail"} when {@code send} reported that it did not
         */
        @RequestMapping("/sendMsg")
        public String sendMsg() {
            String msg = UUID.randomUUID().toString();
            System.out.println("生产者发送内容msg:" + msg);
            // Encode explicitly as UTF-8 so the payload bytes do not depend on the
            // platform default charset of the producing host.
            Message<byte[]> build = MessageBuilder.withPayload(msg.getBytes(StandardCharsets.UTF_8)).build();
            // send() returns false on a non-fatal delivery failure; surface that
            // instead of unconditionally reporting success.
            boolean sent = sendMessageInterface.sendMsg().send(build);
            return sent ? "success" : "fail";
        }
    
    }

    5、application.yml文件添加配置

    # Producer service configuration: HTTP port, RabbitMQ connection,
    # Spring Cloud Stream binding and Eureka registration.
    server:
      port: 7016
    user:
      httpAddress: http://eureka-provider/getUser/  # uses the virtual host name (service id) instead of a fixed address
    
    spring:
      application:
        name: eureka-client-stream-producer
      thymeleaf:
        suffix:
      rabbitmq:
        publisher-returns: true
        # NOTE(review): hard-coded broker address; consider externalising per environment.
        host: 132.232.44.82
        port: 5672
        # NOTE(review): these listener settings presumably target Spring AMQP listener
        # containers rather than the stream binder's consumers — confirm they take effect.
        listener:
          simple:
            acknowledge-mode: manual
            concurrency: 1
            max-concurrency: 1
            retry:
              enabled: true
      cloud:
        stream:
          bindings:
            # Binding name; matches the channel name "mymsg" declared in SendMessageInterface.
            mymsg:
              destination: test
              # NOTE(review): a consumer group is normally only meaningful on input
              # bindings — confirm whether it is needed on the producer side.
              group: stream
    
    
    eureka:
      instance:
        hostname: localhost
        prefer-ip-address: true
        instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
      client:
        serviceUrl:
          defaultZone: http://${eureka.instance.hostname}:8761/eureka
    
    #  cloud:
    #    stream:
    #      bindings:
    #        mymsg: ###指定 管道名称
    #          #指定该应用实例属于 stream 消费组
    #          group: stream
    #  spring.rabbitmq.publisher-returns=true
    #  #采用手动应答
    #  #spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #  #指定最小的消费者数量
    #  spring.rabbitmq.listener.simple.concurrency=1
    #  #指定最大的消费者数量
    #  spring.rabbitmq.listener.simple.max-concurrency=1
    #  #是否支持重试
    #  spring.rabbitmq.listener.simple.retry.enabled=true 

    ####################接收消息(consumer)#####################

    1、在pom.xml添加依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
                <!--            <version>2.0.1.RELEASE</version>-->
            </dependency>

    2、创建接收消息管道

    package com.test.eurekaclientcomsumerstreamconsumer.config;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * Binding interface that declares the channel used to receive messages.
     */
    public interface ReadMsgInterface {
    
        /**
         * Declares an input channel named "mymsg" from which messages are read.
         *
         * <p>NOTE(review): the method name {@code redMsg} looks like a typo for
         * {@code readMsg}; it is left unchanged because it is part of the interface.
         *
         * @return the channel on which incoming messages arrive
         */
        @Input("mymsg")
        SubscribableChannel redMsg();
    }

    3、在启动类添加注解

    package com.test.eurekaclientcomsumerstreamconsumer;
    
    import com.test.eurekaclientcomsumerstreamconsumer.config.ReadMsgInterface;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    /**
     * Boot class of the message-consumer service.
     *
     * <p>Registers the service as a Eureka client and enables the Spring Cloud
     * Stream binding declared by {@link ReadMsgInterface}.
     */
    @SpringBootApplication
    @EnableEurekaClient
    @EnableBinding(ReadMsgInterface.class) // Spring Cloud Stream consumer binding
    public class EurekaClientComsumerStreamConsumerApplication {
    
        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments handed to Spring Boot unchanged
         */
        public static void main(String[] args) {
            final Class<?> bootstrapClass = EurekaClientComsumerStreamConsumerApplication.class;
            SpringApplication.run(bootstrapClass, args);
        }
    
    }

    4、使用管道接收消息

    package com.test.eurekaclientcomsumerstreamconsumer.controller;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Component that listens on the "mymsg" binding and prints every received
     * message to standard output.
     */
    @Component
    public class ReadMsgController {
    
        /**
         * Handles one incoming message by writing it to standard output.
         *
         * @param msg the message body
         */
        @StreamListener("mymsg")
        public void listener(String msg) {
            final String line = "消费者获取生产消息-----1111:" + msg;
            System.out.println(line);
        }
    }

    5、在application.yml文件添加配置

    # Consumer service configuration: HTTP port, RabbitMQ connection,
    # Spring Cloud Stream binding and Eureka registration.
    server:
      port: 7018
    user:
      httpAddress: http://eureka-provider/getUser/  # uses the virtual host name (service id) instead of a fixed address
    
    spring:
      application:
        name: eureka-client-stream-consumer
      thymeleaf:
        suffix:
      rabbitmq:
        publisher-returns: true
        # NOTE(review): hard-coded broker address; consider externalising per environment.
        host: 132.232.44.82
        port: 5672
        # NOTE(review): these listener settings presumably target Spring AMQP listener
        # containers rather than the stream binder's consumers — confirm they take effect.
        listener:
          simple:
            acknowledge-mode: manual
            concurrency: 1
            max-concurrency: 1 # upper bound on concurrent listener consumers. NOTE(review): the original note said this prevents duplicate consumption and gives round-robin delivery across instances; presumably that comes from the shared consumer group below rather than from this setting — confirm
            retry:
              enabled: true
      cloud:
        stream:
          bindings:
            # Binding name; matches the channel name "mymsg" declared in ReadMsgInterface.
            mymsg:
              destination: test # destination
              group: stream # consumer group
    
    eureka:
      instance:
        hostname: localhost
        prefer-ip-address: true
        instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
      client:
        serviceUrl:
          defaultZone: http://${eureka.instance.hostname}:8761/eureka
    
    #  cloud:
    #    stream:
    #      bindings:
    #        mymsg: ###指定 管道名称
    #          #指定该应用实例属于 stream 消费组
    #          group: stream
    #  spring.rabbitmq.publisher-returns=true
    #  #采用手动应答
    #  #spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #  #指定最小的消费者数量
    #  spring.rabbitmq.listener.simple.concurrency=1
    #  #指定最大的消费者数量
    #  spring.rabbitmq.listener.simple.max-concurrency=1
    #  #是否支持重试
    #  spring.rabbitmq.listener.simple.retry.enabled=true 

    详细代码GitHub地址:https://github.com/812406210/springCloud.git

    参考博客:https://www.jianshu.com/p/404fc32122d1

              https://www.cnblogs.com/a1304908180/p/10684818.html

  • 相关阅读:
    jQuery事件篇---高级事件
    Cookie处理
    JDBC技术
    JSP行为
    JSP九大内置对象
    JSP指令学习
    Oracle数据库学习之存储过程--提高程序执行的效率
    数据库操作之游标
    PL/SQL编程接触
    数据库数据的查询----连接查询
  • 原文地址:https://www.cnblogs.com/ywjfx/p/12100307.html
Copyright © 2011-2022 走看看