一、入门实例
1)引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
2)创建消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.申明队列
String queueName = "quickStart";
// 5.绑定队列
channel.queueDeclare(queueName, true, false, false, null);
// 6.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
// 7.循环消费
System.err.println("消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端消费: " + msg);
}
}
}
3)创建生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.循环发送消息
for (int i = 0; i < 5; i++) {
String msg = "Hello RabbitMQ" + i;
channel.basicPublish("", "quickStart", null, msg.getBytes());
}
// 5.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
- 先启动消费者,然后启动生产者,查看控制台输出:
消费端启动
消费端消费: Hello RabbitMQ0
消费端消费: Hello RabbitMQ1
消费端消费: Hello RabbitMQ2
消费端消费: Hello RabbitMQ3
消费端消费: Hello RabbitMQ4
- 注1:如果生产者先启动,这时候因为找不到队列,所以消息会被 MQServer 丢弃(mandatory 属性为false 的情况 )
- 注2:启动过一次消费者后,因为绑定队列时,申明了队列持久化,所以可以找到队列,消息会存储在 MQServer 中,待消费者启动后即可被消费掉。
二、 常用 API
2.1 ConnectionFactory
连接工厂,用于创建连接对象。
方法名 | 作用 |
---|---|
setHost(String host) | 设置主机 IP |
setPort(int port) | 设置主机端口 |
setVirtualHost(String virtualHost) | 设置虚拟主机 |
newConnection() | 创建 RabbitMQ 连接对象 |
setUsername(String username) | 设置用户名 |
setPassword(String password) | 设置密码 |
setAutomaticRecoveryEnabled(boolean automaticRecovery) | 设置是否自动恢复 |
setClientProperties(Map clientProperties) | 设置客户端属性 |
setConnectionTimeout(int timeout) | 设置连接超时时间 |
setRequestedChannelMax(int requestedChannelMax) | 设置最大请求信道 |
2.2 Connection
RabbitMQ 连接对象。
方法名 | 作用 |
---|---|
createChannel() | 创建数据通信信道对象 |
getPort() | 获取主机端口 |
getChannelMax() | 获取最大请求信道 |
getClientProperties() | 获取客户端属性 |
2.3 Channel
数据通信信道对象,用来发送和接收消息。
方法:queueDeclare
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
作用:
声明一个队列
参数 | 参数类型 | 含义 |
---|---|---|
queue | String | 队列名称 |
durable | boolean | 是否持久化,如果设置为true,服务器重启了队列仍然存在 |
exclusive | boolean | 是否为独享队列(排他性队列),只有自己可见的队列,即不允许其它用户访问 |
autoDelete | boolean | 当没有任何消费者订阅该队列时,自动删除该队列 |
arguments | Map | 其他参数 |
方法:basicQos
void basicQos(int prefetchCount) throws IOException;
作用:
设置一次获取多少个消息
参数 | 含义 |
---|---|
prefetchCount | RabbitMQ 同一时间最多推送不多于 prefetchCount 个消息 |
方法:basicConsume
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
作用:
订阅消息并消费
参数 | 含义 |
---|---|
queue | 订阅队列名称 |
autoAck | 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false。详情看后续《消息确认机制》 |
callback | 接收到消息之后执行的回调方法 |
方法:basicPublish
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
作用:
生产发送一个消息
参数 | 含义 |
---|---|
exchange | 指定转发器名称 ExchangeName,如果使用空字符串会交给默认的 Exchange 处理 |
routingKey | 队列名称 |
props | 和消息有关的其他配置参数,路由报头等 |
body | 消息内容 |
方法:basicAck
void basicAck(long deliveryTag, boolean multiple) throws IOException;
作用:
处理完成消息后,手动向服务端发送一次应答。
参数 | 含义 |
---|---|
deliveryTag | 当前消息唯一Id |
multiple | 是否把小于当前deliveryTag的小于都应答了。注:这个要在打开应答机制后使用 |