  • Redisson delayed queue

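    First, three recommended articles that explain how delayed queues are implemented:

    Redis-based delayed queues

    Distributed delayed queues with Redis

    Using the Redisson delayed queue

    Redisson official documentation

    Implementation: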

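    // Imports assumed from the APIs used below (JSON is assumed to be Alibaba fastjson;
    // KafkaService is the project's own interface and is not shown here).
    import com.alibaba.fastjson.JSON;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.redisson.api.RBlockingDeque;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;
    import java.io.Serializable;
    import java.util.concurrent.TimeUnit;

    @Service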
    public class KafkaServiceImpl implements KafkaService, InitializingBean {
        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);
    
        @Resource
        private KafkaTemplate<String, String> stringKafkaTemplate;
        @Resource
        private RedissonClient redissonClient;
        private RBlockingDeque<String> rBlockingDeque;
        private RDelayedQueue<String> rDelayedQueue;
        
        @Override
        public void send(String topic,String key,String jsonObject) {
            stringKafkaTemplate.send(new ProducerRecord<>(topic,key, jsonObject));
            LOGGER.info("send to topic[{}], key[{}], jsonObject[{}]",topic, key, jsonObject);
        }
    
        @Override
        public void delaySend(String topic, String key, String jsonObject, Long delay, TimeUnit timeUnit) {
            KafkaInfo kafkaInfo = new KafkaInfo();
            kafkaInfo.setTopic(topic);
            kafkaInfo.setKey(key);
            kafkaInfo.setJsonObject(jsonObject);
            this.rDelayedQueue.offer(JSON.toJSONString(kafkaInfo), delay, timeUnit);
            LOGGER.info("send delay [{}], timeUnit[{}] to topic[{}], key[{}], jsonObject[{}]",delay , timeUnit, topic, key,
                    jsonObject);
        }
    
        @Override
        public void afterPropertiesSet() {
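            // KAFKA_DELAY_QUEUE is not defined in this snippet; it is presumably a String constant
            // holding the Redis queue name (for example, one inherited from KafkaService).
            this.rBlockingDeque = redissonClient.getBlockingDeque(KAFKA_DELAY_QUEUE);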
            if (this.rBlockingDeque == null) {
                return;
            }
            this.rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque);
            if (this.rDelayedQueue == null) {
                return;
            }
            this.startConsumerDelayQueue();
        }
    
        private static class KafkaInfo implements Serializable {
            private static final long serialVersionUID = -5517223779255526862L;
    
            private String topic;
            private String key;
            private String jsonObject;
    
            public String getTopic() {
                return topic;
            }
    
            public void setTopic(String topic) {
                this.topic = topic;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
    
            public String getJsonObject() {
                return jsonObject;
            }
    
            public void setJsonObject(String jsonObject) {
                this.jsonObject = jsonObject;
            }
        }
    
        private void startConsumerDelayQueue() {
    
            Thread thread = new Thread(() -> {
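                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // Blocks until an element's delay has expired and it has reached the destination deque.
                        String jsonObject = this.rBlockingDeque.take();
                        LOGGER.info("--> fetched data from delay queue: {}", jsonObject);
                        KafkaInfo kafkaInfo = JSON.parseObject(jsonObject, KafkaInfo.class);
                        this.send(kafkaInfo.getTopic(), kafkaInfo.getKey(), kafkaInfo.getJsonObject());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                        LOGGER.warn("delay queue consumer interrupted, stopping", e);
                    } catch (Exception e) {
                        // A malformed message or a failed send must not kill the consumer thread.
                        LOGGER.error("failed to process data from delay queue", e);
                    }
                }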
            });
            thread.setDaemon(true);
            thread.start();
        }
    }
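
    The consumer thread above is a plain blocking-take loop. As a minimal sketch of the same pattern using only java.util.concurrent (no Redisson or Kafka), the hypothetical StoppableConsumer below shows one way to let such a loop exit cleanly on interrupt while surviving handler failures. The class name, the handler parameter and the stop method are assumptions made for illustration, not part of the original service.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: drains a BlockingQueue on a daemon thread until interrupted.
class StoppableConsumer {
    private final BlockingQueue<String> queue;
    private final Consumer<String> handler;
    private final Thread worker;

    StoppableConsumer(BlockingQueue<String> queue, Consumer<String> handler) {
        this.queue = queue;
        this.handler = handler;
        this.worker = new Thread(this::runLoop, "delay-queue-consumer");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    // Interrupts the worker, waits up to timeoutMillis, and reports whether it has exited.
    boolean stop(long timeoutMillis) {
        worker.interrupt();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    private void runLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until an element is available or the thread is interrupted.
                handler.accept(queue.take());
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition terminates the thread.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // A failing handler should not stop consumption; real code would log here.
            }
        }
    }
}
```

    In the service above, the queue would be the RBlockingDeque (which implements BlockingQueue) and stop could be called from a shutdown hook such as DisposableBean.destroy(); both are suggestions rather than something the original code does.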

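    Note:

    Elements are added through the RDelayedQueue, but they are consumed from the destination RQueue (here the RBlockingDeque), not from the RDelayedQueue. Once an element's delay expires, Redisson transfers it to the destination queue, so that is where a consumer has to wait.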
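
    To make the producer/consumer split concrete, here is a small in-memory analogy. It is not Redisson's implementation: the class DelayedTransferSketch and its methods are hypothetical, and the current time is passed in explicitly so the behaviour is deterministic. Producers write to a pending structure ordered by due time, a transfer step moves expired elements, and consumers only ever read the destination queue.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical in-memory model of "delayed queue in front of a destination queue".
class DelayedTransferSketch {
    private static final class Entry {
        final String value;
        final long dueAtMillis;

        Entry(String value, long dueAtMillis) {
            this.value = value;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Pending elements, earliest due time first (plays the role of the delayed queue).
    private final PriorityQueue<Entry> pending =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.dueAtMillis));

    // Elements whose delay has expired (plays the role of the destination queue).
    private final Queue<String> destination = new ArrayDeque<>();

    // Producer side: schedule a value to become visible after delayMillis.
    void offer(String value, long delayMillis, long nowMillis) {
        pending.add(new Entry(value, nowMillis + delayMillis));
    }

    // Transfer step: move every element that is due at nowMillis; returns how many moved.
    int transferExpired(long nowMillis) {
        int moved = 0;
        while (!pending.isEmpty() && pending.peek().dueAtMillis <= nowMillis) {
            destination.add(pending.poll().value);
            moved++;
        }
        return moved;
    }

    // Consumer side: reads only the destination queue; null when nothing has expired yet.
    String poll() {
        return destination.poll();
    }
}
```

    With two elements offered at time 0 with delays of 1000 ms and 500 ms, poll() returns nothing until transferExpired has run at a time of 500 ms or later, and the element with the shorter delay comes out first.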

  • Original article: https://www.cnblogs.com/huahua035/p/14023526.html