第一步导入依赖 :
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version>
</dependency>
第二步编写一个连接类获取MQ的连接:ConnectionUtil
package com.example.springboot_rabbitmq.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 通过工程获取连接 Connection connection = factory.newConnection(); if (connection.isOpen()){ System.out.println("连接成功"); }else{ System.out.println("连接失败"); } return connection; } public static void main(String[] args) { //测试连接 try { System.out.println(ConnectionUtil.getConnection()); } catch (Exception e) { e.printStackTrace(); } } }
第三步:启动本地服务端
测试连接:
连接成功没毛病
下面使用浏览器访问:http://localhost:15672
window下安装RabbitMQ教程:https://www.cnblogs.com/nongzihong/p/11578255.html
第四步:生产者发送消息到队列
package com.example.springboot_rabbitmq.simple; import com.example.springboot_rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接 channel.close(); connection.close(); } }
管理工具中查看消息:
第五步:消费者从队列中获取消息
package com.example.springboot_rabbitmq.simple; import com.example.springboot_rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
信息获取成功