zoukankan      html  css  js  c++  java
  • springboot整合rabbitmq,支持消息确认机制

     安装

    推荐一篇博客 https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

    项目结构

    POM.XML

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     4     <modelVersion>4.0.0</modelVersion>
     5 
     6     <groupId>com.example</groupId>
     7     <artifactId>rabbitmq</artifactId>
     8     <version>0.0.1-SNAPSHOT</version>
     9     <packaging>jar</packaging>
    10 
    11     <name>rabbitmq</name>
    12     <description>Spring Boot 整合RabbitMQ</description>
    13 
    14     <parent>
    15         <groupId>org.springframework.boot</groupId>
    16         <artifactId>spring-boot-starter-parent</artifactId>
    17         <version>2.0.5.RELEASE</version>
    18         <relativePath/> <!-- lookup parent from repository -->
    19     </parent>
    20 
    21     <properties>
    22         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    23         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    24         <java.version>1.8</java.version>
    25     </properties>
    26 
    27     <dependencies>
    28         <dependency>
    29             <groupId>org.springframework.boot</groupId>
    30             <artifactId>spring-boot-starter</artifactId>
    31         </dependency>
    32 
    33         <!-- rabbitmq -->
    34         <dependency>
    35             <groupId>org.springframework.boot</groupId>
    36             <artifactId>spring-boot-starter-amqp</artifactId>
    37         </dependency>
    38 
    39         <dependency>
    40             <groupId>org.springframework.boot</groupId>
    41             <artifactId>spring-boot-starter-test</artifactId>
    42             <scope>test</scope>
    43         </dependency>
    44     </dependencies>
    45 
    46     <build>
    47         <plugins>
    48             <plugin>
    49                 <groupId>org.springframework.boot</groupId>
    50                 <artifactId>spring-boot-maven-plugin</artifactId>
    51             </plugin>
    52         </plugins>
    53     </build>
    54 
    55 
    56 </project>
    POM.XML

    application.yml

    需要将publisher-confrems设为true,启动确认回调, 将 publisher-returns设为true 确认返回回调

    rabbitmq配置类--RabbitConfig

    第一部分, 定义队列

    第二部分,设置一些消息处理策略

     1 package com.example.rabbitmq;
     2 
     3 import org.slf4j.Logger;
     4 import org.slf4j.LoggerFactory;
     5 import org.springframework.amqp.core.Queue;
     6 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     7 import org.springframework.context.annotation.Bean;
     8 import org.springframework.context.annotation.Configuration;
     9 
    10 import javax.annotation.Resource;
    11 
    12 /**
    13  * rabbitMq 配置类
    14  * @author milicool
    15  * Created on 2018/9/14
    16  */
    17 @Configuration
    18 public class RabbitConfig {
    19     @Resource
    20     private RabbitTemplate rabbitTemplate;
    21 
    22     /**
    23      * 定义一个hello的队列
    24      * Queue 可以有4个参数
    25      *      1.队列名
    26      *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
    27      *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
    28      *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
    29      */
    30     @Bean
    31     public Queue helloQueue() {
    32         return new Queue("queue-test");
    33     }
    34 
    35     /** ======================== 定制一些处理策略 =============================*/
    36 
    37     /**
    38      * 定制化amqp模版
    39      *
    40      * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
    41      * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
    42      */
    43     @Bean
    44     public RabbitTemplate rabbitTemplate() {
    45         Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
    46 
    47         // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
    48         rabbitTemplate.setMandatory(true);
    49 
    50         // 消息返回, yml需要配置 publisher-returns: true
    51         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    52             String correlationId = message.getMessageProperties().getCorrelationIdString();
    53             log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
    54         });
    55 
    56         // 消息确认, yml需要配置 publisher-confirms: true
    57         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    58             if (ack) {
    59                 // log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
    60             } else {
    61                 log.debug("消息发送到exchange失败,原因: {}", cause);
    62             }
    63         });
    64 
    65         return rabbitTemplate;
    66     }
    67 }
    配置类

    生产者

     1 /**
     2  * 生产者
     3  * @author milicool
     4  * Created on 2018/9/14
     5  */
     6 @Component
     7 public class Producer {
     8 
     9     @Autowired
    10     private RabbitTemplate rabbitTemplate;
    11 
    12     /**
    13      * 给hello队列发送消息
    14      */
    15     public void send() {
    16         for (int i =0; i< 100; i++) {
    17             String msg = "hello, 序号: " + i;
    18             System.out.println("Producer, " + msg);
    19             rabbitTemplate.convertAndSend("queue-test", msg);
    20         }
    21     }
    22 
    23 }

    消费者

     1 /**
     2  * 消费者
     3  * @author milicool
     4  * Created on 2018/9/14
     5  */
     6 @Component
     7 public class Comsumer {
     8     private Logger log = LoggerFactory.getLogger(Comsumer.class);
     9 
    10     @RabbitListener(queues = "queue-test")
    11     public void process(Message message, Channel channel) throws IOException {
    12         // 采用手动应答模式, 手动确认应答更为安全稳定
    13         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    14         log.info("receive: " + new String(message.getBody()));
    15     }
    16 }

    测试类

     1 @RunWith(SpringRunner.class)
     2 @SpringBootTest
     3 public class RabbitmqApplicationTests {
     4 
     5     @Autowired
     6     private Producer producer;
     7 
     8     @Test
     9     public void contextLoads() {
    10         producer.send();
    11     }
    12 
    13 }

    测试结果

    测试结果太长,没有截取全部,可以查看到消费者接收到了全部消息,如果有的消息在没有接收完,消息将被持久化,下次启动时消费

    web端查看

     感谢阅读  o(∩_∩)o

  • 相关阅读:
    openstack命令行
    Hub, bridge, switch, router, gateway的区别
    openstack奠基篇:devstack (liberty)于centos 7安装
    git常用命令备忘
    如何利用gatling创建一个性能测试例
    如何通过源码生成Gatling可执行工具
    博客园的第一篇博文
    Spring Oauth2 with JWT Sample
    你首先是一个人,然后你才是程序员。
    Maven——快速入门手册(学习记录) ****
  • 原文地址:https://www.cnblogs.com/milicool/p/9662447.html
Copyright © 2011-2022 走看看