zoukankan      html  css  js  c++  java
  • Redisson学习

    顺便学习一下原理

    1. 启动,看从上到下的调用和回调关系;

    注册topic的元素数量和状态的变更通知,元素放入在被通知后不断加入,zrangebyscore 拿出0 ~ expiretime中的limit多少元素,然后rpush从右边放入target链表,lrem从左侧删除元素,zrem再删除有序集合元素,zrange取出有序集的第一个元素和他的timeout时间

            QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
                
                @Override
                protected RFuture<Long> pushTaskAsync() {
                    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                            "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                          + "if #expiredValues > 0 then "
                              + "for i, v in ipairs(expiredValues) do "
                                  + "local randomId, value = struct.unpack('dLc0', v);"
                                  + "redis.call('rpush', KEYS[1], value);"
                                  + "redis.call('lrem', KEYS[3], 1, v);"
                              + "end; "
                              + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                          + "end; "
                            // get startTime from scheduler queue head task
                          + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                          + "if v[1] ~= nil then "
                             + "return v[2]; "
                          + "end "
                          + "return nil;",
                          Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                          System.currentTimeMillis(), 100);
                }
                
                @Override
                protected RTopic getTopic() {
                    return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
                }
            };
            
            queueTransferService.schedule(queueName, task);

    2. 定时任务调度

        public synchronized void schedule(String name, QueueTransferTask task) {
            QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
            if (oldTask == null) {
                task.start();
            } else {
                oldTask.incUsage();
            }
        }

    3. 任务启动注册

        public void start() {
            RTopic schedulerTopic = getTopic();
            statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
                @Override
                public void onSubscribe(String channel) {
                    pushTask();
                }
            });
            
            messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
                @Override
                public void onMessage(CharSequence channel, Long startTime) {
                    scheduleTask(startTime);
                }
            });
        }

    4. 循环调度

        private void pushTask() {
            RFuture<Long> startTimeFuture = pushTaskAsync();
            startTimeFuture.onComplete((res, e) -> {
                if (e != null) {
                    if (e instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(e.getMessage(), e);
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                
                if (res != null) {
                    scheduleTask(res);
                }
            });
        }

     定时任务用的Netty的HashedWheelTimer

        private void scheduleTask(final Long startTime) {
            TimeoutTask oldTimeout = lastTimeout.get();
            if (startTime == null) {
                return;
            }
            
            if (oldTimeout != null) {
                oldTimeout.getTask().cancel();
            }
            
            long delay = startTime - System.currentTimeMillis();
            if (delay > 10) {
                Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        pushTask();
                        
                        TimeoutTask currentTimeout = lastTimeout.get();
                        if (currentTimeout.getTask() == timeout) {
                            lastTimeout.compareAndSet(currentTimeout, null);
                        }
                    }
                }, delay, TimeUnit.MILLISECONDS);
                if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                    timeout.cancel();
                }
            } else {
                pushTask();
            }
        }

    5. 加入: zadd 为按照到期时间有序的集合,加入; rpush 添加元素到一个list

                    zrange  0 0 取有序集合中第一个元素,如果有,则publish发一个通知到chennel,内容为到期时间

        @Override
        public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
            if (delay < 0) {
                throw new IllegalArgumentException("Delay can't be negative");
            }
            
            long delayInMs = timeUnit.toMillis(delay);
            long timeout = System.currentTimeMillis() + delayInMs;
         
            long randomId = ThreadLocalRandom.current().nextLong();
            return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                    "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
                  + "redis.call('zadd', KEYS[2], ARGV[1], value);"
                  + "redis.call('rpush', KEYS[3], value);"
                  // if new object added to queue head when publish its startTime 
                  // to all scheduler workers 
                  + "local v = redis.call('zrange', KEYS[2], 0, 0); "
                  + "if v[1] == value then "
                     + "redis.call('publish', KEYS[4], ARGV[1]); "
                  + "end;",
                  Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
                  timeout, randomId, encode(e));
        }
  • 相关阅读:
    Linux系统:Centos7搭建Redis单台和集群环境
    Linux系统:Centos7安装Jdk8、Tomcat8、MySQL5.7环境
    Linux系统:常用Linux系统管理命令总结
    转--->svn的使用
    开发中常见的问题
    1.NSThread
    用NSOperation和NSOperationQueue实现多线程编程
    很实用的时间比对算法
    简单的IOS6和IOS7通过图片名适配
    nginx完美支持tp框架
  • 原文地址:https://www.cnblogs.com/it-worker365/p/12576017.html
Copyright © 2011-2022 走看看