zoukankan      html  css  js  c++  java
  • Java分布式锁之数据库方式实现

    之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

    首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

    现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。

    以下详细讲解了在多个应用服务里,怎样用数据库去实现分布式锁。

    结合案例:

    1.客户app取出交易(同一个客户在某一个时间点只能对某种资产做取现操作)

    2.交易重试补偿(交易过程服务宕机,扫描重试补偿)

    一、数据库的设计

    数据库锁表的表结构如下:

     
    field type comment
    ID bigint 主键
    OUTER_SERIAL_NO varchar 流水号
    CUST_NO char 客户号
    SOURCE_CODE varchar 锁操作
    THREAD_NO varchar 线程号
    STATUS char 锁状态
    REMARK varchar 备注
    CREATED_AT timestamp 创建时间
    UPDATED_AT timestamp 更新时间

    作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

    流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

    客户号:客户的唯一标识

    锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

    线程号:当前操作线程的线程号,比如取当前线程的uuid

    锁状态:P处理中,F失败,Y成功

    二、代码设计

    代码的目录结构如下: 

    主要贴一下锁操作的核心代码实现:

    锁接口定义:DbLockManager.java

    /**
     * 锁接口 <br>
     * 
     * @Author fugaoyang
     *
     */
    public interface DbLockManager {
    
        /**
         * 加锁
         */
        boolean lock(String outerSerialNo, String custNo, LockSource source);
    
        /**
         * 解锁
         */
        void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus);
    
    }
    View Code

    锁接口实现类:DbLockManagerImpl.java

    /**
     * 
     * 数据库锁实现<br>
     * 
     * @author fugaoyang
     *
     */
    @Service
    public class DbLockManagerImpl implements DbLockManager {
    
        private final Logger LOG = LoggerFactory.getLogger(this.getClass());
    
        @Autowired
        private DbSyncLockMapper lockMapper;
    
        @Transactional
        public boolean lock(String outerSerialNo, String custNo, LockSource source) {
    
            boolean isLock = false;
            TradeSyncLock lock = null;
            try {
                lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
    
                if (null == lock) {
                    lock = new TradeSyncLock();
                    createLock(lock, outerSerialNo, custNo, source);
    
                    int num = lockMapper.insert(lock);
                    if (num == 1) {
                        isLock = true;
                    }
    
                    LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
                    return isLock;
                }
    
                // 根据交易类型进行加锁
                isLock = switchSynsLock(lock, source);
                LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
    
            } catch (Exception e) {
                LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e);
            }
            return isLock;
        }
    
        @Transactional
        public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) {
    
            try {
                TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode());
    
                if (null != lock) {
                    lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
                            ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
                }
    
                LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
            } catch (Exception e) {
                LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e);
            }
        }
    
        /**
         * 匹配加锁
         */
        private boolean switchSynsLock(TradeSyncLock lock, LockSource source) {
            boolean isLock = false;
    
            switch (source) {
            case WITHDRAW:
                ;
                isLock = tradeSynsLock(lock);
                break;
            case WITHDRAW_RETRY:
                ;
                isLock = retrySynsLock(lock);
                break;
            default:
                ;
            }
            return isLock;
        }
    
        /**
         * 交易同步锁
         */
        private boolean tradeSynsLock(TradeSyncLock lock) {
            // 处理中的不加锁,即不执行交易操作
            if (LockStatus.P.getName().equals(lock.getStatus())) {
                return false;
            }
    
            int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
                    ThreadLogUtils.getCurrThreadUuid(), null);
            if (num == 1) {
                return true;
            }
            return false;
        }
    
        /**
         * 补偿同步锁
         */
        private boolean retrySynsLock(TradeSyncLock lock) {
            // 处理中或处理完成的不加锁,即不执行补偿操作
            if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) {
                return false;
            }
    
            int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
                    ThreadLogUtils.getCurrThreadUuid(), null);
            if (num == 1) {
                return true;
            }
            return false;
        }
    
        private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
            lock.setOuterSerialNo(outerSerialNo);
            lock.setCustNo(custNo);
            lock.setSourceCode(source.getCode());
            lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
            lock.setStatus(LockStatus.P.getName());
            lock.setRemark(source.getDesc());
        }
    
    }
    View Code

    获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

    /**
     * 
     * 线程处理<br>
     * 
     * @author fugaoyang
     *
     */
    public class ThreadLogUtils {
    
        private static ThreadLogUtils instance = null;
    
        private ThreadLogUtils() {
            setInstance(this);
        }
    
        // 初始化标志
        private static final Object __noop = new Object();
        private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                return null;
            }
        };
    
        // 当前线程的UUID信息,主要用于打印日志;
        private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() {
            @Override
            protected String initialValue() {
                return UUID.randomUUID().toString()/* .toUpperCase() */;
            }
        };
    
        private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() {
            @Override
            protected String initialValue() {
                return UUIDGenerator.getUuid();
            }
        };
    
        public static void clear(Boolean isNew) {
            if (isNew) {
    
                currLogUuid.remove();
    
                __flag.remove();
    
                currThreadUuid.remove();
    
            }
        }
    
        public static String getCurrLogUuid() {
            if (!isInitialized()) {
                throw new IllegalStateException("TLS未初始化");
            }
    
            return currLogUuid.get();
        }
    
        public static String getCurrThreadUuid() {
            return currThreadUuid.get();
        }
    
        public static void clearCurrThreadUuid() {
            currThreadUuid.remove();
        }
    
        public static String getLogPrefix() {
            if (!isInitialized()) {
                return "";
            }
    
            return "<uuid=" + getCurrLogUuid() + ">";
        }
    
        private static boolean isInitialized() {
            return __flag.get() != null;
        }
    
        /**
         * 初始化上下文,如果已经初始化则返回false,否则返回true<br/>
         *
         * @return
         */
        public static boolean initialize() {
            if (isInitialized()) {
                return false;
            }
    
            __flag.set(__noop);
            return true;
        }
    
        private static void setInstance(ThreadLogUtils instance) {
            ThreadLogUtils.instance = instance;
        }
    
        public static ThreadLogUtils getInstance() {
            return instance;
        }
    
    }
    View Code

    两种锁的实现的大致思路如下:

    1.交易同步锁

    当一个客户在app取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

    当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

    当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

    2.交易重试补偿锁

    1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

    2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

    上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

    后续可以优化的部分:

    1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

    2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

    3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

    因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

    完整示例代码后续会更新到github。

  • 相关阅读:
    使用SQL Server Management Studio 创建数据库备份作业
    ClickOnce 获取客户端发布版本号
    在C#用HttpWebRequest中发送GET/HTTP/HTTPS请求
    找不到方法:“Void System.Data.Objects.ObjectContextOptions.set_UseConsistentNullReferenceBehavior(Boolean)
    常用操作类
    数据库命名规范
    expression动态构成
    C# 获得当前方法 和 方法调用链 的 方法
    EF架构封装类
    基于微软企业库的分层代码框架
  • 原文地址:https://www.cnblogs.com/garryfu/p/8028055.html
Copyright © 2011-2022 走看看