zoukankan      html  css  js  c++  java
  • spring-cloud-stream 整合 rabbitmq

    1,依赖与配置

    1,pom.xml

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    

    2,配置文件相关内容,这里使用系统默认的两个管道,output 和 input 分别对应 Source 和 Sink 两个接口

    # spring.cloud.stream.bindings.[output].destination:         交换机的名称
    # spring.cloud.stream.bindings.[output].group:               组,用于生成队列,组名相同时可以实现分布式
    # spring.cloud.stream.bindings.[input].destination:          交换机的名称
    # spring.cloud.stream.bindings.[input].group:                组,用于生成队列,组名相同时可以实现分布式
    # spring.cloud.stream.bindings.[input].consumer.concurrency: 消费者的并发量
    # spring.rabbitmq.addresses:                                 服务器地址
    # spring.rabbitmq.username:                                  账号
    # spring.rabbitmq.password:                                  密码
    # spring.rabbitmq.virtual-host:                              虚拟主机
    spring:
      cloud:
        stream:
          default-binder: rabbit
          bindings:
            output:
              destination: order.exchange
              group: order.queue
            input:
              destination: order.exchange
              group: order.queue
              consumer:
                concurrency: 3
      rabbitmq:
        addresses: 192.168.200.100:5672
        username: rabbit
        password: 123456
        virtual-host: /
    

    2,代码部分

    1,作为数据的实体类,注意需要实现 Serializable 接口

    package com.hwq.rabbitmq.entity;
    
    import lombok.Getter;
    import lombok.Setter;
    import lombok.ToString;
    
    import java.io.Serializable;
    
    /**
     * Message payload exchanged between the producer and the consumer in this demo.
     *
     * <p>Getters, setters and {@code toString()} are generated by Lombok. The class is
     * {@link Serializable}, so it declares an explicit {@code serialVersionUID} to keep
     * its serialized form stable when the class is recompiled.
     */
    @Getter
    @Setter
    @ToString
    public class Order implements Serializable {

        /** Explicit version id so the serialized form does not depend on compiler-derived defaults. */
        private static final long serialVersionUID = 1L;

        /** Identifier of the order. */
        private String id;

        /** Display name of the order. */
        private String name;
    }
    

    2,消费者监听

    package com.hwq.rabbitmq.service;
    
    import com.hwq.rabbitmq.entity.Order;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Service;
    
    /**
     * Consumer side of the demo: binds the {@link Sink} channel and prints every
     * {@link Order} delivered to the listener method.
     */
    @Service
    @EnableBinding(Sink.class)
    public class InputService {

        /** Pause applied before each received order is printed, in milliseconds. */
        private static final long HANDLING_DELAY_MILLIS = 1000L;

        /**
         * Handles one order arriving on {@link Sink#INPUT}: sleeps for
         * {@link #HANDLING_DELAY_MILLIS} and then writes the order to standard output.
         *
         * @param order the payload handed to this listener
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        @StreamListener(Sink.INPUT)
        public void receiveOrder(Order order) throws InterruptedException {
            Thread.sleep(HANDLING_DELAY_MILLIS);
            String line = "接收到消息:" + order;
            System.out.println(line);
        }
    }
    

    3,封装发送消息的生产者

    package com.hwq.rabbitmq.service;
    
    import com.hwq.rabbitmq.entity.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    /**
     * Producer side of the demo: binds the {@link Source} channel and publishes
     * {@link Order} payloads on its output.
     */
    @Service
    @EnableBinding(Source.class)
    public class OutputService {

        /** Binding that exposes the output channel used for sending. */
        @Autowired
        private Source source;

        /**
         * Wraps the order in a message carrying the given headers, sends it on the
         * output channel and prints the boolean returned by the send call.
         *
         * @param order      payload of the message
         * @param properties entries used to build the message headers
         */
        public void sendOrder(Order order, Map<String, Object> properties) {
            Message<Order> outbound =
                    MessageBuilder.createMessage(order, new MessageHeaders(properties));
            boolean sent = source.output().send(outbound);
            System.out.println("消息发送成功:" + sent);
        }
    }
    

    4,测试的控制器

    package com.hwq.rabbitmq.controller;
    
    import com.hwq.rabbitmq.entity.Order;
    import com.hwq.rabbitmq.service.OutputService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    
    /**
     * Test controller that pushes a batch of demo orders through {@link OutputService}.
     */
    @RestController
    @RequestMapping("queue")
    public class QueueController {

        /** Number of messages sent per request to the send endpoint. */
        private static final int SEND_COUNT = 20;

        @Autowired
        private OutputService outputService;

        /**
         * Sends data to the message queue: the same demo order is sent
         * {@link #SEND_COUNT} times, each time with a fresh empty header map.
         *
         * @return the literal string {@code "ok"} once all sends have been issued
         */
        @RequestMapping("send")
        public String send() {
            Order demoOrder = new Order();
            demoOrder.setName("你的订单");
            demoOrder.setId("123456789123456798");

            int issued = 0;
            while (issued < SEND_COUNT) {
                outputService.sendOrder(demoOrder, new HashMap<>());
                issued++;
            }
            return "ok";
        }
    }
    

    3,启动项目并访问 http://ip:port/queue/send

  • 相关阅读:
    Filter 和 interceptor 的区别
    JAVA基础知识|Optional
    CentOS 7安装MariaDB 10详解以及相关配置
    Linux系统zookeeper环境搭建(单机、伪分布式、分布式)
    Java设计模式——模板方法模式
    Java设计模式——装饰模式
    Java设计模式——观察者模式
    Java设计模式——代理模式
    Java设计模式——适配器模式
    Java设计模式——策略模式
  • 原文地址:https://www.cnblogs.com/lovling/p/12690836.html
Copyright © 2011-2022 走看看