zoukankan      html  css  js  c++  java
  • amqp 和 exchange 详细解释

    amqp  的 excange 字面意思是一个交换机。他的任务是吧 消息 分配给消息队列。

    amqp 的  exchange 有三种,分别是 Direct , fanout 和 toppic。三种。

      Direct:通过 Routing key 来分配消息 应该分配给那个消息队列。在给交换机绑定 消息对列的时候需要指定  路由关键字,并且之歌路由关键字必须是不包含通配符。

          特点:消息明确,只有一个对列会消费这个消息

          官方解释:转发消息到routingKey中指定的队列

                要求队列绑定时使用的bindingKey和发送时使用routingKey的保持一致,保证只有key匹配的队列中才可以进行收发消息

      fanout:把消息分给这个 交换机下面的所有 消息队列,值得注意的是 fanout 类型的 绑定 消息对列的时候不需要指配  Routing key

          特点:分配给全部的绑定在这个交换机上的消息队列。类似于发布订阅机制。

          官方解释:

            转发消息到与该交换机绑定的所有队列

            只要接收端和发送端使用同一个交换机,所有端都可以收发消息

      toppic: 上面理想个的综合,把消息分配绑定在这个交换机上的多个消息队列,但是 不一定是全部。可能一个也没有,可能全部都有。通过  带有通配符的 路由关键在来指定分配规则

          个 在 绑定 交换机 和queue 关系的时候 ,Routing key  配置成带有 通配符的 。发消息的 时候 发一个明确的消息 Routing key ,这样 这个消息就会分配到 合适的 消息队列中了。

          特点:分配给 多个 消息队列。可以灵活指定。

          官方解释:

            转发消息到所有关心routingkey中指定话题的队列,只要队列关心的主题(bindingkey)能与消息带的routingkey模糊匹配,就可以将消息发送到该队列。队列绑定时提供的主题可以使用"*"和"#"来的表示关键字,"*"表示一个关键字,

            "#"代表0个或若干个关键  字。关键字之间用"."分隔,如:有routingkey:"log","log.out","log.a.bug"; bindingKey为"log.*"的队列只能接收"log.out"的消息,而bindingKey为"log.#"的队列可以接收前面三个消息。

       header:(我也没用过 猜的)

          

          有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。

          通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

          我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)

          是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

          头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,

          它们甚至可以是整数或者哈希值(字典)等。

          BindingBuilder.bind( queueWeixinPublicDL ).to( new HeadersExchange("") ).whereAny(new HashMap<>());

         

           解释:如果 消息中有属性匹配 header 那么就关系就成立 。

    关于  amqp  消息的 传递流程:

      生产者连接到消息服务器(broker)  --> 生产者发送消息到 交换机( exchange) --> 交换机 把消息路由给绑定在其上的消息队列(queue)  -->  消费者连接到 消息服务器 从 消息对列中取出消息- -> 消费并且告诉 消息队列这个消息已经正常消费(ack)

    一些 amqp 的 名词解释:

    • Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
    • Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
    • Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
    • Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
    • Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
    • Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
    • Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

     下面的例子都是 基于rabbit mq 的

    传统xml 方式的  配置方式:

    <!-- 容邦消息队列 -->	
    	<rabbit:queue name="rbNotificationEventQueue" durable="true"
    		auto-delete="false" exclusive="false" />
    	<!-- 容邦消息死信队列 -->	
    	<rabbit:queue name="rbNotificationEventQueueDL" durable="true" auto-delete="false" exclusive="false">
    	    <rabbit:queue-arguments>
    	        <entry key="x-dead-letter-exchange" value="rbNotificationEventExchange"/>
    	    	<entry key="x-dead-letter-routing-key" value="rbNotificationEventQueue"/>
    	    </rabbit:queue-arguments>
        </rabbit:queue>	
    		
    
    	<!-- 容邦通知查询事件交换机 -->
    	<rabbit:direct-exchange name="rbNotificationEventExchange"
    		durable="true" auto-delete="false">
    		<rabbit:bindings>
    			<rabbit:binding queue="rbNotificationEventQueue" key="rbNotificationEventQueue" />
    			<rabbit:binding queue="rbNotificationEventQueueDL" key="rbNotificationEventQueueDL" />
    		</rabbit:bindings>
    	</rabbit:direct-exchange>
    	
    	<!--监听配置 -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory" acknowledge="manual">
    		<rabbit:listener queues="rbNotificationEventQueue" ref="queueListenter" />
    	</rabbit:listener-container>
    

      

    解释:定义了一个 直连交换机(rbNotificationEventExchange) ,上面绑定了两个消息队列。rbNotificationEventQueue 是一个正常队列,rbNotificationEventQueueDL 是一个死信队列,这个死信队列 会在消息超时 的时候,自动转发到指定的队列。定义了一个消费者来消费rbNotificationEventQueue 里面的消息。

    纯配置类的配置方式:

    @Configuration
    public class RabbitMQConfig2 {
    
    	public static final String sys_exchange  = "sys_exchange";
    
        
        public static final String queueMessageSysProfitRecordDay = "queueMessageSysProfitRecordDay";
    
    
        /**
         * 死信
         */
        public static final String queueMessageSysProfitRecordDay_DL = "queueMessageSysProfitRecordDay_DL";
    
        
        /**
         * 交换
         * @return
         */
        @Bean
        public DirectExchange sysExchange() {
            return new DirectExchange(RabbitMQConfig2.sys_exchange, true, false);
        }
    
        
        /**
         * 正常的队列
         * @return
         */
        @Bean
        public Queue queueMessageSysProfitRecordDay() {
        	return new Queue( RabbitMQConfig2.queueMessageSysProfitRecordDay );
        }
    
        /**
         *  死信队列
         */
        
        @Bean
        public Queue queueMessageSysProfitRecordDayDL() {
        	Map<String, Object> arguments = new HashMap<>();
    		arguments.put("x-dead-letter-exchange", RabbitMQConfig2.sys_exchange);
    		arguments.put("x-dead-letter-routing-key", RabbitMQConfig2.queueMessageSysProfitRecordDay);
    		return new Queue(RabbitMQConfig2.queueMessageSysProfitRecordDay_DL, true, false, false, arguments);
        }
    
        
        /**
         * 绑定
         */
        @Bean
        public Binding  bindingMessageSysProfitRecordDay(Queue queueMessageSysProfitRecordDay ,DirectExchange sysExchange ) {
            return BindingBuilder.bind( queueMessageSysProfitRecordDay ).to( sysExchange ).with( RabbitMQConfig2.queueMessageSysProfitRecordDay );
        }
    
        
        /**
         * 死信绑定
         */ 
        
        @Bean
        public Binding  bindingMessageSysProfitRecordDayDL(Queue queueMessageSysProfitRecordDayDL ,DirectExchange sysExchange ) {
            return BindingBuilder.bind( queueMessageSysProfitRecordDayDL ).to( sysExchange ).with( RabbitMQConfig2.queueMessageSysProfitRecordDay_DL );
        }
    
    }
    

     

    附带发送消息和延时消息的代码:

    @Component
    public class MqMessageService{
    
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	/**
    	 * 发送消息
    	 * @param message
    	 * @return
    	 */
    	public boolean send(  Message message ) {
    		try {
    			if( MessageType.QUEUE.equals( message.getMsgType() ) ) {
    				rabbitTemplate.convertAndSend( message.getQueue() , JSONObject.toJSONString(message) );
    				return true;
    			}else if( MessageType.EXCHANGE.equals( message.getMsgType() ) ) {
    				sendExchangeMessage( message );
    				return true;
    			}else {
    				System.out.println( "暂时不处理" );
    				return true;
    			}
    		}catch (Exception e) {
    			System.out.println( e.getStackTrace() );
    			return false;
    		}
    	}
    	
    	/**
    	 *  发送Exchange的消息
    	 * @param message
    	 */
    	private void sendExchangeMessage(  Message message  ) {
    		if( message.getDelayTime() != null ) {
    			String delayTime = message.getDelayTime().toString();
    			MessagePostProcessor processor = new MessagePostProcessor(){
    				@Override
    				public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
    					message.getMessageProperties().setExpiration( delayTime );
    			        return message;
    				}
    		    };
    			rabbitTemplate.convertAndSend( message.getExchange() , message.getRoutingKey() , JSONObject.toJSONString(message) ,processor );
    			return;
    		}
    		rabbitTemplate.convertAndSend( message.getExchange() , message.getRoutingKey() , JSONObject.toJSONString(message) );
    	}
    
    }
    

      

    附带消息对象:

    public class Message {
    	private Long id;
    	
    	private String exchange;
    	
    	private String routingKey;
    
    	private String queue;
    	
    	private MessageType msgType;
    	
    	/**
    	 * 只有 exchange 类型的有延时
    	 */
    	private Long  delayTime;
    
    	public Message() {
    		this.id = SnGeneratorUtil.getId();
    	}
    	
    	public Long getId() {
    		return id;
    	}
    
    	@Override
    	public String toString() {
    		return "Message [id=" + id + ", exchange=" + exchange + ", routingKey=" + routingKey + ", queue=" + queue
    				+ ", msgType=" + msgType + ", delayTime=" + delayTime + "]";
    	}
    
    	public void setId(Long id) {
    		this.id = id;
    	}
    
    	public String getExchange() {
    		return exchange;
    	}
    
    	public void setExchange(String exchange) {
    		this.exchange = exchange;
    	}
    
    	public String getRoutingKey() {
    		return routingKey;
    	}
    
    	public void setRoutingKey(String routingKey) {
    		this.routingKey = routingKey;
    	}
    
    	public String getQueue() {
    		return queue;
    	}
    
    	public void setQueue(String queue) {
    		this.queue = queue;
    	}
    
    	public MessageType getMsgType() {
    		return msgType;
    	}
    
    	public void setMsgType(MessageType msgType) {
    		this.msgType = msgType;
    	}
    
    	/**
    	 * 只有 exchange 类型的有延时
    	 */
    	public Long getDelayTime() {
    		return delayTime;
    	}
    
    	public void setDelayTime(Long delayTime) {
    		this.delayTime = delayTime;
    	}
    
    }
    

      附带消息类型枚举:

    public enum MessageType {
    
    	
    	QUEUE("队列"),
    	EXCHANGE("交换机");
    
        private String mark;
    
        MessageType(String mark ) {
            this.mark = mark;
        }
    
        public String getMark() {
            return mark;
        }
    }
    

      附带发送消息代码: 备注,这是发了一个上面的message 的子类消息 ,需要自定义。并且这个消息是延时10秒的。

    MessageSysProfitRecordDay message = new MessageSysProfitRecordDay();
    		message.setMsgType( MessageType.EXCHANGE );
    		message.setExchange( RabbitMQConfig.sys_exchange );
    		message.setRoutingKey( RabbitMQConfig.queueMessageSysProfitRecordDay_DL );
    		
    		message.setSourceId(sourceId);
    		message.setYear(year);
    		message.setMonth(month);
    		message.setDay(day);
    		message.setType( type.name() );
    		message.setAmountType( amountType.name() );
    		message.setAmount( amount );
    		message.setDelayTime( 10000L );
    		
            mqMessageService.send( message );
    

      附带接受消息的代码:

      

    	@RabbitListener(queues = RabbitMQConfig.queueMessageSysProfitRecordDay)
    	@RabbitHandler
    	public void orderAward(String messageStr) {
    		MessageSysProfitRecordDay message = null;
    		if( logger.isInfoEnabled() ) {
    			logger.info("收到一天系统日结算:{}",messageStr );
    		}
    	}
    

      

    备注:spring boot集成的 自动 ack(在没抛出异常的情况下),别的方式需要手动ack。

  • 相关阅读:
    Django model 字段类型及选项解析(二)
    MYSQL数据库设计规范与原则
    爬虫相关模块命令回顾
    Django model 字段类型及选项解析(一)
    Django自身安全机制-XSS和CSRF
    分页
    css样式大全
    HTML标签和属性大全
    IsPost 判断
    HTML中夹杂CODE
  • 原文地址:https://www.cnblogs.com/cxygg/p/9521224.html
Copyright © 2011-2022 走看看