一、创建生产者服务
1、创建生产者服务 rabbit-producer
spring boot版本为 2.1.16.RELEASE
2、pom.xml
引入spring-boot-starter-amqp
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3、配置application.properties
server.servlet.context-path=/
server.port=8002

# 集群地址用逗号分隔
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 启用消息确认(publisher confirm)模式
spring.rabbitmq.publisher-confirms=true

# 设置return消息模式,注意要与mandatory一起配合使用
#spring.rabbitmq.publisher-returns=true
#spring.rabbitmq.template.mandatory=true

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null
4、发送
/**
 * Sends messages to RabbitMQ through {@link RabbitTemplate} and prints the
 * broker's confirm result for every published message.
 */
@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Confirm callback used to learn whether the broker received a published message.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData unique identifier attached to the published message;
         *                        guarded against {@code null} because a confirm may
         *                        arrive without correlation data
         * @param ack             whether the broker accepted (persisted) the message
         * @param cause           failure description when {@code ack} is {@code false}
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Dereferencing a null correlationData here would throw inside the confirm callback.
            String correlationId = (correlationData != null) ? correlationData.getId() : null;
            System.out.println("消息ACK结果:" + ack + ",correlationData: " + correlationId);
            if (!ack) {
                // Surface the broker-supplied reason so a rejected publish is diagnosable.
                System.out.println("消息ACK失败,cause: " + cause);
            }
        }
    };

    /**
     * Publishes a message to exchange {@code myexchange1} with routing key
     * {@code myroutingkey.1}, tagging it with a freshly generated correlation id.
     *
     * @param message    the message payload
     * @param properties additional attributes, sent as message headers
     * @throws Exception if building or sending the message fails
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, mhs);

        // The same callback instance is registered on every call; the field is final,
        // so the template never sees a different callback.
        rabbitTemplate.setConfirmCallback(confirmCallback);

        // Business-unique id that ties the confirm callback back to this message.
        String uuid = UUID.randomUUID().toString();
        System.out.println("生成业务唯一Id=" + uuid);
        CorrelationData correlationData = new CorrelationData(uuid);

        // Pass-through post processor: logs the converted AMQP message and returns it unchanged.
        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(
                    org.springframework.amqp.core.Message message) throws AmqpException {
                System.out.println("postProcessMessage message: " + message);
                return message;
            }
        };

        rabbitTemplate.convertAndSend("myexchange1", "myroutingkey.1", msg, mpp, correlationData);
    }
}
5、测试发送消息
/**
 * Spring Boot test that publishes one sample message through {@link RabbitSender}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitProducerApplicationTests {

    @Autowired
    private RabbitSender rabbitSender;

    /**
     * Sends the text {@code "hello rabbit"} with two sample headers, then keeps the
     * test thread alive for ten seconds (presumably so the asynchronous confirm
     * callback has time to print its result before the context shuts down).
     *
     * @throws Exception if sending fails or the sleep is interrupted
     */
    @Test
    public void testSender() throws Exception {
        final String payload = "hello rabbit";

        final Map<String, Object> headers = new HashMap<>();
        headers.put("age", "18");
        headers.put("name", "zhangsan");

        rabbitSender.send(payload, headers);

        Thread.sleep(10_000L);
    }
}
二、RabbitMQ消费者服务
1、创建RabbitMQ消费者服务 rabbit-consumer
spring boot版本为 2.1.16.RELEASE
2、pom.xml
引入spring-boot-starter-amqp
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3、配置application.properties
server.servlet.context-path=/
server.port=8002

# 集群地址用逗号分隔
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 表示消费者成功消费消息以后需要手工签收(ack),默认为auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1

# RabbitListener 相关配置
spring.rabbitmq.listener.order.exchange.name=myexchange1
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=myroutingkey.*

spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=non_null
4、创建接收者类
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
 * Message receiver: consumes messages from queue {@code myqueue1} and
 * acknowledges each one manually.
 *
 * <p>Created 2020-08-01 09:35.
 */
@Component
public class RabbitReceiver {

    /**
     * Handles one delivery from {@code myqueue1}. The queue is bound to the exchange
     * whose name, durability, type and routing key are read from the
     * {@code spring.rabbitmq.listener.order.exchange.*} properties.
     *
     * @param message the received message (payload plus headers)
     * @param channel the channel the message was delivered on, used for the manual ack
     * @throws IllegalStateException if the delivery-tag header is absent
     * @throws Exception             if acknowledging the message fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "myqueue1", durable = "true"),
            exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "true"),
            key = "${spring.rabbitmq.listener.order.exchange.key}"
    ))
    public void onMessage(Message<?> message, Channel channel) throws Exception {
        // Step 1: business-side processing of the received message.
        System.out.println("消费消息" + message.getPayload());

        // Step 2: after successful processing, fetch the delivery tag and ack manually,
        // because the listener is configured with
        // spring.rabbitmq.listener.simple.acknowledge-mode=manual
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        if (deliveryTag == null) {
            // Fail with a clear message instead of an NPE from auto-unboxing below.
            throw new IllegalStateException(
                    "Missing header " + AmqpHeaders.DELIVERY_TAG + "; cannot acknowledge message");
        }
        // multiple=false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}
RabbitListener相关属性配置在属性文件中。
消费者采用手工签收(manual ack)模式,处理完消息后通过 channel.basicAck 进行确认。