RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
生产者:向RabbitMQ发生消息的程序
消费者:从RabbitMQ接受消息的程序
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>rabbitmq</groupId> <artifactId>rabbitmq-tutorials</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.2</version> </dependency> </dependencies> </project>
目录结构:
com.rabbitmq.tutorials.helloworld
|-Recv.java
|-Send.java
生产者代码
package com.rabbitmq.tutorials.helloworld; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { /** * 队列名称 */ private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //step 1: create a connection to the server ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103");//主机名称或IP地址 Connection connection = factory.newConnection(); Channel channel = connection.createChannel();//创建频道 //step 2: To send, we must declare a queue for us to send to; then we can publish a message to the queue: channel.queueDeclare(QUEUE_NAME, false, false, false, null);//如果队列已经存在不会再创建 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//发布消息 System.out.println(" [x] Sent '" + message + "'"); //step 3: Lastly, we close the channel and the connection; channel.close(); connection.close(); } }
消费者代码:
package com.rabbitmq.tutorials.helloworld; import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //step 1: create a connection to the server ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //step 2: To send, we must declare a queue for us to receive from; then we can receive a message from the queue: //请注意,我们也在这里声明队列。因为我们可能会在发布者之前启动消费者,所以我们希望确保队列存在,然后再试图使用消息。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //告诉服务器将队列中的消息传递给我们。由于它会异步推送消息,因此我们以对象的形式提供回调,缓冲消息直到准备好使用它们。这是一个DefaultConsumer子类所做的 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //消费 channel.basicConsume(QUEUE_NAME, true, consumer); } }
列车rabbit-server所有的队列:
[root@bogon ~]# rabbitmqctl list_queues Listing queues ... amq.gen-L5p1oIQL-h-_HEdxPMTizw 0 amq.gen-X_1bJzROsSXzqfHj6Y8L_w 0 amq.gen-brpJjZlamOzpzZh2i7Q79A 0 amq.gen-kPfAU8lXzROVMDSYnrncyg 0 task_queue 10 ...done.