zoukankan      html  css  js  c++  java
  • redission的延迟队列

    先推荐3篇文章,讲解延迟队列的实现原理:

    基于redis的延迟队列

    redis的分布式延迟队列

    redission延迟队列的使用

    redission官方文档

    代码实现:

    @Service
    public class KafkaServiceImpl implements KafkaService, InitializingBean {
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);
    
        @Resource
        private KafkaTemplate<String, String> stringKafkaTemplate;
        @Resource
        private RedissonClient redissonClient;
        private RBlockingDeque<String> rBlockingDeque;
        private RDelayedQueue<String> rDelayedQueue;
        
        @Override
        public void send(String topic,String key,String jsonObject) {
            stringKafkaTemplate.send(new ProducerRecord<>(topic,key, jsonObject));
            LOGGER.info("send to topic[{}], key[{}], jsonObject[{}]",topic, key, jsonObject);
        }
    
        @Override
        public void delaySend(String topic, String key, String jsonObject, Long delay, TimeUnit timeUnit) {
            KafkaInfo kafkaInfo = new KafkaInfo();
            kafkaInfo.setTopic(topic);
            kafkaInfo.setKey(key);
            kafkaInfo.setJsonObject(jsonObject);
            this.rDelayedQueue.offer(JSON.toJSONString(kafkaInfo), delay, timeUnit);
            LOGGER.info("send delay [{}], timeUnit[{}] to topic[{}], key[{}], jsonObject[{}]",delay , timeUnit, topic, key,
                    jsonObject);
        }
    
        @Override
        public void afterPropertiesSet() {
            this.rBlockingDeque = redissonClient.getBlockingDeque(KAFKA_DELAY_QUEUE);
            if (this.rBlockingDeque == null) {
                return;
            }
            this.rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque);
            if (this.rDelayedQueue == null) {
                return;
            }
            this.startConsumerDelayQueue();
        }
    
        private static class KafkaInfo implements Serializable {
            private static final long serialVersionUID = -5517223779255526862L;
    
            private String topic;
            private String key;
            private String jsonObject;
    
            public String getTopic() {
                return topic;
            }
    
            public void setTopic(String topic) {
                this.topic = topic;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
    
            public String getJsonObject() {
                return jsonObject;
            }
    
            public void setJsonObject(String jsonObject) {
                this.jsonObject = jsonObject;
            }
        }
    
        private void startConsumerDelayQueue() {
    
            Thread thread = new Thread(() -> {
                while (true) {
                    try {
                        String jsonObject = this.rBlockingDeque.take();
                        LOGGER.info("--> 延迟队列获取数据:{}",jsonObject);
                        KafkaInfo kafkaInfo = JSON.parseObject(jsonObject, KafkaInfo.class);
                        this.send(kafkaInfo.getTopic(), kafkaInfo.getKey(), kafkaInfo.getJsonObject());
                    } catch (InterruptedException e) {
                        LOGGER.error("延迟队列获取数据异常....");
                    }
                }
            });
            thread.setDaemon(true);
            thread.start();
        }
    }

    注意:

    放入队列是使用的RDelayedQueue,获取队列是使用RQueue而不是RDelayedQueue。

  • 相关阅读:
    LocalImprove算法
    Improve算法
    CSU-ACM2014年校队选拔赛指导赛解题报告
    CSU-ACM暑假集训基础组训练赛(4)解题报告
    CSU-ACM暑假集训基础组七夕专场
    CSU-ACM暑假集训基础组训练赛(2) 解题报告
    CSU-ACM2014暑假集训基础组训练赛(1) 解题报告
    Aizu 2164 CSUOJ 1436 Revenge of the Round Table
    插头DP小结
    Codeforces 128C Games with Rectangle
  • 原文地址:https://www.cnblogs.com/huahua035/p/14023526.html
Copyright © 2011-2022 走看看