zoukankan      html  css  js  c++  java
  • RabbitMQ整合Spring Booot【消费者补偿幂等问题】

    如果消费者 运行时候 报错了

    package com.toov5.msg.SMS;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component 
    @RabbitListener(queues="fanout_sms_queue")   
    public class SMSConsumer {
        
        @RabbitHandler  
       public void process(String mString) {
           System.out.println("短信消费者获取生产者消息msg"+mString);
           int i = 1/0;
       }
    }

    当生产者投递消息后:

    消费者会不停的进行打印:

     消息一直没有被消费

     

    原因 Rabbitmq 默认情况下 如果消费者程序出现异常情况 会自动实现补偿机制  也就是 重试机制

    @RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。 如果Aop使用异常通知 拦截获取异常信息的话 , 自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。

     可以修改重试策略

     一般来说默认5s重试一次,

    消费者配置:

     listener:
          simple:
            retry:
            ####开启消费者重试
              enabled: true
             ####最大重试次数(默认无数次)
              max-attempts: 5
            ####重试间隔次数
              initial-interval: 3000

    效果: 充实5次 不行就放弃了

     

     MQ重试机制机制 需要注意的问题

      

    如何合适选择重试机制

    情况1:  消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试?    

              需要重试   别人的问题不是我自己的问题

    情况2:  消费者获取到消息后,抛出数据转换异常,是否需要重试?   

           不需要重试   充实一亿次也是如此 木有必要  需要发布版本解决

    总结:

    •            对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用 日志记录+定时任务job健康检查+人工进行补偿
    •            把错误记录在日志里面,通过定时Job去自动的补偿,或通过人工去补偿。 

     传统的HTTP请求 如果失败了没法自动重试 ,当然自己可以写个循环实现。MQ完全自己自带的。

    情况2的拓展延申:

        将之前的案例改为   邮件消费者 调用邮件第三方接口 

    伪代码:

        在consumer 中 调用接口后 判断返回值  由于RabbitMQ 在消费者异常时候 会进行重试机制 进行补偿 

        所以可以抛出个异常 来实现

       

    Consumer:

                  String result   =   template.Email();

                 if(result == null){

                       throw new Exception("调用第三方邮件服务器接口失败!");

                   }

    producer:

     

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.itmayiedu</groupId>
        <artifactId>rabbitmq_producer_springboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>
        <dependencies>
    
            <!-- springboot-web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
        </dependencies>
    </project>

    config:

    package com.itmayiedu.rabbitmq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    //Fanout 类型 发布订阅模式
    @Component
    public class FanoutConfig {
    
        // 邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
    
        // 短信队列
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        // fanout 交换机
        private String EXCHANGE_NAME = "fanoutExchange";
    
        // 1.定义邮件队列
        @Bean
        public Queue fanOutEamilQueue() {
            return new Queue(FANOUT_EMAIL_QUEUE);
        }
    
        // 2.定义短信队列
        @Bean
        public Queue fanOutSmsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }
    
        // 2.定义交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }
    
        // 3.队列与交换机绑定邮件队列
        @Bean
        Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
        }
    
        // 4.队列与交换机绑定短信队列
        @Bean
        Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
        }
    }

     Producer:

    package com.itmayiedu.rabbitmq;
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", System.currentTimeMillis());
            String jsonString = jsonObject.toJSONString();
            System.out.println("jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一
            /*Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build();*/
            amqpTemplate.convertAndSend(queueName, jsonString);
        }
    }

    Controller:

    package com.itmayiedu.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.itmayiedu.rabbitmq.FanoutProducer;
    
    @RestController
    public class ProducerController {
        @Autowired
        private FanoutProducer fanoutProducer;
    
        @RequestMapping("/sendFanout")
        public String sendFanout(String queueName) {
            fanoutProducer.send(queueName);
            return "success";
        }
    }

    yml:

    spring:
      rabbitmq:
      ####连接地址
        host: 192.168.91.6
       ####端口号   
        port: 5672
       ####账号 
        username: admin
       ####密码  
        password: admin
       ### 地址
        virtual-host: /admin_toov5

    启动类:

    package com.itmayiedu;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class AppProducer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppProducer.class, args);
        }
    
    }

    Consumer:

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.itmayiedu</groupId>
        <artifactId>rabbitmq_consumer_springboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>
        <dependencies>
    
            <!-- springboot-web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-mail</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
    
        </dependencies>
    </project>

     utils:

    package com.itmayiedu.rabbitmq.utils;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpStatus;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.util.EntityUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * HttpClient4.3工具类
     * 
     * @author hang.luo
     */
    public class HttpClientUtils {
        private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志记录
    
        private static RequestConfig requestConfig = null;
    
        static {
            // 设置请求和传输超时时间
            requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
        }
    
        /**
         * post请求传输json参数
         * 
         * @param url
         *            url地址
         * @param json
         *            参数
         * @return
         */
        public static JSONObject httpPost(String url, JSONObject jsonParam) {
            // post请求返回结果
            CloseableHttpClient httpClient = HttpClients.createDefault();
            JSONObject jsonResult = null;
            HttpPost httpPost = new HttpPost(url);
            // 设置请求和传输超时时间
            httpPost.setConfig(requestConfig);
            try {
                if (null != jsonParam) {
                    // 解决中文乱码问题
                    StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
                    entity.setContentEncoding("UTF-8");
                    entity.setContentType("application/json");
                    httpPost.setEntity(entity);
                }
                CloseableHttpResponse result = httpClient.execute(httpPost);
                // 请求发送成功,并得到响应
                if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                    String str = "";
                    try {
                        // 读取服务器返回过来的json字符串数据
                        str = EntityUtils.toString(result.getEntity(), "utf-8");
                        // 把json字符串转换成json对象
                        jsonResult = JSONObject.parseObject(str);
                    } catch (Exception e) {
                        logger.error("post请求提交失败:" + url, e);
                    }
                }
            } catch (IOException e) {
                logger.error("post请求提交失败:" + url, e);
            } finally {
                httpPost.releaseConnection();
            }
            return jsonResult;
        }
    
        /**
         * post请求传输String参数 例如:name=Jack&sex=1&type=2
         * Content-type:application/x-www-form-urlencoded
         * 
         * @param url
         *            url地址
         * @param strParam
         *            参数
         * @return
         */
        public static JSONObject httpPost(String url, String strParam) {
            // post请求返回结果
            CloseableHttpClient httpClient = HttpClients.createDefault();
            JSONObject jsonResult = null;
            HttpPost httpPost = new HttpPost(url);
            httpPost.setConfig(requestConfig);
            try {
                if (null != strParam) {
                    // 解决中文乱码问题
                    StringEntity entity = new StringEntity(strParam, "utf-8");
                    entity.setContentEncoding("UTF-8");
                    entity.setContentType("application/x-www-form-urlencoded");
                    httpPost.setEntity(entity);
                }
                CloseableHttpResponse result = httpClient.execute(httpPost);
                // 请求发送成功,并得到响应
                if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                    String str = "";
                    try {
                        // 读取服务器返回过来的json字符串数据
                        str = EntityUtils.toString(result.getEntity(), "utf-8");
                        // 把json字符串转换成json对象
                        jsonResult = JSONObject.parseObject(str);
                    } catch (Exception e) {
                        logger.error("post请求提交失败:" + url, e);
                    }
                }
            } catch (IOException e) {
                logger.error("post请求提交失败:" + url, e);
            } finally {
                httpPost.releaseConnection();
            }
            return jsonResult;
        }
    
        /**
         * 发送get请求
         * 
         * @param url
         *            路径
         * @return
         */
        public static JSONObject httpGet(String url) {
            // get请求返回结果
            JSONObject jsonResult = null;
            CloseableHttpClient client = HttpClients.createDefault();
            // 发送get请求
            HttpGet request = new HttpGet(url);
            request.setConfig(requestConfig);
            try {
                CloseableHttpResponse response = client.execute(request);
    
                // 请求发送成功,并得到响应
                if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                    // 读取服务器返回过来的json字符串数据
                    HttpEntity entity = response.getEntity();
                    String strResult = EntityUtils.toString(entity, "utf-8");
                    // 把json字符串转换成json对象
                    jsonResult = JSONObject.parseObject(strResult);
                } else {
                    logger.error("get请求提交失败:" + url);
                }
            } catch (IOException e) {
                logger.error("get请求提交失败:" + url, e);
            } finally {
                request.releaseConnection();
            }
            return jsonResult;
        }
    
    }

    consumer:

    package com.itmayiedu.rabbitmq;
    
    import java.util.Map;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    import com.itmayiedu.rabbitmq.utils.HttpClientUtils;
    import com.rabbitmq.client.Channel;
    
    //邮件队列
    @Component
    public class FanoutEamilConsumer {
         @RabbitListener(queues = "fanout_email_queue")
         public void process(String msg) throws Exception {
         System.out.println("邮件消费者获取生产者消息msg:" + msg);
         JSONObject jsonObject = JSONObject.parseObject(msg);
         // 获取email参数
         String email = jsonObject.getString("email");
         // 请求地址
         String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
         JSONObject result = HttpClientUtils.httpGet(emailUrl);
         if (result == null) {
         // 因为网络原因,造成无法访问,继续重试
         throw new Exception("调用接口失败!");
         }
         System.out.println("执行结束....");
        
         }
    }

    yml:

    spring:
      rabbitmq:
      ####连接地址
        host: 192.168.91.6
       ####端口号   
        port: 5672
       ####账号 
        username: admin
       ####密码  
        password: admin
       ### 地址
        virtual-host: /admin_toov5
        listener: 
          simple:
            retry:
            ####开启消费者异常重试
              enabled: true
             ####最大重试次数
              max-attempts: 5
            ####重试间隔次数
              initial-interval: 2000
           
    
    server:
      port: 8081

    启动类:

    package com.itmayiedu.rabbitmq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class AppConsumer {
    
        public static void main(String[] args) {
            SpringApplication.run(AppConsumer.class, args);
        }
    
    }

     邮件服务器:

    package com.mayikt.controller;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @RestController
    public class MsgController {
    
        // 模拟第三方发送邮件
        @RequestMapping("/sendEmail")
        public Map<String, Object> sendEmail(String email) {
            System.out.println("开始发送邮件:" + email);
            Map<String, Object> result = new HashMap<String, Object>();
            result.put("code", "200");
            result.put("msg", "发送邮件成功..");
            System.out.println("发送邮件成功");
            return result;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(MsgController.class, args);
        }
    
    }

    yml:

    server:
      port: 8083

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.mayikt</groupId>
        <artifactId>mayikt_sms</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>
        <dependencies>
    
            <!-- springboot-web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-mail</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
    
        </dependencies>
    </project>

    在没有启动邮件服务器时候,消费者调用接口失败会一直重试,重试五次。

    在此期间,如果启动成功,则重试成功,不再重试, 不再进行补偿机制。

    消费者如果保证消息幂等性,不被重复消费

    背景:

    网络延迟传输中,或者消费出现异常或者是消费延迟,会造成进行MQ重试进行重试补偿机制,在重试过程中,可能会造成重复消费。 

    解决办法:

    使用全局MessageID判断消费方使用同一个,解决幂等性。

      

       只要重试过程中,判断如果已经走完了 不能再继续走 继续执行了

       MQ消费者的幂等行的解决 一般使用全局ID  或者写个唯一标识比如时间戳   或者UUID  或者订单号

       

      改进:

      producer:

     添加:

    // 设置消息唯一id 保证每次重试消息id唯一  
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID 
            amqpTemplate.convertAndSend(queueName, message);

     全部代码:

      

    package com.itmayiedu.rabbitmq;
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", System.currentTimeMillis());
            String jsonString = jsonObject.toJSONString();
            System.out.println("jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一  
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID 
            amqpTemplate.convertAndSend(queueName, message);
        }
    }

     同样的 消费者也需要修改:

     方法参数类型为 Message  然后可以获取这个ID 然后可以进行业务逻辑操作

     @RabbitListener(queues = "fanout_email_queue")
         public void process(Message message) throws Exception {
         // 获取消息Id
         String messageId = message.getMessageProperties().getMessageId();  //id获取之
         String msg = new String(message.getBody(), "UTF-8"); //消息内容获取之
         System.out.println("-----邮件消费者获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" +
         msg);
         if (messageId == null) {
                return;
            }
         JSONObject jsonObject = JSONObject.parseObject(msg);
         // 获取email参数
         String email = jsonObject.getString("email");
         // 请求地址
         String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
         JSONObject result = HttpClientUtils.httpGet(emailUrl);
         if (result == null) {
         // 因为网络原因,造成无法访问,继续重试
         throw new Exception("调用接口失败!");
         }
         System.out.println("执行结束....");
         //messId 的情况写入到redis 中  成功就修改为空
         }

     重试机制都是间隔性的  每次都是一个线程  单线程重试

     关于应答模式:

        Spring boot 中进行 AOP拦截 自动帮助做重试

        手动应答的话 ,如果不告诉服务器已经消费成功,则服务器不会删除 消息。告诉消费成功了才会删除。

        

    消费者的yml加入:

     acknowledge-mode: manual 

        

    spring:
      rabbitmq:
      ####连接地址
        host: 192.168.91.6
       ####端口号   
        port: 5672
       ####账号 
        username: admin
       ####密码  
        password: admin
       ### 地址
        virtual-host: /admin_toov5
        listener: 
          simple:
            retry:
            ####开启消费者异常重试
              enabled: true
             ####最大重试次数
              max-attempts: 5
            ####重试间隔次数
              initial-interval: 2000
            ####开启手动ack  
            acknowledge-mode: manual 
    
    server:
      port: 8081

     开启模式之后:

       消费者参数需要加入:  @Headers Map<String, Object> headers, Channel channel

      代码逻辑最后面加入:

    // // 手动ack
    Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    // 手动签收  告诉RabbitMQ 消费成功了  消息可以删除了
    channel.basicAck(deliveryTag, false);  

    代码如下:

    @RabbitListener(queues = "fanout_email_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            // 获取email参数
            String email = jsonObject.getString("email");
            // 请求地址
            String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
            JSONObject result = HttpClientUtils.httpGet(emailUrl);
            if (result == null) {
                // 因为网络原因,造成无法访问,继续重试
                throw new Exception("调用接口失败!");
            }
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动签收
            channel.basicAck(deliveryTag, false);
            System.out.println("执行结束....");
        }
  • 相关阅读:
    TCP心跳 | TCP keepAlive(转)
    闲说HeartBeat心跳包和TCP协议的KeepAlive机制
    一个DNS统计,RCFs,工具站点
    JMX
    【转】如何实现一个配置中心
    用Netty开发中间件:高并发性能优化
    UDP server & client
    DNS缓存
    C正则库做DNS域名验证时的性能对比
    DNS压力测试工具dnsperf简介
  • 原文地址:https://www.cnblogs.com/toov5/p/10287183.html
Copyright © 2011-2022 走看看