RabbitMQ - Helloword
简介
RabbitMQ是消息代理:它接受并转发消息。 您可以将其视为邮局:你将要发布的邮件放在邮箱中,可以确保Mailperson先生或女士最终将邮件传递给收件人。 以此类推,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员。
RabbitMQ与邮局之间的主要区别在于,它不处理纸张,而是接收,存储和转发数据消息。
常用术语
生产者
将数据发送到Rabbit的程序是生产者。
消费者
消费与生产具有相似的含义。消费者主要是等待接收消息的程序:
交换机
生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去.
路由键
一个String值,用于定义路由规则,在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列。
队列
队列是RabbitMQ内部的邮箱的名称。 尽管所有的消息都流经RabbitMQ和您的应用程序,但它们只能存储在队列中。 队列仅受主机内存和磁盘限制的约束,它实质上是一个大消息缓冲区。 许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。 这就是我们表示队列的方式:
请注意,生产者,消费者和RabbitMQ不必位于同一主机上。 实际上胜场中大多数的应用程序是这样的, 一个应用程序既可以是生产者,也可以是消费者。
绑定
Binding并不是一个概念,而是一种操作,RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind方法将Exchange、Routing Key、Queue绑定起来.
虚拟主机
每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个vhost都独立的拥有自己的交换机、队列、绑定等,拥有自己的权限机制。RabbitMQ提供了开箱即用的默认的虚拟主机“/”。vhost就像是mysql中的数据库一样,每个数据库之间是相互隔离的,每个数据库可以拥有自己的用户和权限等等。
Hello World
在本教程的这一部分中,我们将用Java编写两个程序。 发送一条消息的生产者和接收消息并打印出来的消费者。
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。 中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。
以来的jar包:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/rabbitmq-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>1.2.0</version>
</dependency>
现在我们有了Java客户端及其依赖项,我们可以编写一些代码。
生产者
生产者用于发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello"; //队列的名称
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); //RabbitMQ的服务器地址
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明要发送到的队列
String message = "Hello World!"; //要发送的消息内容
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); //发送消息
System.out.println(" [x] Sent '" + message + "'");
}
}
}
接下来,我们创建一个通道,该通道是完成工作的大多数API所在的位置。 注意我们可以使用try-with-resources语句,因为Connection和Channel都实现了java.io.Closeable。 这样,我们无需在代码中显式关闭它们。
声明队列是幂等的-仅当队列不存在时才创建。 消息内容是一个字节数组,因此您可以在此处编码任何内容。
发送没有响应
如果这是您第一次使用RabbitMQ,但没有看到“已发送”消息,那么您可能会不知所措,想知道可能是什么问题。 代理可能是在没有足够的可用磁盘空间的情况下启动的(默认情况下,它至少需要200 MB的可用空间),因此拒绝接受消息。 检查代理日志文件以确认并减少限制(如有必要)。 配置文件文档将向您展示如何设置disk_free_limit。
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello"; //队列名称,必须和生产者在同一个队列中。
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);//这与生产者发布到的队列匹配。
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
这就是我们的消费者。 我们的消费者监听来自RabbitMQ的消息,因此与发布单个消息的发布者不同,我们将使消费者保持运行状态以监听消息并打印出来。
请注意,我们也在这里声明队列。 因为我们可能在发布者之前启动使用者,所以我们希望在尝试使用队列中的消息之前确保队列存在。
为什么不使用try-with-resource语句自动关闭通道和连接? 这样我们就可以简单地使程序继续运行,关闭所有内容并退出! 这将很尴尬,因为我们希望在消费者异步侦听消息到达时,该过程保持有效。
我们将告诉服务器将队列中的消息传递给我们。 由于它将异步地向我们发送消息,因此我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备使用它们为止。 那就是DeliverCallback子类所做的。