zoukankan      html  css  js  c++  java
  • Redisson关于使用订阅数问题

    一、前提

    最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。

    二、源码分析

    下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

    1、RedissonLock#lock() 方法

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            // 尝试获取,如果ttl == null,则表示获取锁成功
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return;
            }
    
            // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            if (interruptibly) {
                commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                commandExecutor.syncSubscription(future);
            }
    
            // 后面代码忽略
            try {
                // 无限循环获取锁,直到获取锁成功
                // ...
            } finally {
                // 取消订阅锁释放事件
                unsubscribe(future, threadId);
            }
    }
    

    总结下主要逻辑:

    1. 获取当前线程的线程id;
    2. tryAquire尝试获取锁,并返回ttl
    3. 如果ttl为空,则结束流程;否则进入后续逻辑;
    4. this.subscribe(threadId)订阅当前线程,返回一个RFuture;
    5. 如果在指定时间没有监听到,则会产生如上异常。
    6. 订阅成功后, 通过while(true)循环,一直尝试获取锁
    7. fially代码块,会解除订阅
      所以上述这情况问题应该出现在subscribe()方法中

    2、详细看下subscribe()方法

    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        // entryName 格式:“id:name”;
        // channelName 格式:“redisson_lock__channel:name”;
        return pubSub.subscribe(getEntryName(), getChannelName());
    }
    

    RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        // ....
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    

    subscribeServiceMasterSlaveConnectionManager的实现中又是通过如下方式构造的

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
        this(config, id);
        this.config = cfg;
    
        // 初始化
        initTimer(cfg);
        initSingleEntry();
    }
    
    protected void initTimer(MasterSlaveServersConfig config) {
        int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
        Arrays.sort(timeouts);
        int minTimeout = timeouts[0];
        if (minTimeout % 100 != 0) {
            minTimeout = (minTimeout % 100) / 2;
        } else if (minTimeout == 100) {
            minTimeout = 50;
        } else {
            minTimeout = 100;
        }
    
        timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
    
        connectionWatcher = new IdleConnectionWatcher(this, config);
    
        // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:
        subscribeService = new PublishSubscribeService(this, config);
    }
    

    PublishSubscribeService构造函数

    private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        super();
        this.connectionManager = connectionManager;
        this.config = config;
        for (int i = 0; i < locks.length; i++) {
            // 这里初始化了一组信号量,每个信号量的初始值为1
            locks[i] = new AsyncSemaphore(1);
        }
    }
    

    3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面

    private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
    
    public RFuture<E> subscribe(String entryName, String channelName) {
          // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量
         // public AsyncSemaphore getSemaphore(ChannelName channelName) {
        //    return locks[Math.abs(channelName.hashCode() % locks.length)];
        // }
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    
        AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();    
        RPromise<E> newPromise = new RedissonPromise<E>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return semaphore.remove(listenerHolder.get());
            }
        };
    
        Runnable listener = new Runnable() {
    
            @Override
            public void run() {
                //  如果存在RedissonLockEntry, 则直接利用已有的监听
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.acquire();
                    semaphore.release();
                    entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
    
                E value = createEntry(newPromise);
                value.acquire();
    
                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.acquire();
                    semaphore.release();
                    oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }
    
                // 创建监听,
                RedisPubSubListener<Object> listener = createListener(channelName, value);
                // 订阅监听
                service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            }
        };
    
        // 最终会执行listener.run方法
        semaphore.acquire(listener);
        listenerHolder.set(listener);
    
        return newPromise;
    }
    

    AsyncSemaphore#acquire()方法

    public void acquire(Runnable listener) {
        acquire(listener, 1);
    }
    
    public void acquire(Runnable listener, int permits) {
        boolean run = false;
    
        synchronized (this) {
            // counter初始化值为1
            if (counter < permits) {
                // 如果不是第一次执行,则将listener加入到listeners集合中
                listeners.add(new Entry(listener, permits));
                return;
            } else {
                counter -= permits;
                run = true;
            }
        }
    
        // 第一次执行acquire, 才会执行listener.run()方法
        if (run) {
            listener.run();
        }
    }
    

    梳理上述逻辑:

    • 1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
    • 2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
    • 3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
    • 4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。
      从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法

    4、PublishSubscribeService#subscribe逻辑如下:

    private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
    private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
    
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
        // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。
        subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
        return promise;
    }
    
    private void subscribe(Codec codec, ChannelName channelName,  RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
    
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }
    
        // 没有时,才是最重要的逻辑
        freePubSubLock.acquire(new Runnable() {
    
            @Override
            public void run() {
                if (promise.isDone()) {
                    lock.release();
                    freePubSubLock.release();
                    return;
                }
    
                // 从队列中取头部元素
                PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
                if (freeEntry == null) {
                    // 第一次肯定是没有的需要建立
                    connect(codec, channelName, promise, type, lock, listeners);
                    return;
                }
    
                // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。
                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }
    
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    freeEntry.release();
                    freePubSubLock.release();
    
                    addListeners(channelName, promise, type, lock, oldEntry, listeners);
                    return;
                }
    
                // 如果remainFreeAmount=0, 则从队列中移除
                if (remainFreeAmount == 0) {
                    freePubSubConnections.poll();
                }
                freePubSubLock.release();
    
                // 增加监听
                RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
    
                ChannelFuture future;
                if (PubSubType.PSUBSCRIBE == type) {
                    future = freeEntry.psubscribe(codec, channelName);
                } else {
                    future = freeEntry.subscribe(codec, channelName);
                }
    
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            if (!promise.isDone()) {
                                subscribeFuture.cancel(false);
                            }
                            return;
                        }
    
                        connectionManager.newTimeout(new TimerTask() {
                            @Override
                            public void run(Timeout timeout) throws Exception {
                                subscribeFuture.cancel(false);
                            }
                        }, config.getTimeout(), TimeUnit.MILLISECONDS);
                    }
                });
            }
    
        });
    }
    
    
    private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        // 根据channelName计算出slot获取PubSubConnection
        int slot = connectionManager.calcSlot(channelName.getName());
        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
        promise.onComplete((res, e) -> {
            if (e != null) {
                ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);
            }
        });
    
    
        connFuture.onComplete((conn, e) -> {
            if (e != null) {
                freePubSubLock.release();
                lock.release();
                promise.tryFailure(e);
                return;
            }
    
            // 这里会从配置中读取subscriptionsPerConnection
            PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
            // 每获取一次,subscriptionsPerConnection就会减直到为0
            int remainFreeAmount = entry.tryAcquire();
    
            // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中
            PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                releaseSubscribeConnection(slot, entry);
    
                freePubSubLock.release();
    
                addListeners(channelName, promise, type, lock, oldEntry, listeners);
                return;
            }
    
    
            if (remainFreeAmount > 0) {
                // 加入到队列中
                freePubSubConnections.add(entry);
            }
            freePubSubLock.release();
    
            RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
    
            // 这里真正的进行订阅(底层与redis交互)
            ChannelFuture future;
            if (PubSubType.PSUBSCRIBE == type) {
                future = entry.psubscribe(codec, channelName);
            } else {
                future = entry.subscribe(codec, channelName);
            }
    
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        if (!promise.isDone()) {
                            subscribeFuture.cancel(false);
                        }
                        return;
                    }
    
                    connectionManager.newTimeout(new TimerTask() {
                        @Override
                        public void run(Timeout timeout) throws Exception {
                            subscribeFuture.cancel(false);
                        }
                    }, config.getTimeout(), TimeUnit.MILLISECONDS);
                }
            });
        });
    }
    

    PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

     public int tryAcquire() {
        while (true) {
            int value = subscribedChannelsAmount.get();
            if (value == 0) {
                return -1;
            }
    
            if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
                return value - 1;
            }
        }
    }
    

    梳理上述逻辑:

    • 1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
    • 2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法
      • 2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
      • 2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
      • 2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
      • 2.4 后面就是进行底层的subscribeaddListener
    • 3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
    • 4、如果remainFreeAmount < 0 会抛出IllegalStateException异常;如果remainFreeAmount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
    • 5、最后也是进行底层的subscribeaddListener

    三 总结

    根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeService 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection它的默认值5时,remainFreeAmount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(future)中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.

    解决方法: 在初始化Redisson可以可指定这个配置项的值。

    相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-settings

    这就是本该拼搏的年纪,却想得太多,做得太少!
  • 相关阅读:
    第十章 嵌入式Linux的调试技术
    第九章 硬件抽象层:HAL
    第八章 让开发板发出声音:蜂鸣器驱动
    第八章GPS与Google Map定位系统
    第六章 接口驱动程序开发
    第七章 Android嵌入式组态软件
    第五章 S5PV210硬件结构
    第四章
    第三章
    第二章
  • 原文地址:https://www.cnblogs.com/yuanfy008/p/15799743.html
Copyright © 2011-2022 走看看