zoukankan      html  css  js  c++  java
  • RabbitMQ六中工作模式-发布和订阅模式

    RabbitMQ 发布和订阅模式

    此模式会把消息发布给所有的消费者, 同一条消息, 任何消费者都能收到

    在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

    为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。

    在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。

    最终, 消息会被广播到所有消息接受者

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queueName = ch.queueDeclare().getQueue();
    

    绑定 Bindings

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    ch.queueBind(queueName, "logs", "");
    1234
    

    现在, logs交换机将会向我们指定的队列添加消息

    列出绑定关系: (centos下执行)

    rabbitmqctl list_bindings

    代码

    生产者

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

    package rabbitmq.publishsubscribe;
    
    import java.util.Scanner;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setPort(5672);
    		f.setUsername("admin");
    		f.setPassword("admin");
    		
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为logs的交换机,交换机类型为fanout
    		//这一步是必须的,因为禁止发布到不存在的交换。
    		ch.exchangeDeclare("logs", "fanout");
    		
    		while (true) {
    			System.out.print("输入消息: ");
    			String msg = new Scanner(System.in).nextLine();
    			if ("exit".equals(msg)) {
    				break;
    			}
    			
    			//第一个参数,向指定的交换机发送消息
    			//第二个参数,不指定队列,由消费者向交换机绑定队列
    			//如果还没有队列绑定到交换器,消息就会丢失,
    			//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
    			ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
    			System.out.println("消息已发送: "+msg);
    		}
    
    		c.close();
    	}
    }
    

    消费者

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
    ReceiveLogs.java代码:

    package rabbitmq.publishsubscribe;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("192.168.64.140");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		Connection c = f.newConnection();
    		Channel ch = c.createChannel();
    		
    		//定义名字为 logs 的交换机, 它的类型是 fanout
    		ch.exchangeDeclare("logs", "fanout");
    		
    		//自动生成对列名,
    		//非持久,独占,自动删除
    		String queueName = ch.queueDeclare().getQueue();
    		
    		//把该队列,绑定到 logs 交换机
    		//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
    		ch.queueBind(queueName, "logs", "");
    		
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume(queueName, true, callback, cancel);
    	}
    }
    
  • 相关阅读:
    LOJ P10004 智力大冲浪 题解
    LOJ P10011 愤怒的牛 题解
    LOJ P10002 喷水装置 题解
    洛谷 P2279 [HNOI2003]消防局的设立 题解
    洛谷 P5640 【CSGRound2】逐梦者的初心 题解
    洛谷 P2827 蚯蚓 题解
    [SHOI2012]魔法树
    浅析树链剖分
    [Bzoj1731]排队布局
    [POJ-1201]Intervals
  • 原文地址:https://www.cnblogs.com/zpKang/p/13584695.html
Copyright © 2011-2022 走看看