参考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
源码:https://github.com/zuzhaoyue/JavaDemo
先决条件
本教程假定RabbitMQ 在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。
介绍
RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想要发送的邮件放在邮箱里时,你可以确定邮递员最终会将邮件递交你的收件人。与此类似,RabbitMQ既是邮政信箱、又是邮局和邮递员。
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块 - 消息。
RabbitMQ和一般的消息传递会使用一些术语:
-
生产(Producing)意味着发送。一个发送消息的程序就是一个生产者:
-
队列(queue)相当于上面例子里的邮箱。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。一个队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。可以同时有许多生产者(producer)向一个队列发送消息,当然也可以同时有许多消费者尝试从一个队列接收数据。以下是队列的表示方式:
-
消费(consuming)与接受(receiving)意思差不多。一个消费者的主要功能是等待接收信息:
请注意,生产者,消费者和broker不必驻留在同一主机上;
在本教程中,我们将用Java编写两个程序; 一个生产者,用于发送单个消息。一个消费者,用于接收消息并将其打印出来。
在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列
![(P)→[|||]→(C)](https://www.rabbitmq.com/img/tutorials/python-one.png)
知道了上面这些后就可以写代码了~
发出
![(P) - > [|||]](https://www.rabbitmq.com/img/tutorials/sending.png)
我们会调用消息发布者(生产者)来发送,调用消息使用者(消费者) Recv来消费。发布者将连接到RabbitMQ,发送一条消息,然后退出。
发布者的代码如下:
/** * Created by zuzhaoyue on 18/5/15. */ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello1"; public static void main(String[] argv) throws Exception { //创建一个连接服务器的连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //为了发送信息,我们必须声明一个队列,然后才可以向这个队列中发消息,这个声明是幂等的,也就是说仅会在这个队列不存在时,才会创建一个队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); //最后,我们关闭连接和队列 channel.close(); connection.close(); } }
接收
Rabbitmq会推送消息给消费者,因此与发布单个消息的发布者不同,我们不会关闭消费者,而是让它一直进行以监听消息并将其打印出来。

代码如下
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by zuzhaoyue on 18/5/15. */ public class Recv { private final static String QUEUE_NAME = "hello1"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //以下的defaultconsumer实现了consumer这个接口,这个接口被用来缓冲服务器推送过来的信息 //一开始的set up和刚刚的send.java里的相似:1.打开一个连接,2.声明一个队列(这个队列名要和刚刚的队列名相同) //注意:我们在这里声明队列,因为我们可能在生产者之前开始消费 //我们告诉服务器从队列向我们传送消息,既然它会异步传送,我们以对象的形式提供一个回调,来缓冲这些消息,直到我们准备使用它们,这正是defaultconsumer做的事情 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; String result = channel.basicConsume(QUEUE_NAME, true, consumer); System.out.println("result:" + result); } }
执行send.java的main()方法后,访问127.0.0.1:15672
显示:
说明队列中有了一个消息,点开后我们可以看到该消息内容是hello world,如下图:
然后运行recv.java的main()方法,打开消费者,页面显示如下:
该消息已经被成功消费。