zoukankan      html  css  js  c++  java
  • 使用rabbitmq手动确认消息的,定时获取队列消息实现

    描述问题

      最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。

    相关业务

      本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送

    相关分析

      网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。

      还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧

      了解rabbit的相关几个概念

     了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer

     我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,

      但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单

      下面我们就对这个问题的答案来个coding

    解决办法

      首先我们因为需要失败重试,使用spring的RepublishMessageRecoverer可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列

      相关配置

      1     @Bean
      2     ObjectMapper objectMapper() {
      3         ObjectMapper objectMapper = new ObjectMapper();
      4         DateFormat dateFormat = objectMapper.getDateFormat();
      5         JavaTimeModule javaTimeModule = new JavaTimeModule();
      6 
      7         SimpleModule module = new SimpleModule();
      8         module.addSerializer(new ToStringSerializer(Long.TYPE));
      9         module.addSerializer(new ToStringSerializer(Long.class));
     10         module.addSerializer(new ToStringSerializer(BigInteger.class));
     11 
     12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
     13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
     14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
     15 
     16         objectMapper.registerModule(module);
     17         objectMapper.registerModule(javaTimeModule);
     18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持
     19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
     20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     21         return objectMapper;
     22 }
     23 
     24 
     25 
     26   @Bean
     27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
     28     return new RabbitAdmin(aConnectionFactory);
     29   }
     30 
     31   @Bean
     32   MessageConverter jacksonAmqpMessageConverter( ) {
     33     return new Jackson2JsonMessageConverter(objectMapper());
     34   }
     35 
     36 
     37   @Bean
     38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
     39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
     40     rabbitAdmin.declareQueue(queue);
     41     return queue;
     42   }
     43   @Bean
     44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
     45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
     46     rabbitAdmin.declareQueue(queue);
     47     return queue;
     48   }
     49   @Bean
     50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
     51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
     52     rabbitAdmin.declareQueue(queue);
     53     return queue;
     54   }
     55     /**
     56      * 声明一个交换机
     57      * @return
     58      */
     59   @Bean
     60   TopicExchange controlExchange () {
     61       return new TopicExchange(Exchanges.ExangeTOPIC);
     62   }
     63 
     64 
     65     /**
     66      * 延时重试队列
     67      */
     68     @Bean
     69     public Queue bcwPayControlRetryQueue() {
     70         Map<String, Object> arguments = new HashMap<>();
     71         arguments.put("x-message-ttl", 10 * 1000);
     72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     73 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
     74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
     75         return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
     76     }
     77     /**
     78      * 延时重试队列
     79      */
     80     @Bean
     81     public Queue bcwPushControlRetryQueue() {
     82         Map<String, Object> arguments = new HashMap<>();
     83         arguments.put("x-message-ttl", 10 * 1000);
     84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     85 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
     86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
     87         return new Queue("queue_bcw@push@retry", true, false, false, arguments);
     88     }
     89     /**
     90      * 延时重试队列
     91      */
     92     @Bean
     93     public Queue bcwPullControlRetryQueue() {
     94         Map<String, Object> arguments = new HashMap<>();
     95         arguments.put("x-message-ttl", 10 * 1000);
     96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
     97 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
     98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
     99         return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
    100     }
    101     @Bean
    102     public Binding  bcwPayControlRetryBinding() {
    103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
    104     }
    105     @Bean
    106     public Binding  bcwPushControlRetryBinding() {
    107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
    108     }
    109     @Bean
    110     public Binding   bcwPullControlRetryBinding() {
    111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
    112     }
    113 
    114   /**
    115    * 队列绑定并关联到RoutingKey
    116    *
    117    * @param queueMessages 队列名称
    118    * @param exchange      交换机
    119    * @return 绑定
    120    */
    121   @Bean
    122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
    124   }
    125   /**
    126    * 队列绑定并关联到RoutingKey
    127    *
    128    * @param queueMessages 队列名称
    129    * @param exchange      交换机
    130    * @return 绑定
    131    */
    132   @Bean
    133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
    134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
    135   }
    136   /**
    137    * 队列绑定并关联到RoutingKey
    138    *
    139    * @param queueMessages 队列名称
    140    * @param exchange      交换机
    141    * @return 绑定
    142    */
    143   @Bean
    144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
    145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
    146   }
    147 
    148   @Bean
    149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
    150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
    151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
    152           ConnectionFactory connectionFactory) {
    153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    154     configurer.configure(factory, connectionFactory);
    155     factory.setMessageConverter(jacksonAmqpMessageConverter());
    156     return factory;
    157   }

    下面就是我们的主题,定时任务使用的是org.springframework.scheduling

      1 /**
      2  * 手动确认消息的,定时获取队列消息实现
      3  */
      4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
      5     protected final Logger logger = LoggerFactory.getLogger(getClass());
      6     private List<Message> body = new LinkedList<>();
      7     public long start_time;
      8     private Channel channel;
      9     @Autowired
     10     private ObjectMapper objectMapper;
     11     @Autowired
     12     private RabbitTemplate rabbitTemplate;
     13 
     14     public QuartzSimpleMessageListenerContainer() {
     15         // 手动确认
     16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
     17 
     18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
     19             long current_time = System.currentTimeMillis();
     20             int time = (int) ((current_time - start_time)/1000);
     21             logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue());
     22             Long retryCount = getRetryCount(message.getMessageProperties());
     23             if (retryCount > 3) {
     24                 logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue());
     25                 try {
     26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
     27                 } catch (IOException ex) {
     28                     ex.printStackTrace();
     29                 }
     30                 return;
     31             }
     32 
     33             this.body.add(message);
     34             /**
     35              * 判断数组数据是否满了,判断此监听器时间是否大于执行时间
     36              * 如果在最后延时时间段内没有业务消息,此监听器会一直开着
     37              */
     38             if(body.size()>=3 || time>60){
     39                 this.channel = channel;
     40                 callback();
     41             }
     42         });
     43 
     44 
     45 
     46     }
     47     private void callback(){
     48 //         channel = getChannel(getTransactionalResourceHolder());
     49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
     50             try {
     51                 callbackWork();
     52             }catch (Exception e){
     53                 logger.error("推送数据出错:{}",e.getMessage());
     54 
     55                 body.stream().forEach(message -> {
     56                     Long retryCount = getRetryCount(message.getMessageProperties());
     57                     if (retryCount <= 3) {
     58                         logger.info("将消息置入延时重试队列,重试次数:" + retryCount);
     59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
     60                     }
     61                 });
     62 
     63             } finally{
     64 
     65                 logger.info("flsher too data");
     66 
     67                 body.stream().forEach(message -> {
     68                     //手动acknowledge
     69                     try {
     70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     71                     } catch (IOException e) {
     72                         logger.error("手动确认消息失败!");
     73                         e.printStackTrace();
     74                     }
     75                 });
     76 
     77                 body.clear();
     78                 this.stop();
     79 
     80             }
     81         }
     82 
     83     }
     84     abstract void callbackWork() throws Exception;
     85     /**
     86      * 获取消息失败次数
     87      * @param properties
     88      * @return
     89      */
     90     private long getRetryCount(MessageProperties properties){
     91         long retryCount = 0L;
     92         Map<String,Object> header = properties.getHeaders();
     93         if(header != null && header.containsKey("x-death")){
     94             List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
     95             if(deaths.size()>0){
     96                 Map<String,Object> death = deaths.get(0);
     97                 retryCount = (Long)death.get("count");
     98             }
     99         }
    100         return retryCount;
    101     }
    102 
    103     @Override
    104     @Scheduled(cron = "0 0/2 * * * ? ")
    105     public void start() {
    106         logger.info("start push data scheduled!");
    107         //初始化数据,将未处理的调用stop方法,返还至rabbit
    108         body.clear();
    109         super.stop();
    110         start_time = System.currentTimeMillis();
    111         super.start();
    112 
    113         logger.info("end push data scheduled!");
    114     }
    115 
    116     public List<WDNJPullOrder> getBody() {
    117 
    118         List<WDNJPullOrder> collect = body.stream().map(data -> {
    119                     byte[] body = data.getBody();
    120                     WDNJPullOrder readValue = null;
    121                     try {
    122                         readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
    123                         });
    124                     } catch (IOException e) {
    125                         logger.error("处理数据出错{}",e.getMessage());
    126                     }
    127                     return readValue;
    128                 }
    129         ).collect(Collectors.toList());
    130 
    131         return collect;
    132 
    133 
    134     }
    135 
    136 }

    后续

    当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现

     @Scheduled(cron = "0 0/2 * * * ? ")
    
    public void start()
    
    
  • 相关阅读:
    linux,windows kettle安装方法
    等待事件分类
    分析函数详细例子
    v$session中不同连接方式module,program的区别
    charles Glist发布设置
    charles 发布Glist
    charles 工具菜单总结
    charles 高级批量请求
    charles 批量重复请求/重复发包工具
    charles 重写工具/rewrite Srttings
  • 原文地址:https://www.cnblogs.com/dmeck/p/12207284.html
Copyright © 2011-2022 走看看