zoukankan      html  css  js  c++  java
  • SpringCloud(4) rabbitmq使用

    一。rabbitmq基本知识

    rabbitmq 整个消息投递的路径为:
    producer->rabbitmq broker cluster->exchange->queue->consumer

    exchange: 交换器,接收生产者发送的消息并路由给对应的队列。三种常用的交换器类型:1.direct(发布订阅,完全匹配) 2。广播型 3.topic(主题,规则匹配)

    queue: 消息队列,用来保存消息直到发送给消费者。消息一直在队列中,知道消费者链接到队列将它取走

    binding: 绑定。用于消息队列和交换器之间的关联,一个绑定就是基于路由键将两者连接起来的路由规则。

     routingkey: 路由键。1.队列通过路由键和交换器绑定。2.消息带着路由键发送到交换器,交换器根据路由键发到匹配的队列

    二。代码示例

    2.1 使用amqp

    (1).pom依赖

    <!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
        spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。
     -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    (2).消费端

    //1. 只指定队列@RabbitListener(queues = "myQueue")
        //2.自动创建队列
        //@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue("myQueue"))
        //3.自动创建, exchange和queue绑定
        @RabbitListener(bindings = @QueueBinding(
                    exchange = @Exchange("myOrder"),
                    key = "fruit",
                    value = @Queue("fruitQueue")
                ))
        public void process(String message) {
            log.info("fruitProcess receive mq message: {}", message);
        }
        
        /**
         * Producer(生产者): 将消息发送到Exchange
        Exchange(交换器):将从生产者接收到的消息路由到Queue
        Queue(队列):存放供消费者消费的消息
        BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
        RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
        Consumer(消费者):从Queue中获取消息
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange("myOrder"),
                key = "computer",
                value = @Queue("computerQueue")
            ))
        public void computerProcess(String message) {
            log.info("computerProcess receive mq message: {}", message);
        }

    (3).生产者

    @SpringBootTest
    public class MQSenderTest {
        
        @Autowired
        private AmqpTemplate amqpTemplate;
        
        @Test
        public void send() {
            amqpTemplate.convertAndSend("myQueue", "hello mq!!");
        }
        
        @Test
        public void send3() {
            amqpTemplate.convertAndSend("myOrder", "computer", "computerMsgsg");
        }
    }

    2.2 使用stream

    2.2.1 生产者

    (1) pom依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    (2)application.yml定义

    spring:
      cloud:
        stream:
          bindings:
            #连接源定义的input和output        
            messagesSend:
    #交换器名称 destination: msg-topic backRec: destination: back-topic
    ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息 group: back-group rabbitmq: addresses: 127.0.0.1:5672 username: guest password: guest

    (3)定义mq连接源

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface StreamClient {
        public static final String INPUT = "messagesSend"; 
        public static final String BACK = "backRec";
        
            // 发送
        @Output(INPUT)
        MessageChannel output();
        
            // 回调接收
        @Input(BACK)
        SubscribableChannel input();
    }

    (4)发送类, @EnableBinding(StreamClient.class)声明

    @RestController
    @EnableBinding(StreamClient.class)
    public class SendMessageController {
        @Autowired
        StreamClient streamClient;
        
        @RequestMapping("/sendMsg")
        public void sendMsg() {
            OrderDTO order = new OrderDTO();
            order.setOrderId("123");
            streamClient.output().send(MessageBuilder.withPayload(order).build());
        }
    }

    (5)接收返回的mq消息

    @Component
    @Slf4j
    @EnableBinding(StreamClient.class)
    public class StreamReciever {
                // 接收对象
                @StreamListener(StreamClient.BACK) 
                public void process(Object message) {
                  log.info("StreamReciever:{}", message); 
                }
    }

    2.2.2 消费者

    (1) pom依赖

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    (2)application.yml定义

    spring:
      cloud:
        stream:
          bindings:
            messageRec:
              #交换器exchange名称
              destination: msg-topic
              ##消费者组名,这个组下只有一个queue,多台应用情况下只有一台能收到消息
              group: msg-group
            backSend:
              destination: back-topic
    #我没有配置mq信息,自动用的默认配置 rabbitmq: addresses: 127.0.0.1:5672 username: guest password: guest

    (3)连接源

    public interface StreamServer {
        public static final String INPUT = "messageRec";
        public static final String BACK = "backSend";
        @Input(INPUT)
        SubscribableChannel input();
          
        @Output(BACK)
        MessageChannel output();
    }

    (4)接收端。并范松返回mq消息

    @Component
    @Slf4j
    @EnableBinding(StreamServer.class)
    public class StreamReciever {
           // 接收对象
            @StreamListener(StreamServer.INPUT)
            @SendTo(StreamServer.BACK)
            public String process(Object message) {
              log.info("StreamReciever:{}", message);
    // 返回的消息
    return "Revieved.."; } }

  • 相关阅读:
    马哥Linux——第三周作业
    [laravel]phpunit
    [laravel]要点
    [laravel]请求处理
    [angularJS]ng-hide|ng-show切换
    [yii2]urlmanger
    虚拟机bridged, NAT and host-only网络区别
    [yii]Fetch data from database and create listbox in yii
    [shell test] multiple conditions
    特殊的shell变量:
  • 原文地址:https://www.cnblogs.com/t96fxi/p/12865507.html
Copyright © 2011-2022 走看看