一.pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
二.生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//通过Channel发送数据
channel.basicPublish("","hello",null,"hello world".getBytes());
//关闭连接
channel.close();
connection.close();
}
}
这里注意channel.basicPublish方法的第一个参数(exchange)和第二个参数(routingKey),如果没有指定exchange,指定了routingKey,而routingKey与消费者类中指定的queue的name相同,会通过RabbitMQ的默认Exchange(AMQP default)进行路由。

三.消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String queueName = "hello";
channel.queueDeclare(queueName,true,false,false,null);
//创建一个消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置Channel
channel.basicConsume(queueName,true,consumer);
//获取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:"+msg);
}
}
}
四.运行
先运行消费者,再运行生产者。
在消费者的控制台可以接收到生产者发送的消息。

五.为消息定义属性
1.在生产者这边:
先设置properties:
//自定义属性
Map<String, Object> headers = new HashMap<>();
headers.put("w","123");
headers.put("w2","456");
//为消息定义属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //为1时,则不持久化,为2持久化消息
.contentEncoding("UTF-8")
.expiration("100000")//过期时间
.headers(headers)//自定义属性
.build();
再将properties作为参数,传到basicPublish方法中
channel.basicPublish("","hello",properties,"hello world".getBytes());
2.消费者
消费者这边,需要通过Delivery获取properties的相关属性。
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
Map<String, Object> headers = delivery.getProperties().getHeaders();
headers.forEach((x,y)-> System.out.println(x+":"+y));
System.out.println("消费端:"+msg);
}
打印结果:

六.自定义消费者
可以继承DefaultConsumer类,实现handleDelivery方法
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope);
System.out.println(properties);
System.out.println(new String(body));
}
}
在消费端创建消费者的时候,就可以使用自定义的消费者。
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String queueName = "myConsumer";
channel.queueDeclare(queueName,true,false,false,null);
//创建一个消费者
//QueueingConsumer consumer = new QueueingConsumer(channel);
MyConsumer consumer = new MyConsumer(channel);
//设置Channel
channel.basicConsume(queueName,true,consumer);
运行结果:
