代码整合
- maven依赖
<!-- Spring MVC / embedded web server for the REST endpoints -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring AMQP + RabbitMQ client auto-configuration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- JSON serialization of the message payload -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <!-- 1.2.56 is affected by known autoType deserialization vulnerabilities; 1.2.83 is the patched 1.x release. -->
    <version>1.2.83</version>
</dependency>
<!-- Compile-time only: generates loggers/getters/setters (@Slf4j, @Data, ...) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
- yml配置
server:
  port: 8021
spring:
  # RabbitMQ server connection settings
  rabbitmq:
    host: 192.168.100.120
    port: 5672
    username: test
    password: test
    # Virtual host
    virtual-host: testRabbit
- 配置DirectConfig
/**
 * Declares the direct exchange, the two durable queues and the bindings
 * that route messages to them by routing key.
 */
@Configuration
public class DirectConfig {

    /** Name of the direct exchange. */
    public static final String DESTINATION_NAME = "rabbitMq_direct";

    /** Queue name and routing key for SMS messages. */
    public static final String SMS_QUEUE = "Sms_msg";
    public static final String SMS_ROUTINGKEY = "sms";

    /** Queue name and routing key for e-mail messages. */
    public static final String EMAIL_QUEUE = "Email_msg";
    public static final String EMAIL_ROUTINGKEY = "email";

    /** Direct exchange shared by both queues. */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange exchange = new DirectExchange(DESTINATION_NAME);
        return exchange;
    }

    /** Durable queue for SMS messages. */
    @Bean
    public Queue smsDirectQueue() {
        final boolean durable = true;
        return new Queue(SMS_QUEUE, durable);
    }

    /** Durable queue for e-mail messages. */
    @Bean
    public Queue emailDirectQueue() {
        final boolean durable = true;
        return new Queue(EMAIL_QUEUE, durable);
    }

    /** Binds the SMS queue to the direct exchange under the SMS routing key. */
    @Bean
    Binding smsBindingDirect() {
        Queue queue = smsDirectQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(SMS_ROUTINGKEY);
    }

    /** Binds the e-mail queue to the direct exchange under the e-mail routing key. */
    @Bean
    Binding emailBindingDirect() {
        Queue queue = emailDirectQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(EMAIL_ROUTINGKEY);
    }
}
- 发送方配置
/**
 * REST endpoints that publish demo messages to the direct exchange
 * named by {@code DirectConfig.DESTINATION_NAME}.
 */
@RestController
@Slf4j
public class DirectSendController {

    @Autowired
    private RabbitTemplate template;

    /**
     * Publishes the given text as a JSON-serialized {@code User} using the SMS routing key.
     *
     * @param message text taken from the {@code msg} request parameter
     * @return a confirmation string echoing the message
     */
    @GetMapping("/sendSms")
    public String sendSms(@RequestParam("msg") String message) {
        User user = buildUser("sendSms", message);
        String userJson = JSON.toJSONString(user);
        log.info("sendSms:{}", userJson);
        // A send targets an exchange plus a routing key; the exchange routes to the bound queue.
        template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson);
        return "OK,sendSms:" + message;
    }

    /**
     * Publishes the given text wrapped in a {@code User} object using the e-mail routing key.
     *
     * @param message text taken from the {@code msg} request parameter
     * @return a confirmation string echoing the message
     */
    @GetMapping("/sendEmail")
    public String sendEmail(@RequestParam("msg") String message) {
        User user = buildUser("sendEmail", message);
        log.info("sendEmail:{}", JSON.toJSONString(user));
        // Unlike sendSms, the User object itself is sent (the JSON is only logged), so the
        // template's message converter performs the serialization.
        // NOTE(review): with the default converter User presumably must be Serializable - confirm.
        template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.EMAIL_ROUTINGKEY, user);
        return "OK,sendEmail:" + message;
    }

    /** Builds a demo user with a random dash-free UUID as id and {@code tag} as both username and password. */
    private User buildUser(String tag, String message) {
        User user = new User();
        user.setId(UUID.randomUUID().toString().replace("-", ""));
        user.setPassword(tag);
        user.setUsername(tag);
        user.setMsg(message);
        return user;
    }
}
- 消费者
/**
 * Consumers for the SMS and e-mail queues declared in {@code DirectConfig}.
 */
@Component
@Slf4j
public class DirectConsumer {

    /**
     * Receives the raw AMQP message from the SMS queue and logs its body decoded as UTF-8.
     *
     * @param message the raw message delivered by the listener container
     */
    @RabbitListener(queues = DirectConfig.SMS_QUEUE)
    public void sms_msg(Message message) {
        // The Charset overload cannot throw UnsupportedEncodingException, so no throws clause is needed.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("sms_msg消费者收到消息 : {}", body);
    }

    /**
     * Receives a message from the e-mail queue already converted to a {@code User} and logs it as JSON.
     *
     * @param user the payload converted by the listener's message converter
     */
    @RabbitListener(queues = DirectConfig.EMAIL_QUEUE)
    public void email_msg(User user) {
        log.info("email_msg消费者收到消息 : {}", JSON.toJSONString(user));
    }
}
其他几种交换机类型(如 Fanout、Topic)的整合方式与此类似,不再赘述。
消息的手动签收、消息退回、消息的回调
- yml增加
spring:
  rabbitmq:
    # Enable the publisher confirm mechanism
    publisher-confirms: true
    # Return messages that could not be delivered to a queue
    publisher-returns: true
- 增加RabbitMq配置
@Configuration
public class RabbitMqConfig {
/**
* 使用SimpleMessageListenerContainer容器设置消费队列监听
* 不使用@RabbitListener注解
*/
// @Bean
// public SimpleMessageListenerContainer simpleMessageListenerContainer() {
// SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
// simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
// simpleMessageListenerContainer.setMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message) {
//
// }
// });
// //simpleMessageListenerContainer.setQueueNames("","");
// //impleMessageListenerContainer.addQueueNames();
// //手动确认
// simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return simpleMessageListenerContainer;
// }
/**
* @return
* @RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//这个connectionFactory就是我们自己配置的连接工厂直接注入进来
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//这边设置消息确认方式由自动确认变为手动确认
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置消息预取数量
// simpleRabbitListenerContainerFactory.setPrefetchCount(1);
return simpleRabbitListenerContainerFactory;
}
//如果是单例的
/**
* 每个rabbitTemplate方法只可以有一个回调,不然会报错 only one ConfirmCallback is supported by each RabbitTemplate,解决办法是配成多利的
*
* @param connectionFactory
* @return
*/
@Bean
// @Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//成功回调
template.setConfirmCallback(new Callback());
// 开启mandatory模式(开启失败回调)
template.setMandatory(true);
//失败回调
template.setReturnCallback(new Callback());
return template;
}
- 增加回调类
/**
 * Publisher-side callback that prints confirm results and returned
 * messages to standard output.
 */
public class Callback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * Prints the outcome of a publisher confirm.
     *
     * @param correlationData the correlation data passed to this callback (printed as-is)
     * @param ack             the confirm result; {@code true} prints the success line, {@code false} the failure line
     * @param cause           the cause text passed to this callback (printed as-is)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
        System.out.println("ConfirmCallback: " + "确认情况:" + ack);
        System.out.println("ConfirmCallback: " + "原因:" + cause);
        System.out.println(ack ? "消息发送确认成功" : "消息发送确认失败");
    }

    /**
     * Failure callback: prints the details of a returned message.
     *
     * @param message    the returned message; its body is decoded as UTF-8
     * @param replyCode  the reply code passed to this callback
     * @param replyText  the reply text passed to this callback
     * @param exchange   the exchange name passed to this callback
     * @param routingKey the routing key passed to this callback
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // The Charset overload never throws, unlike the charset-name overload that
        // forced an unreachable UnsupportedEncodingException handler.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("ReturnCallback: " + "消息:" + body);
        System.out.println("ReturnCallback: " + "回应码:" + replyCode);
        System.out.println("ReturnCallback: " + "回应信息:" + replyText);
        System.out.println("ReturnCallback: " + "交换机:" + exchange);
        System.out.println("ReturnCallback: " + "路由键:" + routingKey);
    }
}
- 发送方增加一个CorrelationData
每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
// Create the correlation data that accompanies the message.
// NOTE(review): `id` is not defined in this snippet - presumably a unique message id
// (e.g. the generated user id from sendSms); confirm against the caller.
CorrelationData correlationData = new CorrelationData(id);
// Same exchange, routing key and payload as the plain send, with correlationData as the final argument.
template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson,correlationData);
- 消费方
/**
 * Consumes a message from the SMS queue with manual acknowledgement.
 *
 * <p>Prints the UTF-8 decoded body and acks the delivery; if anything fails,
 * the failure is reported and the delivery is nacked with requeue.
 *
 * @param message the raw AMQP message
 * @param channel the channel used to ack/nack the delivery
 * @param headers message headers; the delivery tag is read from {@code AmqpHeaders.DELIVERY_TAG}
 * @throws IOException if the ack or nack call on the channel fails
 */
@RabbitListener(queues = DirectConfig.SMS_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
public void sms_msg(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
    long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    try {
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("sms_msg消费者收到消息 : " + body);
        // Manual ack. deliveryTag identifies this delivery; multiple=false acks only this
        // message (true would ack every delivery up to and including deliveryTag).
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // Report the failure instead of swallowing it silently.
        System.err.println("sms_msg消费失败,消息将退回队列 : " + e);
        // Nack this single delivery (multiple=false) and requeue it (requeue=true); the
        // returned message is visible in the management UI. With requeue=false the message
        // is discarded, or routed to the dead-letter queue if one is configured.
        // NOTE(review): requeue=true means a message that always fails is redelivered
        // repeatedly - consider a retry limit or dead-lettering.
        channel.basicNack(deliveryTag, false, true);
        // Single-message alternative: channel.basicReject();
    }
}