RabbitMQ 默认端口为 5672
默认启动方式如下(可登录容器后使用简单的命令进行管理)
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
RabbitMQ CLI 工具
- rabbitmqctl for service management and general operator tasks
- rabbitmq-diagnostics for diagnostics and health checking
- rabbitmq-plugins for plugin management
- rabbitmq-queues for maintenance tasks on queues, in particular quorum queues
- rabbitmq-upgrade for maintenance tasks related to upgrades
使用带管理面板的方式启动(推荐),默认账号密码为 guest / guest,访问地址 http://ip:port(按下面命令中的端口映射,port 为 8080)
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
常用命令
# 创建一个消息队列,durable=true
rabbitmqadmin declare queue name=队列2 durable=true
# 查看消息队列列表
rabbitmqadmin list queues
# 向“队列2”中发布一条消息
rabbitmqadmin publish routing_key=队列2 payload="{name:mmc}"
# 获取指定消息队列的队头数据,默认不消费
rabbitmqadmin get queue=队列2
# 获取指定消息队列的队头数据,ackmode=ack_requeue_false 消费数据
rabbitmqadmin get queue=队列2 ackmode=ack_requeue_false
Java Client 测试
基础的功能为新增队列、删除队列、发布 Exchange、发布消息、获取指定队列的队头数据等,具体的操作代码如下所示:
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
/**
 * Manual integration tests that exercise the basic RabbitMQ Java client operations
 * (declare/delete a queue, declare an exchange, publish and fetch a message).
 *
 * <p>Every test opens its own connection to the broker at {@link #BROKER_URI} and releases
 * the channel and connection through try-with-resources, so they are closed even when a
 * broker call throws.
 */
public class RabbitMQ {

    /** AMQP URI of the broker the tests connect to. */
    private static final String BROKER_URI = "amqp://guest:guest@localhost:5672";

    /** Queue that the queue and message tests operate on. */
    private static final String QUEUE_NAME = "测试队列新增2";

    /** Exchange declared by {@link #declareExchange()}. */
    private static final String EXCHANGE_NAME = "新增Exchange2";

    /**
     * Opens a new connection to the broker at {@link #BROKER_URI}.
     *
     * @return an open connection; the caller is responsible for closing it
     */
    private static Connection openConnection() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(BROKER_URI);
        return connectionFactory.newConnection();
    }

    /**
     * Declares the test queue and prints the broker's declare-ok reply.
     */
    @Test
    public void addQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        // Resources are closed in reverse order: channel first, then connection.
        try (Connection connection = openConnection();
             Channel channel = connection.createChannel()) {
            // Arguments: queue, durable, exclusive, autoDelete, arguments
            System.out.println(channel.queueDeclare(QUEUE_NAME, true, false, false, null));
        }
    }

    /**
     * Deletes the test queue and prints the broker's delete-ok reply.
     */
    @Test
    public void deleteQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        try (Connection connection = openConnection();
             Channel channel = connection.createChannel()) {
            System.out.println(channel.queueDelete(QUEUE_NAME));
        }
    }

    /**
     * Declares a {@code direct} exchange and prints the broker's declare-ok reply.
     */
    @Test
    public void declareExchange() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
        try (Connection connection = openConnection();
             Channel channel = connection.createChannel()) {
            System.out.println(channel.exchangeDeclare(EXCHANGE_NAME, "direct"));
        }
    }

    /**
     * Publishes one text message to the test queue through the default exchange.
     */
    @Test
    public void publishMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        try (Connection connection = openConnection();
             Channel channel = connection.createChannel()) {
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            byte[] messageBodyBytes = "Hello Word".getBytes(StandardCharsets.UTF_8);
            // basicPublish(exchange, routingKey, properties, body).
            // The empty exchange name is the default exchange, which routes by queue name.
            // Pass MessageProperties.PERSISTENT_TEXT_PLAIN instead of null to mark the
            // message as persistent.
            channel.basicPublish("", QUEUE_NAME, null, messageBodyBytes);
        }
    }

    /**
     * Fetches the message at the head of the test queue, if any, and prints its body.
     */
    @Test
    public void receiveMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        try (Connection connection = openConnection();
             Channel channel = connection.createChannel()) {
            // autoAck = false: the message is delivered unacknowledged; this test never acks
            // it, so the broker returns it to the queue once the channel is closed.
            // autoAck = true: the message counts as acknowledged on delivery and is removed.
            boolean autoAck = false;
            GetResponse response = channel.basicGet(QUEUE_NAME, autoAck);
            // basicGet returns null when the queue is empty.
            if (response != null) {
                byte[] body = response.getBody();
                System.out.println(new String(body, StandardCharsets.UTF_8));
            }
        }
    }
}
SpringBoot 整合 rabbitMQ
Demo1 :使用默认交换机
在 pom.xml 中导入 spring-boot-starter-amqp 依赖
<!-- Spring Boot starter for AMQP (RabbitMQ); Spring Boot starters are published under org.springframework.boot -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
文件application.properties 配置,rabbitMQ默认配置如下,若未修改可无需添加如下配置
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672
发送消息,使用下面的单元测试来发送单条数据到消息队列中
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that sends a single text message through the injected {@link AmqpTemplate}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootRabbitMQ {

    /** Template used to publish the message; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends the text {@code "Hello World"} with routing key {@code "q1"} to the exchange
     * whose name is the empty string.
     */
    @Test
    public void sendMessage() {
        final String exchange = "";
        final String routingKey = "q1";
        final String text = "Hello World";
        amqpTemplate.convertAndSend(exchange, routingKey, text);
    }
}
接收消息,监听队列中的数据,当队列不为空的时候自动获取队列中的数据
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Spring component that listens on the queue {@code "q1"} and echoes each message to stdout.
 */
@Component
public class RabbitMQListener {

    /**
     * Prints the received message text on standard output.
     *
     * @param in the message payload as text
     */
    @RabbitListener(queues = "q1")
    void listen(String in) {
        final String received = in;
        System.out.println(received);
    }
}
Demo2:使用交换机并通过对象来存储数据
自定义交换机、队列的名字 Constant.java
/**
 * Names of the exchange and queue used by the topic-exchange demo.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class Constant {

    /** Name of the topic exchange. */
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    /** Name of the queue. */
    public static final String TOPIC_QUEUE = "topic.queue";

    /** Not instantiable: constants holder only. */
    private Constant() {
    }
}
编写RabbitConfig.java 配置文件,配置用到的交换机、队列,并将交换机与队列进行绑定
import cn.getcharzp.common.Constant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the demo's topic exchange, its queue, and the binding
 * between the two.
 */
@Configuration
public class RabbitConfig {

    /**
     * @return a topic exchange named {@link Constant#TOPIC_EXCHANGE}
     */
    @Bean
    public TopicExchange topicExchange() {
        final String exchangeName = Constant.TOPIC_EXCHANGE;
        return new TopicExchange(exchangeName);
    }

    /**
     * @return a queue named {@link Constant#TOPIC_QUEUE}
     */
    @Bean
    public Queue topicQueue() {
        final String queueName = Constant.TOPIC_QUEUE;
        return new Queue(queueName);
    }

    /**
     * Binds the queue to the topic exchange, using the queue name as the routing key.
     *
     * @param topicQueue    the queue to bind
     * @param topicExchange the exchange to bind the queue to
     * @return the binding between the two
     */
    @Bean
    public Binding exchangeBindTopic(Queue topicQueue, TopicExchange topicExchange) {
        final String routingKey = Constant.TOPIC_QUEUE;
        return BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with(routingKey);
    }
}
编写发送、接收消息的实体类,我这里用的是User.java
import java.io.Serializable;
/**
 * Serializable message payload describing a user (name, age, password).
 */
public class User implements Serializable {

    /**
     * Explicit serialization version, so that the serialized form stays compatible between
     * sender and receiver when the class is recompiled.
     */
    private static final long serialVersionUID = 1L;

    /** Placeholder written by {@link #toString()} in place of the real password. */
    private static final String PASSWORD_MASK = "******";

    private String name;
    private Integer age;
    private String password;

    /**
     * Creates a user.
     *
     * @param name     the user's name
     * @param age      the user's age
     * @param password the user's password
     */
    public User(String name, Integer age, String password) {
        this.name = name;
        this.age = age;
        this.password = password;
    }

    /**
     * Returns a textual description of this user.
     *
     * <p>The password is masked so that printing or logging a user does not disclose it;
     * use {@link #getPassword()} to read the actual value.
     */
    @Override
    public String toString() {
        return "User{"
                + "name='" + name + '\''
                + ", age=" + age
                + ", password='" + PASSWORD_MASK + '\''
                + '}';
    }

    /** @return the user's name */
    public String getName() {
        return name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public Integer getAge() {
        return age;
    }

    /** @param age the new age */
    public void setAge(Integer age) {
        this.age = age;
    }

    /** @return the user's password */
    public String getPassword() {
        return password;
    }

    /** @param password the new password */
    public void setPassword(String password) {
        this.password = password;
    }
}
编辑发送消息的单元测试,实际场景根据实际业务来书写,主要使用AmqpTemplate来进行消息的发送
import cn.getcharzp.common.Constant;
import cn.getcharzp.pojo.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that publishes a {@link User} object to the demo's topic exchange.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootRabbitMQ {

    /** Template used to publish the message; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends a {@link User} to {@link Constant#TOPIC_EXCHANGE} with routing key
     * {@link Constant#TOPIC_QUEUE}.
     *
     * <p>Exceptions are deliberately not caught here: a failed send must propagate so that
     * the test is reported as failed instead of passing with only a printed stack trace.
     */
    @Test
    public void sendMessage() {
        User user = new User("GetcharZp", 20, "abcdef");
        amqpTemplate.convertAndSend(Constant.TOPIC_EXCHANGE, Constant.TOPIC_QUEUE, user);
    }
}
通过 @RabbitListener 实现消息的接收
import cn.getcharzp.common.Constant;
import cn.getcharzp.pojo.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Spring component that listens on {@link Constant#TOPIC_QUEUE} and prints each received
 * {@link User}.
 */
@Component
public class RabbitMQListener {

    /**
     * Prints the received user on standard output. Any exception raised while doing so is
     * caught and its stack trace printed, so it does not propagate out of the listener.
     *
     * @param user the message payload converted to a {@link User}
     */
    @RabbitListener(queues = Constant.TOPIC_QUEUE)
    void listen(User user) {
        try {
            final String description = user.toString();
            System.out.println(description);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}