在 Config 配置类中,生产者和消费者都应各自声明所需的 exchange 和 queue;否则,如果 RabbitMQ 服务器上尚未创建对应的 exchange 或 queue,应用启动时会报错。
一、生产者属性文件配置
# Producer-side RabbitMQ connection settings.
spring:
  rabbitmq:
    host: 192.168.112.131
    port: 5672
    username: admin
    password: admin
    # Publisher confirms: lets the producer learn whether a message reached the exchange.
    publisher-confirms: true
    virtual-host: my_vhost
    # Publisher returns: lets the producer learn when a message could not be routed to a queue.
    publisher-returns: true
二、生产者配置类的声明
/**
 * Producer-side RabbitMQ topology.
 *
 * <p>Declares the business queue and exchange together with the dead-letter
 * queue and exchange, and binds each queue to its exchange.
 */
@Configuration
@Slf4j
public class RabbitmqConfig {

    private static final String BUSINESS_QUEUE = "tony.queue";
    private static final String BUSINESS_EXCHANGE = "tony.exchange";
    private static final String BUSINESS_ROUTING_KEY = "tony.routeKey";

    private static final String DEAD_LETTER_QUEUE = "tony.dead.queue";
    private static final String DEAD_LETTER_EXCHANGE = "tony.dead.ex";
    private static final String DEAD_LETTER_ROUTING_KEY = "tony.dead.rk";

    /**
     * Declares the business queue.
     *
     * <p>The queue is created with {@code durable = true}, {@code exclusive = false}
     * and {@code autoDelete = false}. Its arguments name the dead-letter exchange
     * and routing key; the dead-letter queue declared in this class is bound to
     * that exchange with that key.
     *
     * @return the business queue definition
     */
    @Bean(name = "message")
    public Queue queueMesage() {
        Map<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        deadLetterArgs.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return new Queue(BUSINESS_QUEUE, true, false, false, deadLetterArgs);
    }

    /**
     * Declares the business exchange (durable, not auto-deleted).
     *
     * @return the business direct exchange
     */
    @Bean(name = "exchange")
    public DirectExchange exchange() {
        return new DirectExchange(BUSINESS_EXCHANGE, true, false);
    }

    /**
     * Declares the durable dead-letter queue that receives rejected messages.
     *
     * @return the dead-letter queue definition
     */
    @Bean(name = "deadqueueMesage")
    public Queue deadqueueMesage() {
        return new Queue(DEAD_LETTER_QUEUE, true);
    }

    /**
     * Declares the dead-letter exchange (durable, not auto-deleted).
     *
     * @return the dead-letter direct exchange
     */
    @Bean(name = "deadexchange")
    public DirectExchange deadexchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * Binds the business queue to the business exchange.
     *
     * @param queueMessage the business queue bean
     * @param exchange     the business exchange bean
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage,
                                   @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with(BUSINESS_ROUTING_KEY);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     *
     * @param queueMessage the dead-letter queue bean
     * @param exchange     the dead-letter exchange bean
     * @return the binding between the two
     */
    @Bean
    Binding bindingDeadExchangeMessage(@Qualifier("deadqueueMesage") Queue queueMessage,
                                       @Qualifier("deadexchange") DirectExchange exchange) {
        return BindingBuilder
                .bind(queueMessage)
                .to(exchange)
                .with(DEAD_LETTER_ROUTING_KEY);
    }

    /*
     * Disabled sample: a RabbitTemplate with confirm and return callbacks.
     *
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // rabbitTemplate.setMessageConverter(converter());

        // Reports whether the message reached the exchange.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange" + correlationData.getId());
            } else {
                // Re-send to the broker here.
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // mandatory=true is required for the return callback to fire; otherwise a message
        // for which the exchange finds no queue is dropped without any callback.
        rabbitTemplate.setMandatory(true);

        // Failure-only callback: invoked only when routing from the exchange to a queue fails.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;
    }
    */
}
三、消费者属性文件的配置
# Consumer-side RabbitMQ connection and listener settings.
spring:
  rabbitmq:
    host: 192.168.112.131
    port: 5672
    username: admin
    password: admin
    publisher-confirms: true
    publisher-returns: true
    virtual-host: my_vhost
    listener:
      simple:
        # The listener code acknowledges (or rejects) each message itself.
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 5000ms
或者
配置可以通过配置文件指定,也可以通过配置类指定: @Configuration public class RabbitConfig { @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(2); factory.setPrefetchCount(10); /* * AcknowledgeMode.NONE:自动确认 * AcknowledgeMode.AUTO:根据情况确认 * AcknowledgeMode.MANUAL:手动确认 */ factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
四、消费者消费消息
@Slf4j @Component public class MessageListener { @RabbitListener(queues = "tony.queue") public void process(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) { log.info(Thread.currentThread().getName() + "=============接收到消息=============" + message + "--" + deliveryTag); try { if ("tony18".equals(message)) { int i = 1 / 0; System.out.println( i ); } } catch (Exception e) { //丢死信队列 try { channel.basicNack(deliveryTag, false, false); } catch (IOException ioException) { ioException.printStackTrace(); } } try { channel.basicAck(deliveryTag, true); } catch (IOException e) { e.printStackTrace(); } } }
五、带有返回值的生产者和消费者
1)生产者
/**
 * Producer-side topology for the request/reply example: declares the
 * {@code tony.queue.back} queue, the {@code tony.exchange.back} exchange and
 * the binding between them.
 *
 * <p>Annotated with {@code @Configuration}, like the consumer-side class that
 * declares the same topology.
 */
@Configuration
public class RabbitmqConfig {

    /**
     * Declares the request queue (durable, non-exclusive, not auto-deleted).
     *
     * @return the queue definition
     */
    @Bean
    public Queue queueKillMessage() {
        return new Queue("tony.queue.back", true, false, false);
    }

    /**
     * Declares the request exchange (durable, not auto-deleted).
     *
     * @return the direct exchange definition
     */
    @Bean
    public DirectExchange queuekillExchangeBack() {
        return new DirectExchange("tony.exchange.back", true, false);
    }

    /**
     * Binds the request queue to the request exchange with routing key
     * {@code tony.routeKey.back}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bindingSecKillExchangeMessage() {
        return BindingBuilder
                .bind(queueKillMessage())
                .to(queuekillExchangeBack())
                .with("tony.routeKey.back");
    }
}
2)发送消息
public String sendAndReceive(String msg1) { //设置消息唯一id CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //直接发送message对象 MessageProperties messageProperties = new MessageProperties(); //过期时间10秒,也是为了减少消息挤压的可能 // messageProperties.setExpiration("10000"); messageProperties.setCorrelationId(correlationId.getId()); Message message = new Message(msg1.getBytes(), messageProperties); log.info("TopicSender send the 1st : " + msg1); //设置消息唯一id Message message1 = amqpTemplate.sendAndReceive("tony.exchange.back", "tony.routeKey.back", message, correlationId); System.out.println("返回值:"+message1); if ( message1==null){ return null; }else{ return new String(message1.getBody()); } }
3)消费者
/**
 * Consumer-side configuration for the request/reply example: a listener
 * container factory with manual acknowledgement, plus the queue, exchange and
 * binding the consumer listens on.
 */
@Configuration
public class RabbitConfig {

    /**
     * Builds the listener container factory used by the consumer.
     *
     * <p>Acknowledge modes:
     * <ul>
     *   <li>{@code AcknowledgeMode.NONE}: automatic acknowledgement</li>
     *   <li>{@code AcknowledgeMode.AUTO}: acknowledgement depends on the outcome</li>
     *   <li>{@code AcknowledgeMode.MANUAL}: manual acknowledgement</li>
     * </ul>
     *
     * @param connectionFactory the broker connection factory
     * @return a factory with 2 concurrent consumers, prefetch 10 and manual acks
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        containerFactory.setPrefetchCount(10);
        containerFactory.setConcurrentConsumers(2);
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /**
     * Declares the request queue (durable, non-exclusive, not auto-deleted).
     *
     * @return the queue definition
     */
    @Bean
    public Queue queueKillMessage() {
        Queue requestQueue = new Queue("tony.queue.back", true, false, false);
        return requestQueue;
    }

    /**
     * Declares the request exchange (durable, not auto-deleted).
     *
     * @return the direct exchange definition
     */
    @Bean
    public DirectExchange queuekillExchangeBack() {
        DirectExchange requestExchange = new DirectExchange("tony.exchange.back", true, false);
        return requestExchange;
    }

    /**
     * Binds the request queue to the request exchange with routing key
     * {@code tony.routeKey.back}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bindingSecKillExchangeMessage() {
        Queue requestQueue = queueKillMessage();
        DirectExchange requestExchange = queuekillExchangeBack();
        return BindingBuilder.bind(requestQueue).to(requestExchange).with("tony.routeKey.back");
    }
}
4)消费者监听器声明
@Component public class MessageListener { @RabbitListener(queues = "tony.queue.back"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/) @RabbitHandler // 此注解加上之后可以接受对象型消息 public String processCallback(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception { try { String msg = new String(message.getBody()); log.info("UserReceiver>>>>>>>接收到消息:" + msg); try { log.info("UserReceiver>>>>>>消息已消费"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条 return UUID.randomUUID().toString(); } catch (Exception e) { System.out.println(e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单 log.info("UserReceiver>>>>>>拒绝消息,直接忽略"); throw e; } } catch (Exception e) { log.info(e.getMessage()); } return ""; } }
六、带有确认机制的生产者
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses+":"+port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); /** 如果要进行消息回调,则这里必须要设置为true */ connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setPublisherReturns(publisherReturns); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean public RabbitTemplate newRabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange,messageId:" + correlationData.getId()); //修改日志表状态,状态改成 1,投递成功且未确认 if(updateMessageLog(1,Long.valueOf(correlationData.getId())) == 1) { log.info("------modify status 1 ok--------"); } } else { log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } }); template.setMandatory(true); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); }); //不使用临时队列 // template.setUseTemporaryReplyQueues(false); // template.setReplyAddress("amq.rabbitmq.reply-to"); // template.setUserCorrelationId(true); // template.setReplyTimeout(10000); return template; }
七、官网参考