zoukankan      html  css  js  c++  java
  • RabbitMQ (五)主题(Topic)

    转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/37706355

    上一篇博客中。我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,仅仅能够无脑的转发,假设你还不了解:RabbitMQ (四) 路由选择 (Routing)

    尽管使用direct类型改良了我们的系统。可是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
    在我们的日志系统中,我们有可能希望不仅依据日志的级别并且想依据日志的来源进行订阅。这个概念相似unix工具:syslog。它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
    这样可能给我们很多其它的灵活性:我们可能仅仅想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
    为了在我们的系统中实现上述的需求,我们须要学习略微复杂的主题类型的转发器(topic exchange)。

    1、 主题转发(Topic Exchange)
    发往主题类型的转发器的消息不能任意的设置选择键(routing_key)。必须是由点隔开的一系列的标识符组成。标识符能够是不论什么东西,可是一般都与消息的某些特性相关。一些合法的选择键的样例:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能够定义不论什么数量的标识符。上限为255个字节。
    绑定键和选择键的形式一样。

    主题类型的转发器背后的逻辑和直接类型的转发器非常相似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。须要注意的是:关于绑定键有两种特殊的情况。
    *能够匹配一个标识符。
    #能够匹配0个或多个标识符。

    2、 图解:
    我们准备发送关于动物的消息。消息会附加一个选择键包括3个标识符(两个点隔开)。第一个标识符描写叙述动物的速度。第二个标识符描写叙述动物的颜色。第三个标识符描写叙述动物的物种:<speed>.<color>.<species>。
    我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。


    能够简单的觉得:
    Q1对全部的橙色动物感兴趣。
    Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
    一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。还有一方面quick.orange.fox仅仅会被转发到Q1。lazy.brown.fox将会被转发到Q2。

    lazy.pink.rabbit尽管与两个绑定键匹配,可是也仅仅会被转发到Q2一次。quick.brown.fox不能与不论什么绑定键匹配,所以会被丢弃。


    假设我们违法我们的约定。发送一个或者四个标识符的选择键,相似:orange,quick.orange.male.rabbit,这些选择键不能与不论什么绑定键匹配。所以消息将会被丢弃。


    还有一方面,lazy.orange.male.rabbit,尽管是四个标识符,也能够与lazy.#匹配,从而转发至Q2。


    注:主题类型的转发器非常强大,能够实现其它类型的转发器。
    当一个队列与绑定键#绑定,将会收到全部的消息。相似fanout类型转发器。
    当绑定键中不包括不论什么#与*时。相似direct类型转发器。


    3、 完整的样例

    发送端EmitLogTopic.java:

    package com.zhy.rabbit._05_topic_exchange;
    
    import java.util.UUID;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogTopic
    {
    
    	private static final String EXCHANGE_NAME = "topic_logs";
    
    	public static void main(String[] argv) throws Exception
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
    		String[] routing_keys = new String[] { "kernal.info", "cron.warning",
    				"auth.info", "kernel.critical" };
    		for (String routing_key : routing_keys)
    		{
    			String msg = UUID.randomUUID().toString();
    			channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
    					.getBytes());
    			System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
    		}
    
    		channel.close();
    		connection.close();
    	}
    }

    我们发送了4条消息,分别设置了不同的选择键。

    接收端1。ReceiveLogsTopicForKernel.java

    package com.zhy.rabbit._05_topic_exchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogsTopicForKernel
    {
    
    	private static final String EXCHANGE_NAME = "topic_logs";
    
    	public static void main(String[] argv) throws Exception
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    		// 声明转发器
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		// 随机生成一个队列
    		String queueName = channel.queueDeclare().getQueue();
    		
    		//接收全部与kernel相关的消息
    		channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
    
    		System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
    
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		channel.basicConsume(queueName, true, consumer);
    
    		while (true)
    		{
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    			String message = new String(delivery.getBody());
    			String routingKey = delivery.getEnvelope().getRoutingKey();
    
    			System.out.println(" [x] Received routingKey = " + routingKey
    					+ ",msg = " + message + ".");
    		}
    	}
    }

    直接收和Kernel相关的日志消息。

    接收端2。ReceiveLogsTopicForCritical.java

    package com.zhy.rabbit._05_topic_exchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogsTopicForCritical
    {
    
    	private static final String EXCHANGE_NAME = "topic_logs";
    
    	public static void main(String[] argv) throws Exception
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    		// 声明转发器
    		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		// 随机生成一个队列
    		String queueName = channel.queueDeclare().getQueue();
    
    		// 接收全部与kernel相关的消息
    		channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
    
    		System.out
    				.println(" [*] Waiting for critical messages. To exit press CTRL+C");
    
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		channel.basicConsume(queueName, true, consumer);
    
    		while (true)
    		{
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    			String message = new String(delivery.getBody());
    			String routingKey = delivery.getEnvelope().getRoutingKey();
    
    			System.out.println(" [x] Received routingKey = " + routingKey
    					+ ",msg = " + message + ".");
    		}
    	}
    }

    仅仅接收致命错误的日志消息。

    执行结果:

     [x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
     [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
     [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
     [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

    --------------------------------------------------------------------------------------------------------------------

     [*] Waiting for messages about kernel. To exit press CTRL+C
     [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

    --------------------------------------------------------------------------------------------------------------------

     [*] Waiting for critical messages. To exit press CTRL+C
     [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

    能够看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。



  • 相关阅读:
    null in ABAP and nullpointer in Java
    SAP ABAP SM50事务码和Hybris Commerce的线程管理器
    Hybris service layer和SAP CRM WebClient UI架构的横向比较
    SAP ABAP和Linux系统里如何检查网络传输的数据量
    SAP CRM WebClient UI和Hybris的controller是如何被调用的
    SAP CRM和Cloud for Customer订单中的业务伙伴的自动决定机制
    SAP CRM WebClient UI和Hybris CommerceUI tag的渲染逻辑
    SAP BSP和JSP页面里UI元素的ID生成逻辑
    微信jsapi支付
    微信jsapi退款操作
  • 原文地址:https://www.cnblogs.com/mqxnongmin/p/10834904.html
Copyright © 2011-2022 走看看