pom文件:
-
<dependencies>
    <!-- RabbitMQ Java client, used directly by the plain-client examples. -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
    </dependency>

    <!-- Spring AMQP integration for RabbitMQ. -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.0.2.RELEASE</version>
    </dependency>

    <!-- Spring Boot starters. No version is given here; presumably it is managed
         by a Spring Boot parent POM / BOM that is not shown - TODO confirm. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
连接工具类:
-
package top.wj.rabbitmq.client.utils;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.util.HashMap;
-
import java.util.Map;
-
-
public class ChannelUtils {
-
public static Channel getChannelInstance(String connectionDescription) {
-
try {
-
ConnectionFactory connectionFactory = getConnectionFactory();
-
Connection connection = connectionFactory.newConnection(connectionDescription);
-
return connection.createChannel();
-
} catch (Exception e) {
-
throw new RuntimeException("获取Channel连接失败");
-
}
-
}
-
-
private static ConnectionFactory getConnectionFactory() {
-
ConnectionFactory connectionFactory = new ConnectionFactory();
-
-
// 配置连接信息
-
connectionFactory.setHost("127.0.0.1");
-
connectionFactory.setPort(5672);
-
connectionFactory.setVirtualHost("/");
-
connectionFactory.setUsername("guest");
-
connectionFactory.setPassword("guest");
-
-
// 网络异常自动连接恢复
-
connectionFactory.setAutomaticRecoveryEnabled(true);
-
// 每10秒尝试重试连接一次
-
connectionFactory.setNetworkRecoveryInterval(10000);
-
-
return connectionFactory;
-
}
-
}
创建生产者:
-
package top.wj.rabbitmq.client.producer;
-
-
import com.rabbitmq.client.AMQP;
-
import com.rabbitmq.client.BuiltinExchangeType;
-
import com.rabbitmq.client.Channel;
-
import top.wj.rabbitmq.client.utils.ChannelUtils;
-
-
import java.io.IOException;
-
import java.util.HashMap;
-
import java.util.concurrent.TimeoutException;
-
-
public class MessageProducer {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
Channel channel = ChannelUtils.getChannelInstance("队列消息生产者");
-
-
// 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
-
channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
-
-
// 设置消息属性 发布消息 (交换机名, Routing key, 可靠消息相关属性 后续会介绍, 消息属性, 消息体);
-
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
-
channel.basicPublish("rabbitmq.wj", "add", false, basicProperties, "body中的消息内容!".getBytes());
-
}
-
}
创建消费者:
-
package top.wj.rabbitmq.client.consumer;
-
-
import com.rabbitmq.client.*;
-
import top.wj.rabbitmq.client.utils.ChannelUtils;
-
-
import java.io.IOException;
-
import java.util.HashMap;
-
import java.util.concurrent.TimeoutException;
-
-
public class MessageConsumer {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
Channel channel = ChannelUtils.getChannelInstance("队列消息消费者");
-
-
// 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性);
-
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("rabbitmq.wj.add", true, false, false, new HashMap<>());
-
-
// 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
-
channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
-
-
// 将队列Binding到交换机上 (队列名, 交换机名, Routing key, 绑定属性);
-
channel.queueBind(declareOk.getQueue(), "rabbitmq.wj", "add", new HashMap<>());
-
-
// 消费者订阅消息 监听如上声明的队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
-
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println(consumerTag);
-
System.out.println(envelope.toString());
-
System.out.println(properties.toString());
-
System.out.println("消息内容:" + new String(body));
-
}
-
};
-
channel.basicConsume(declareOk.getQueue(), true, "消费者标签",defaultConsumer );
-
}
-
}
控制台打印信息:
-
-
消费者标签
-
Envelope(deliveryTag=1, redeliver=false, exchange=rabbitmq.wj, routingKey=add)
-
#contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
-
消息内容:body中的消息内容!