zoukankan      html  css  js  c++  java
  • 使用redission实现分布式信号量以及遇到的一些坑

    1、项目背景

      公司的缓存组件WRedis不再支持,所以需要将之前实现的WRedis迁移到新的缓存组件Redis中。Redisson基于java.utils提供了一系列分布式的工具类,比如Map、List、Lock等工具类。在redis和java增加了一层,让我们以更熟悉的方式操作Redis。RPermitExpirableSemaphore(可过期性信号量)是Redisson提供为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。

    2、连接RedissonClient

    redisson提供多种配置方式,程序式的、配置式的,本文选择json配置式。

    Config config = Config.fromJSON(redisCfgFile);
    RedissonClient redissonClient = Redisson.create(config);

    创建RedissionClient,使用json的形式创建。

    redis的配置如下:

    {
        "singleServerConfig":{
            "idleConnectionTimeout":10000,
            "pingTimeout":1000,
            "connectTimeout":10000,
            "timeout":6000,
            "retryAttempts":3,
            "retryInterval":1500,
            "reconnectionTimeout":3000,
            "failedAttempts":3,
            "password":"xxxxx",
            "subscriptionsPerConnection":5,
            "clientName":null,
            "address":"redis://xxxx:000",
            "subscriptionConnectionMinimumIdleSize":1,
            "subscriptionConnectionPoolSize":50,
            "connectionMinimumIdleSize":32,
            "connectionPoolSize":64,
            "database":0
        },
        "threads":0,
        "nettyThreads":0,
        "codec":{
            "class":"org.redisson.codec.JsonJacksonCodec"
        },
        "transportMode":"NIO"
    }

    配置说明:https://yq.aliyun.com/articles/551640?spm=a2c4e.11153940.0.0.3cc221bavS8pf3(参考)

    遇到的坑:

      2.1、断开连接

    redis使用5.0,redission使用3.10.7,出现经常client与server断开连接问题

    追查问题,将线上redis版本降低,从5.0->4.0,不会出现断开连接的问题。但是redission不支持断开重连,使用定时任务去定时的ping server,断开后手动重连。

      2.2、切换主从无法自动重连

    public static void init() {
            SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
                if (RUNNING) {
                    try {
                        NodesGroup nodesGroup = redissonClient.getNodesGroup();
                        Collection<Node> allNodes = nodesGroup.getNodes();
                        for (Node n : allNodes) {
                            boolean ping = n.ping();
                            if (!ping) {
                                //ping不通reload client
                                reload();
                            }
                        }
                    } catch (Exception e) {
                        //抛出异常reload client
                        reload();
                    }
                }
            }, 0, 5, TimeUnit.SECONDS);
        }

      2.3、线上经常出现timed out

    修改配置文件中timeout 3000 -> 6000,因为有一个5s的定时任务在ping server,所以设置稍大一点,就不会出现了。

    3、使用redission实现分布式信号量

    Redisson自带一个RPermitExpirableSemaphore(有过期时间的分布式信号量)

    官方的解释:

     为每个申请对象提供参数化的释放时间的信号量,每个许可证可以被自己的id识别,而且可以被自己的id释放。这个许可证id是128b随机数。同时这个分布式信号量工作于非公平模式,因此申请的顺序是不可以预测。

    具体实现如下:

      3.1、申请许可证

    /**
         * @param semaphoreName 信号量标识
         * @param acquireId     获取者的标识
         * @param limit         总量
         * @return int 0-成功,其他-失败
         * @Description: 指定名称和总量获取信号量
         */
        public static int acquireSemaphore(String semaphoreName, String acquireId, long limit) {
            try {
                RedissonClient client = RedisUtils.getRedissonClient();
                RPermitExpirableSemaphore semaphore = client.getPermitExpirableSemaphore(semaphoreName);
                semaphore.trySetPermits((int) limit);
                //每申请一次信号量,expire信号量的生命SEMAPHORE_LIFE_EXPIRE秒
                semaphore.expire(SEMAPHORE_LIFE_EXPIRE, TimeUnit.SECONDS);
                //尝试次数init
                int time = 0;
                while (MAX_TRY_ACQUIRE_TIME > time) {
                    //尝试获取信号量
                    String permitId = semaphore.tryAcquire(getSemaphoreLifeExpire(), getSemaphoreAcquireExpire(), TimeUnit.MILLISECONDS);
                    //获取信号量失败
                    if (null == permitId) {
                        time++;
                        continue;
                    }
                    //获取信号量成功,设置acquireId和permitId的映射关系
                    if (!RedisUtils.hset(getMapName(semaphoreName), acquireId, permitId, getSemaphoreAcquireExpire(), TimeUnit.MILLISECONDS)) {
                        //如果失败,释放资源
                        semaphore.release(permitId);
                        return acquireError();
                    }
                    return 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
                LOG.error("exception for semaphoreName={}, acquireId={}, msg={}", semaphoreName, acquireId, e.getMessage());
            }
            return acquireError();
        }

    将申请的的许可证id和acquireId放到hash结构中,做一个映射,因为需要这个许可证id去释放资源。

    流程图:

      3.2、释放信号量

    /**
         * @param semaphoreName 信号量标识
         * @param acquireId     获取者的标识
         * @Description: 释放对应的信号量
         * @return: int 0 成功 1 失败(超时的错误可能就无法成功释放)
         * @Author: chi.zhang
         * @Date: 2020/02/19
         */
        public static int releaseSemaphore(String semaphoreName, String acquireId) {
            try {
                RedissonClient client = RedisUtils.getRedissonClient();
                RPermitExpirableSemaphore semaphore = client.getPermitExpirableSemaphore(semaphoreName);
                //根据映射关系找到permitId
                String permitId = RedisUtils.hgetStr(getMapName(semaphoreName), acquireId);
                if (StringUtils.isNotEmpty(permitId)) {
                    //可能被释放,所以使用tryRelease
                    if (semaphore.tryRelease(permitId)) {
                        RedisUtils.hdel(getMapName(semaphoreName), acquireId);
                    }
                } else {
                    LOG.error("释放分布式信号量失败,semaphoreName:{},permitId{}", semaphoreName, permitId);
                    return releaseError();
                }
            } catch (Exception e) {
                e.printStackTrace();
                LOG.error("exception for semaphoreName={}, acquireId={}, msg={}", semaphoreName, acquireId, e.getMessage());
                return releaseError();
            }
            return 0;
        }

    流程图:

     

     

     

     参考文档

    https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

  • 相关阅读:
    rest framework 认证 权限 频率
    rest framework 视图,路由
    rest framework 序列化
    10.3 Vue 路由系统
    10.4 Vue 父子传值
    10.2 Vue 环境安装
    10.1 ES6 的新增特性以及简单语法
    Django 跨域请求处理
    20190827 On Java8 第十四章 流式编程
    20190825 On Java8 第十三章 函数式编程
  • 原文地址:https://www.cnblogs.com/zhangchiblog/p/12565913.html
Copyright © 2011-2022 走看看