zoukankan      html  css  js  c++  java
  • 消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!

    求关注

    理解Exchange交换机核心概念!

    前言

    来了解RabbitMQ一个重要的概念:Exchange交换机

    1. Exchange概念

    • Exchange:接收消息,并根据路由键转发消息所绑定的队列。

    交换机

    蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
    黄色框:交换机和队列通过路由键有一个绑定的关系。
    绿色框:消费端通过监听队列来接收消息。

    2. 交换机属性

    Name:交换机名称
    Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
    Durability:是否需要持久化,true为持久化
    Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
    Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
    Arguments:扩展参数,用于扩展AMQP协议自定制化使用

    3. Direct Exchange(直连)

    • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

    注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

    Direct Exchange结构图

    重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。

    3.1 代码演示

    生产端:

    
    /**
     * 
    * @ClassName: Producer4DirectExchange 
    * @Description: 生产者
    * @author Coder编程
    * @date2019年7月19日 下午22:15:52 
    *
     */
    public class Producer4DirectExchange {
    
    	
    	public static void main(String[] args) throws Exception {
    		
    		//1创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
    		//2创建Channel
    		Channel channel = connection.createChannel();  
    		//3 声明
    		String exchangeName = "test_direct_exchange";
    		String routingKey = "test.direct";
    		//4 发送
    		String msg = "Coder编程 Hello World RabbitMQ 4  Direct Exchange Message ... ";
    		channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 		
    	}
    }
    
    

    消费端:

    
    /**
     * 
    * @ClassName: Consumer4DirectExchange 
    * @Description: 消费者
    * @author Coder编程
    * @date2019年7月19日 下午22:18:52 
    *
     */
    public class Consumer4DirectExchange {
    
    	public static void main(String[] args) throws Exception {
    		
    		
    		//创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();  
    		//声明
    		String exchangeName = "test_direct_exchange";
    		String exchangeType = "direct";
    		String queueName = "test_direct_queue";
    		String routingKey = "test.direct";
    		
    		//表示声明了一个交换机
    		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    		//表示声明了一个队列
    		channel.queueDeclare(queueName, false, false, false, null);
    		//建立一个绑定关系:
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
    	}
    }
    
    
    

    测试结果:

    打印结果

    注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。

    4. Topic Exchange

    • 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
    • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

    注意:可以使用通配符进行模糊匹配
    符号 "#" 匹配一个或多个词
    符号 "" 匹配不多不少一个词
    例如:"log.#" 能够匹配到 "log.info.oa"
    "log.
    " 只会匹配到 "log.error"

    Topic Exchange结构图

    在一堆消息中,每个不同的队列只关心自己需要的消息。

    4.1 代码演示

    生产端:

    
    /**
     * 
    * @ClassName: Producer4TopicExchange 
    * @Description: 生产者
    * @author Coder编程
    * @date2019年7月19日 下午22:32:41
    *
     */
    public class Producer4TopicExchange {
    
    	
    	public static void main(String[] args) throws Exception {
    		
    		//1创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
    		//2创建Channel
    		Channel channel = connection.createChannel();  
    		//3声明
    		String exchangeName = "test_topic_exchange";
    		String routingKey1 = "user.save";
    		String routingKey2 = "user.update";
    		String routingKey3 = "user.delete.abc";
    		//4发送
    		String msg = "Coder编程 Hello World RabbitMQ 4 Topic Exchange Message ...";
    		channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
    		channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); 	
    		channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
    		channel.close();  
            connection.close();  
    	}
    }
    
    
    

    消费端:

    
    /**
     * 
    * @ClassName: Consumer4TopicExchange 
    * @Description: 消费者
    * @author Coder编程
    * @date2019年7月19日 下午22:37:12
    *
     */
    public class Consumer4TopicExchange {
    
    	public static void main(String[] args) throws Exception {
    		
    		
    		//创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
            
            Channel channel = connection.createChannel();  
    		// 声明
    		String exchangeName = "test_topic_exchange";
    		String exchangeType = "topic";
    		String queueName = "test_topic_queue";
    		//String routingKey = "user.*";
    		String routingKey = "user.*";
    		// 1 声明交换机 
    		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    		// 2 声明队列
    		channel.queueDeclare(queueName, false, false, false, null);
    		// 3 建立交换机和队列的绑定关系:
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
    	}
    }
    
    
    

    测试结果:

    打印结果

    注意一个问题:需要进行解绑

    5. Fanout Exchange

    • 不处理路由键,只需要简单的将队里绑定到交换机上
    • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
    • Fanout交换机转发消息是最快的

    Fanout Exchange结构图

    5.1 代码演示

    生产端:

    
    
    /**
     * 
    * @ClassName: Producer4FanoutExchange 
    * @Description: 生产者
    * @author Coder编程
    * @date2019年7月19日 下午23:01:16
    *
     */
    public class Producer4FanoutExchange {
    
    	
    	public static void main(String[] args) throws Exception {
    		
    		//1创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
    		//2 创建Channel
    		Channel channel = connection.createChannel();  
    		//3 声明
    		String exchangeName = "test_fanout_exchange";
    		//4 发送
    		for(int i = 0; i < 10; i ++) {
    			String msg = "Coder 编程  Hello World RabbitMQ 4 FANOUT Exchange Message ...";
    			channel.basicPublish(exchangeName, "", null , msg.getBytes()); 			
    		}
    		channel.close();  
            connection.close();  
    	}
    	
    }
    
    
    

    消费端:

    
    /**
     * 
    * @ClassName: Consumer4FanoutExchange 
    * @Description: 消费者
    * @author Coder编程
    * @date2019年7月19日 下午23:21:18
    *
     */
    public class Consumer4FanoutExchange {
    
    	public static void main(String[] args) throws Exception {
    		
    		//创建ConnectionFactory
    		Connection connection = ConnectionUtils.getConnection();
            
            Channel channel = connection.createChannel();  
    		// 声明
    		String exchangeName = "test_fanout_exchange";
    		String exchangeType = "fanout";
    		String queueName = "test_fanout_queue";
    		String routingKey = "";	//不设置路由键
    		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    		channel.queueDeclare(queueName, false, false, false, null);
    		channel.queueBind(queueName, exchangeName, routingKey);
    		
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer); 
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
    	}
    }
    
    
    

    测试结果:

    打印结果


    6. 其他

    6.1 Bingding —— 绑定

    • Exchange和Exchange、Queue之间的连接关系
    • Bingding可以包含RoutingKey或者参数

    6.2 Queue——消息队列

    • 消息队列,实际存储消息数据
    • Durability:是否持久化,Durable:是 ,Transient:否
    • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。

    6.3 Message——消息

    • 服务器与应用程序之间传送的数据
    • 本质上就是一段数据,由Properties和Payload(Body)组成
    • 常用属性:delivery mode、headers(自定义属性)

    6.4 其他属性

    content_type、content_encoding、priority

    correlation_id、reply_to、expiration、message_id

    timestamp、type、user_id、app_id、cluster_id

    6.5 Virtual Host虚拟主机

    • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
    • 一个Virtual Host里面可以有若干个Exchange和Queue
    • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

    7. 总结

    RabbitMQ的概念、安装与使用、管控台操作、结合RabbitMQ的特性、Exchange、Queue、Binding
    、RoutingKey、Message进行核销API的讲解,通过本章的学习,希望大家对RabbitMQ有一个初步的认识。

    文末

    欢迎关注个人微信公众号:Coder编程
    获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!
    新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。

    文章收录至
    Github: https://github.com/CoderMerlin/coder-programming
    Gitee: https://gitee.com/573059382/coder-programming
    欢迎关注并star~
    微信公众号

    参考文章:

    《RabbitMQ消息中间件精讲》

    推荐文章:

    消息中间件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP协议!

    消息中间件——RabbitMQ(四)命令行与管控台的基本操作!

    消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!

  • 相关阅读:
    (转)IntelliJ IDEA 插件 阿里巴巴Java开发手册(Alibaba Java Coding Guidelines)
    idea快捷键整理
    (转)mysql使用Navicat 导出和导入数据库
    (转)Intellij Idea工具栏添加打开选中文件的资源管理器位置
    Intellij IDEA设置类注释和方法注释
    mavn jar包依赖冲突解决
    我的Keras使用总结(3)——利用bottleneck features进行微调预训练模型VGG16
    我的Keras使用总结(2)——构建图像分类模型(针对小数据集)
    我的Keras使用总结(1)——Keras概述与常见问题整理
    数据竞赛实战(5)——方圆之外
  • 原文地址:https://www.cnblogs.com/coder-programming/p/11396179.html
Copyright © 2011-2022 走看看