  • Redisson study notes

    Taking the opportunity to learn how the delayed queue works internally.

    1. Startup: follow the calls and callbacks from top to bottom.

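    The task subscribes to a topic and is notified both when the subscription status changes and when new elements are added; each notification triggers another transfer. The transfer script uses zrangebyscore to fetch up to `limit` elements whose score lies between 0 and the current time (that is, elements that have already expired). For each one it rpushes the value onto the tail of the target list and lrems the first matching entry (searching from the head) from the queue list; afterwards zrem removes the whole batch from the sorted set. Finally, zrange reads the first remaining element of the sorted set together with its timeout (its score), which is returned as the next start time.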

            QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
                
                @Override
                protected RFuture<Long> pushTaskAsync() {
                    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                            "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                          + "if #expiredValues > 0 then "
                              + "for i, v in ipairs(expiredValues) do "
                                  + "local randomId, value = struct.unpack('dLc0', v);"
                                  + "redis.call('rpush', KEYS[1], value);"
                                  + "redis.call('lrem', KEYS[3], 1, v);"
                              + "end; "
                              + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                          + "end; "
                            // get startTime from scheduler queue head task
                          + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                          + "if v[1] ~= nil then "
                             + "return v[2]; "
                          + "end "
                          + "return nil;",
                          Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                          System.currentTimeMillis(), 100);
                }
                
                @Override
                protected RTopic getTopic() {
                    return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
                }
            };
            
            queueTransferService.schedule(queueName, task);
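
    To make the data movement in the Lua script easier to follow, here is a minimal in-memory sketch of the same steps. It is not Redisson code: the class `TransferSketch`, its fields, and the use of a `TreeMap` and `ArrayList` as stand-ins for the Redis sorted set and lists are all assumptions made for illustration, and it ignores the packed random id that the real script strips off with struct.unpack.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the transfer step; not part of Redisson.
class TransferSketch {
    // Stand-in for the sorted set KEYS[2]: expiry time (ms) -> values due at that time.
    final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // Stand-in for the list KEYS[3], which keeps insertion order.
    final List<String> queue = new ArrayList<>();
    // Stand-in for the target list KEYS[1] that consumers read from.
    final List<String> target = new ArrayList<>();

    void add(String value, long expireAt) {
        timeoutSet.computeIfAbsent(expireAt, k -> new ArrayList<>()).add(value);
        queue.add(value);
    }

    // Moves up to `limit` expired values to the target list and returns
    // the next expiry time, or null when nothing is left to schedule.
    Long transfer(long now, int limit) {
        int moved = 0;
        // zrangebyscore KEYS[2] 0 now limit 0 <limit>
        Iterator<Map.Entry<Long, List<String>>> it =
                timeoutSet.headMap(now, true).entrySet().iterator();
        while (it.hasNext() && moved < limit) {
            List<String> due = it.next().getValue();
            while (!due.isEmpty() && moved < limit) {
                String v = due.remove(0);
                target.add(v);    // rpush KEYS[1] value
                queue.remove(v);  // lrem KEYS[3] 1 v (first occurrence)
                moved++;
            }
            if (due.isEmpty()) {
                it.remove();      // zrem KEYS[2] ...
            }
        }
        // zrange KEYS[2] 0 0 WITHSCORES: score of the new head, if any
        return timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
    }
}
```

    With elements expiring at 100, 200 and 300, calling `transfer(200, 100)` moves the first two to the target list and returns 300, which is the value the real task would pass on to scheduleTask.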

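    2. Scheduling the transfer task: only the first task registered under a queue name is started; later registrations just increase the usage count of the existing task.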

        public synchronized void schedule(String name, QueueTransferTask task) {
            QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
            if (oldTask == null) {
                task.start();
            } else {
                oldTask.incUsage();
            }
        }
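
    The putIfAbsent pattern above can be tried in isolation. The following is a small sketch under stated assumptions: `RegistrySketch` and its nested `Task` are hypothetical stand-ins, and the initial usage count of 1 is an assumption about how the counter starts, not something shown in the excerpt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry that starts one task per name and counts its users.
class RegistrySketch {
    static class Task {
        // Assumption: a freshly created task counts as having one user.
        final AtomicInteger usage = new AtomicInteger(1);
        volatile boolean started;

        void start() { started = true; }
        void incUsage() { usage.incrementAndGet(); }
    }

    private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

    // Returns the task that is actually registered under `name`.
    synchronized Task schedule(String name, Task task) {
        Task old = tasks.putIfAbsent(name, task);
        if (old == null) {
            task.start();   // first registration: start the new task
            return task;
        }
        old.incUsage();     // already registered: share the running task
        return old;
    }
}
```

    Registering two tasks under the same name leaves the first one running with a usage count of 2, while the second is never started.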

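    3. Task start: register the listeners. Once the subscription is established the task runs a transfer immediately; every message published on the channel carries a start time and reschedules the task.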

        public void start() {
            RTopic schedulerTopic = getTopic();
            statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
                @Override
                public void onSubscribe(String channel) {
                    pushTask();
                }
            });
            
            messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
                @Override
                public void onMessage(CharSequence channel, Long startTime) {
                    scheduleTask(startTime);
                }
            });
        }

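    4. The scheduling loop: each transfer returns the next start time, which schedules the next transfer; on error the task retries after 5 seconds.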

        private void pushTask() {
            RFuture<Long> startTimeFuture = pushTaskAsync();
            startTimeFuture.onComplete((res, e) -> {
                if (e != null) {
                    if (e instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(e.getMessage(), e);
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                
                if (res != null) {
                    scheduleTask(res);
                }
            });
        }

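     The timer tasks run on Netty's HashedWheelTimer. If the start time is more than 10 ms away, a timeout is registered and any previous one is cancelled; otherwise the transfer runs right away.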

        private void scheduleTask(final Long startTime) {
            TimeoutTask oldTimeout = lastTimeout.get();
            if (startTime == null) {
                return;
            }
            
            if (oldTimeout != null) {
                oldTimeout.getTask().cancel();
            }
            
            long delay = startTime - System.currentTimeMillis();
            if (delay > 10) {
                Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        pushTask();
                        
                        TimeoutTask currentTimeout = lastTimeout.get();
                        if (currentTimeout.getTask() == timeout) {
                            lastTimeout.compareAndSet(currentTimeout, null);
                        }
                    }
                }, delay, TimeUnit.MILLISECONDS);
                if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                    timeout.cancel();
                }
            } else {
                pushTask();
            }
        }
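
    The cancel-and-replace logic of scheduleTask can be sketched without Netty. The version below swaps HashedWheelTimer for the JDK's `ScheduledExecutorService`, which is a substitution made only so the example has no dependencies; `RescheduleSketch`, `push`, `last` and `runs` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: keep at most one pending timer, run at once if already due.
class RescheduleSketch {
    // JDK scheduler used here in place of Netty's HashedWheelTimer (assumption).
    final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    final AtomicReference<ScheduledFuture<?>> last = new AtomicReference<>();
    final AtomicInteger runs = new AtomicInteger();

    // Stand-in for pushTask(); it only counts invocations.
    void push() {
        runs.incrementAndGet();
    }

    void schedule(long startTime) {
        ScheduledFuture<?> old = last.get();
        if (old != null) {
            old.cancel(false);              // drop the previously scheduled run
        }
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            ScheduledFuture<?> f = timer.schedule(this::push, delay, TimeUnit.MILLISECONDS);
            // If another thread replaced `last` in the meantime, give up our timer.
            if (!last.compareAndSet(old, f)) {
                f.cancel(false);
            }
        } else {
            push();                         // due now (or within 10 ms): run inline
        }
    }
}
```

    The compare-and-set keeps a single pending timer even when several notifications arrive concurrently: whichever thread loses the race cancels its own timer. Remember to call `timer.shutdownNow()` when the sketch is no longer needed, since the executor thread is not a daemon.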

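    5. Adding an element: zadd inserts the packed value into a sorted set ordered by expiry time, and rpush appends it to a list.

                    zrange 0 0 then reads the first element of the sorted set; if it is the element that was just added (it became the new head), the script publishes a notification to the channel with the expiry time as the message.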

        @Override
        public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
            if (delay < 0) {
                throw new IllegalArgumentException("Delay can't be negative");
            }
            
            long delayInMs = timeUnit.toMillis(delay);
            long timeout = System.currentTimeMillis() + delayInMs;
         
            long randomId = ThreadLocalRandom.current().nextLong();
            return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                    "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
                  + "redis.call('zadd', KEYS[2], ARGV[1], value);"
                  + "redis.call('rpush', KEYS[3], value);"
                  // if the new object was added at the queue head, then publish its startTime
                  // to all scheduler workers
                  + "local v = redis.call('zrange', KEYS[2], 0, 0); "
                  + "if v[1] == value then "
                     + "redis.call('publish', KEYS[4], ARGV[1]); "
                  + "end;",
                  Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
                  timeout, randomId, encode(e));
        }
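
    The publish condition in the offer script is the part that keeps the timers accurate, so here is a minimal in-memory sketch of it. `OfferSketch` and its fields are hypothetical, the current time is passed in explicitly to keep the example deterministic, and elements with identical expiry times are treated as not becoming the head, which simplifies how Redis orders equal scores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of the offer step; not part of Redisson.
class OfferSketch {
    // Stand-in for the sorted set KEYS[2]: expiry time (ms) -> values.
    final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // Stand-in for the list KEYS[3].
    final List<String> queue = new ArrayList<>();
    // Stand-in for the channel KEYS[4]: every published start time is recorded.
    final List<Long> published = new ArrayList<>();

    void offer(String value, long delayMs, long now) {
        if (delayMs < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long timeout = now + delayMs;
        Long previousHead = timeoutSet.isEmpty() ? null : timeoutSet.firstKey();

        timeoutSet.computeIfAbsent(timeout, k -> new ArrayList<>()).add(value); // zadd
        queue.add(value);                                                       // rpush

        // Publish only when the new element expires before everything else,
        // so the schedulers can move their timer forward.
        if (previousHead == null || timeout < previousHead) {
            published.add(timeout);
        }
    }
}
```

    Offering delays of 500, 900 and 100 ms at time 0 publishes 500 and then 100; the 900 ms element does not change the head, so no notification is needed for it.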
  • Original post: https://www.cnblogs.com/it-worker365/p/12576017.html