zoukankan      html  css  js  c++  java
  • RabbiMQ基础以及spring-boot-starter-amqp使用

    ​ RabbitMQ是一种基于amq协议的消息队列,本文主要记录一下rabbitmq的基础内容以及使用spring-boot-starter-amqp操作rabbitmq。

    1,rabbitmq中的几个重要概念

    a) 虚拟主机(vhost)

    ​ 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。虚拟主机的作用在于进行权限管控,rabbitmq默认有一个虚拟主机"/"。可以使用rabbitmqctl add_vhost命令添加虚拟主机,然后使用rabbitmqctl set_permissions命令设置指定用户在指定虚拟主机下的权限,以此达到权限管控的目的。

    b) 消息通道(channel)

    消息通道:  在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    

    c) 交换机(exchange)

    ​ 交换机: exchange的功能是用于消息分发,它负责接收消息并转发到与之绑定的队列,exchange不存储消息,如果一个exchange没有binding任何Queue,那么当它会丢弃生产者发送过来的消息,在启用ACK机制后,如果exchange找不到队列,则会返回错误。一个exchange可以和多个Queue进行绑定。

    交换机有四种类型:

    • 路由模式(Direct):

      ​direct 类型的行为是"先匹配, 再投送"。即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。direct是rabbitmq的默认交换机类型。

    • 通配符模式(Topic):

      ​类似路由模式,但是routing_key支持模糊匹配,按规则转发消息(最灵活)。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。

    • 发布订阅模式(Fanout):

      ​转发消息到所有绑定队列,忽略routing_key。

    • Headers:

    ​ 设置header attribute参数类型的交换机。相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型,忽略routing_key。在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列。
    ​ 在绑定Queue与Exchange时指定一组键值对,当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配。如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。

    匹配规则x-match有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息
    x-match = any :表示只要有键值对匹配就能接受到消息
    

    2,使用spring-boot-starter-amqp操作rabbitmq

    首先添加相关依赖:

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>
    	  <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
    

    application.properties中配置rabbitmq相关配置:

    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=cord
    spring.rabbitmq.password=123456
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.virtual-host=/
    

    定义rabbitmq配置类:

    RabbitMQConfig.java

    @Configuration
    public class RabbitMQConfig {
    
        private static final String topicExchangeName = "topic-exchange";
        private static final String fanoutExchange = "fanout-exchange";
        private static final String headersExchange = "headers-exchange";
    
        private static final String queueName = "cord";
    
        //声明队列
        @Bean
        public Queue queue() {
            //Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
            return new Queue("cord", false, true, true);
        }
    
        //声明Topic交换机
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange(topicExchangeName);
        }
    
        //将队列与Topic交换机进行绑定,并指定路由键
        @Bean
        Binding topicBinding(Queue queue, TopicExchange topicExchange) {
            return BindingBuilder.bind(queue).to(topicExchange).with("org.cord.#");
        }
    
        //声明fanout交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(fanoutExchange);
        }
    
        //将队列与fanout交换机进行绑定
        @Bean
        Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        //声明Headers交换机
        @Bean
        HeadersExchange headersExchange() {
            return new HeadersExchange(headersExchange);
        }
    
        //将队列与headers交换机进行绑定
        @Bean
        Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
            Map<String, Object> map = new HashMap<>();
            map.put("First","A");
            map.put("Fourth","D");
            //whereAny表示部分匹配,whereAll表示全部匹配
    //        return BindingBuilder.bind(queue).to(headersExchange).whereAll(map).match();
            return BindingBuilder.bind(queue).to(headersExchange).whereAny(map).match();
        }
    }
    

    定义生产者:

    Producer.java

    @Component
    public class Producer {
    
        @Autowired
        private AmqpTemplate template;
    
        @Autowired
        private AmqpAdmin admin;
    
        /**
         * @param routingKey 路由关键字
         * @param msg 消息体
         */
        public void sendDirectMsg(String routingKey, String msg) {
            template.convertAndSend(routingKey, msg);
        }
    
        /**
         * @param routingKey 路由关键字
         * @param msg 消息体
         * @param exchange 交换机
         */
        public void sendExchangeMsg(String exchange, String routingKey, String msg) {
            template.convertAndSend(exchange, routingKey, msg);
        }
    
        /**
         * @param map 消息headers属性
         * @param exchange 交换机
         * @param msg 消息体
         */
        public void sendHeadersMsg(String exchange, String msg, Map<String, Object> map) {
            template.convertAndSend(exchange, null, msg, message -> {
                message.getMessageProperties().getHeaders().putAll(map);
                return message;
            });
        }
    }
    

    定义消费者:

    Consumer.class

    @Component
    public class Consumer {
      
        @RabbitListener(queues = "cord")
        //@RabbitListener(queues = "cord", containerFactory="myFactory")
        public void processMessage(String msg) {
            System.out.format("Receiving Message: -----[%s]----- 
    .", msg);
        }
    }
    

    测试用例:

    RabbitmqTest.java

    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = CordApplication.class)
    public class RabbitmqTest {
    
        @Autowired
        private Producer producer;
    
        //Direct
        @Test
        public void sendDirectMsg() {
            producer.sendDirectMsg("cord", String.valueOf(System.currentTimeMillis()));
        }
    
        //Topic
        @Test
        public void sendtopicMsg() {
            producer.sendExchangeMsg("topic-exchange","org.cord.test", "hello world");
        }
    
        //Fanout
        @Test
        public void sendFanoutMsg() {
            producer.sendExchangeMsg("fanout-exchange", "abcdefg", String.valueOf(System.currentTimeMillis()));
        }
    
        //Headers
        @Test
        public void sendHeadersMsg() {
            Map<String, Object> map = new HashMap<>();
            map.put("First","A");
            producer.sendHeadersMsg("headers-exchange", "hello word", map);
        }
    }
    
    

    https://spring.io/guides/gs/messaging-rabbitmq/

    https://blog.csdn.net/qq1052441272/article/details/53940754

    https://stackoverflow.com/questions/19240290/how-do-i-implement-headers-exchange-in-rabbitmq-using-java

    https://blog.csdn.net/ztx114/article/details/78410727

    https://www.cnblogs.com/jfl-xx/p/7324285.html

  • 相关阅读:
    浅谈P2P
    一串字符的解密
    下载地址解密
    初探DirectX
    本文介绍在VC 6.0中编译和使用OpenSSL的过程
    鱼钩绑线视频
    PKCS cer 证书
    02、创建顶点缓冲
    [原]SSL 开发简述(Delphi)
    [转]Delphi和C++数据类型对照表
  • 原文地址:https://www.cnblogs.com/cord/p/9403364.html
Copyright © 2011-2022 走看看