zoukankan      html  css  js  c++  java
  • RabbitMQ六中工作模式-主题模式

    RabbitMQ 主题模式

    在路由模式中,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

    虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

    在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

    这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

    要在日志系统中实现这一点,我们需要了解更复杂的Topic交换机。

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    代码

    生产者

    package rabbitmq.topic;
    
    import java.util.Random;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//参数1: 交换机名
    		//参数2: 交换机类型
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".contentEquals(msg)) {
    				break;
    			}
    			System.out.print("输入routingKey: ");
    			String routingKey = new Scanner(System.in).nextLine();
    			
    			//参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
    			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
    			
    			System.out.println("消息已发送: "+routingKey+" - "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    package rabbitmq.topic;
    
    import java.io.IOException;
    import java.util.Scanner;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		System.out.println("输入bindingKey,用空格隔开:");
    		String[] a = new Scanner(System.in).nextLine().split("\s");
    		
    		//把该队列,绑定到 topic_logs 交换机
    		//允许使用多个 bindingKey
    		for (String bindingKey : a) {
    			ch.queueBind(queueName, "topic_logs", bindingKey);
    		}
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				String routingKey = message.getEnvelope().getRoutingKey();
    				System.out.println("收到: "+routingKey+" - "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    
  • 相关阅读:
    Oracle的连接时ORA-12519错误
    MongoDB(4.4)使用
    MongoDB安装
    SpringBoot_加密配置中的敏感信息
    SpringBoot_配置文件详解
    Nginx入门
    SpringBoot+Redis集成简单测试
    Redis安装
    RabbitMQ消息中间件(第四章)第四部分-SpringBoot整合RabbitMQ
    Mysql try restarting transaction怎么解决,排查事务锁表
  • 原文地址:https://www.cnblogs.com/zpKang/p/13591428.html
Copyright © 2011-2022 走看看