zoukankan      html  css  js  c++  java
  • RabbitMq: 主题交换机的使用(Topic Exchange)

    主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
    简单地介绍下规则:

    * (星号) 用来表示一个单词 (必须出现的)
    # (井号) 用来表示任意数量(零个或多个)单词

    通配的绑定键是跟队列进行绑定的,例:

    队列Q1 绑定键为 .TT.
    队列Q2绑定键为 TT.#

    如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
    如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

    当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
    所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

    实现案例

    先实现一个配置类,定义了两个不同的队列和一个交换机进行绑定

    package com.example.demo.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author lyd
     * @Description: 主题交换机
     * @date 14:11
     */
    @Configuration
    public class TopicRabbitConfig {
    
    	// 队列名绑定键
    	public static final String QUEUE_MAN = "topic.man";
    	public static final String QUEUE_WOMAN = "topic.woman";
    
    	// 交换机绑定建
    	public static final String TOPIC_EXANGE = "topicExchange";
    
    	/**
    	 * man队列
    	 *
    	 * @return
    	 */
    	@Bean
    	public Queue manQueue() {
    		return new Queue(QUEUE_MAN,true);
    	}
    
    	/**
    	 * woman队列
    	 *
    	 * @return
    	 */
    	@Bean
    	public Queue womanQueue() {
    		return new Queue(QUEUE_WOMAN,true);
    	}
    
    	/**
    	 * 交换机
    	 *
    	 * @return
    	 */
    	@Bean
    	public TopicExchange exchange() {
    		return new TopicExchange(TOPIC_EXANGE);
    	}
    
    	/**
    	 * 将man队列和交换机绑定,并指定匹配键关键字为topic.man,这样只要是消息携带的路由键是topic.man,才会分发到该队列
    	 *
    	 * @return
    	 */
    	@Bean
    	Binding bindingExchangeMessageMan() {
    		return BindingBuilder.bind(manQueue()).to(exchange()).with(QUEUE_MAN);
    	}
    
    	/**
    	 * 将woman队列和交换机绑定,并指定匹配键关键字为topic.woman,这样只要是消息携带的路由键是topic.woman,才会分发到该队列
    	 *
    	 * @return
    	 */
    	@Bean
    	Binding bindingExchangeMessageWoman() {
    		return BindingBuilder.bind(womanQueue()).to(exchange()).with(QUEUE_WOMAN);
    	}
    
    }
    

    定义两个接口,分别给两个队列发送消息

    package com.example.demo.controller;
    
    import com.example.demo.config.DirectRabbitConfig;
    import com.example.demo.config.TopicRabbitConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @author lyd
     * @Description:
     * @date 11:29
     */
    @RestController
    public class ConvertMessController {
    
    	@Autowired
    	RabbitTemplate rabbitTemplate;
    
    	@RequestMapping("sendManMessage")
    	@ResponseBody
    	public String sendManMessage(){
    
    		String messageId = String.valueOf(UUID.randomUUID());
    		String messageData = "我是男人";
    
    		// 将要发送的消息放进Map类型中
    		Map<String,Object> map=new HashMap<>();
    		map.put("messageId",messageId);
    		map.put("messageData",messageData);
    
    		rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_MAN,map);
    		return "ok";
    	}
    
    	@RequestMapping("sendWomanMessage")
    	@ResponseBody
    	public String sendWomanMessage(){
    
    		String messageId = String.valueOf(UUID.randomUUID());
    		String messageData = "我是女人";
    
    		// 将要发送的消息放进Map类型中
    		Map<String,Object> map=new HashMap<>();
    		map.put("messageId",messageId);
    		map.put("messageData",messageData);
    
    		rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_WOMAN,map);
    		return "ok";
    	}
    
    
    }
    

    监听两个队列,接收消息

    package com.example.demo.controller;
    
    import com.example.demo.config.TopicRabbitConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author lyd
     * @Description:
     * @date 14:26
     */
    @Component
    public class TopicReceiver {
    
    	@RabbitListener(queues = TopicRabbitConfig.QUEUE_MAN)
    	public void processMan(Message message){
    		System.out.println("男消费者接收到的消息:"+message);
    	}
    
    	@RabbitListener(queues = TopicRabbitConfig.QUEUE_WOMAN)
    	public void processWoman(Message message){
    		System.out.println("女消费者接收到的消息:"+message);
    	}
    
    
    }
    

    yml配置文件

    server:
      port: 8080
    
    spring:
      application:
        name: rabbitmq-provider
      rabbitmq:
        host: localhost
        username: guest
        password: guest
        port: 5672
        virtual-host: /
    

    调用接口 http://127.0.0.1:8080/sendManMessage ,接收到消息

    调用接口 http://127.0.0.1:8080/sendWomanMessage ,接收到消息

  • 相关阅读:
    Permutation Sequence
    Sqrt(x)
    Search in Rotated Sorted Array ||
    [STL]list的erase正确与错误用法
    一个支持Git应用编程开发的第三方库(API)
    VC++生成full dump文件
    Maven构建C++工程的插件-NAR
    VC++ Watch窗口查看指针指向的数组
    Android SDK更新失败的解决方法
    ADT20新建项目Android Support library not installed问题
  • 原文地址:https://www.cnblogs.com/lyd447113735/p/14930952.html
Copyright © 2011-2022 走看看