zoukankan      html  css  js  c++  java
  • 官网英文版学习——RabbitMQ学习笔记(五)Publish/Subscribe

    发布/订阅模式:把一个消息发送给多个消费者。

            前几篇文章的思想是,我们好像看到了生产者将消息直接发送给queue,然后消费者也从queue中进行消费。其实并非如此,RabbitMQ中的消息传递模型的核心思想是,生产者永远不会直接向队列发送任何消息。实际上,生产者甚至不知道消息是否会被传递到任何队列。前几篇没有定义交换器,那么就会采用默认的交换器,进行路由信息。

            在学习该模型之前我们需要了解一些新的概念,交换器、绑定、路由

            相反,生产者只能发送消息给交换器。交换器做一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它将它们推到队列中。交换器必须确切地知道如何处理接收到的消息。是否应该将它附加到特定的队列?是否应该附加到许多队列?或者它应该被丢弃。该规则由exchange类型定义。

    可以使用的交换类型有:direct、topic、header和fanout。查找相关资料这几种类型具体意义如下:

    Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

    1. fanout:所有bind到此exchange的queue都可以接收消息(纯广播的,所有消费者都能收到消息)
    2. direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
    3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
    4. headers:通过headers 来决定把消息发给哪些queue(这个很少用,一般情况下,我们用不到)

    绑定

    我们已经创建了fanout交换器和队列。现在我们需要告诉交换器向我们的队列发送消息。交换和队列之间的关系称为绑定。

    下面我们贴上代码:

    发布者:

    package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Publish {
    	
    	private static final String EXCHANGE_NAME = "exchangeA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		创建连接
    		Connection connetion = factory.newConnection();
    //		获得信道
    		Channel channel = connetion.createChannel();
    //		声明交换器(声明了一个名字位exchangeA,类型位fanout的交换器)
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		String message = "555,2,2,33,66";
    //		发送消息
    		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    		System.out.println(" [x] Sent '" + message + "'");
    		channel.close();
    		connetion.close();
    	}
    
    }

    订阅者(我们在本项目中和另一个项目中均创建了订阅者,最后都收到了发布者发出的消息):

    package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Subscribe {
    	
    	private static final String EXCHANGE_NAME = "exchangeA";
    	private static final String QUEUE_NAME = "queueA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		创建连接
    		Connection connetion = factory.newConnection();
    //		获得信道
    		Channel channel = connetion.createChannel();
    //		声明交换器(声明了一个名字位exchangeA,类型位fanout的交换器)
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    //		声明一个队列,在此采用临时队列
    		String queueName = channel.queueDeclare().getQueue();
    //		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    //		队列和交换器进行绑定,未设定路由键
    		channel.queueBind(queueName, EXCHANGE_NAME, "");
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    				// TODO Auto-generated method stub
    				String message = new String(body,"utf-8");
    				System.out.println("[x] received'"+message+"'");
    			}
    		};
    		channel.basicConsume(queueName, consumer);
    	}
    
    }
    
  • 相关阅读:
    2021.1.28 个人rating赛补题报告
    2021.1.23 个人rating赛补题报告
    2021.1.23 个人rating赛补题报告
    2020.12.14 个人训练赛补题报告
    2020.11.28 2020团体程序设计天梯赛补题报告
    2020.12.3 Codeforces Beta Round #73(Div2)补题报告
    Xhorse VVDI Prog V5.0.6 is Ready for BCM2 Adapter
    Program 2021 Ford Bronco All Keys Lost using VVDI Key Tool Plus
    Xhorse VVDI Prog V5.0.4 Software Update in July 2021
    How to use Xhorse VVDI2 to Exchange BMW FEM/BDC Module?
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9193091.html
Copyright © 2011-2022 走看看