zoukankan      html  css  js  c++  java
  • java基于mongodb实现分布式锁

    原理

    通过线程安全findAndModify 实现锁

    实现

    定义锁存储对象:

    /**
     * mongodb 分布式锁
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Document(collection = "distributed-lock-doc")
    public class LockDocument {
        @Id
        private String id;
        private long expireAt;
        private String token;
    }
    

    定义Lock API:

    public interface LockService {
    
        String acquire(String key, long expiration);
    
        boolean release(String key, String token);
    
        boolean refresh(String key, String token, long expiration);
    }
    

    获取锁:

     	@Override
        public String acquire(String key, long expiration) {
            Query query = Query.query(Criteria.where("_id").is(key));
            String token = this.generateToken();
            Update update = new Update()
                .setOnInsert("_id", key)
                .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
                .setOnInsert("token", token);
    
            FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                                     .returnNew(true);
            LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                           LockDocument.class);
            boolean locked = doc.getToken() != null && doc.getToken().equals(token);
    
            // 如果已过期
            if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
                DeleteResult deleted = this.mongoTemplate.remove(
                    Query.query(Criteria.where("_id").is(key)
                                        .and("token").is(doc.getToken())
                                        .and("expireAt").is(doc.getExpireAt())),
                    LockDocument.class);
                if (deleted.getDeletedCount() >= 1) {
                    // 成功释放锁, 再次尝试获取锁
                    return this.acquire(key, expiration);
                }
            }
    
            log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
                      key, token, locked);
            return locked ? token : null;
        }
    

    原理:

    • 先尝试upsert锁对象,如果成功且token一致,说明拿到锁
    • 否则加锁失败
    • 如果未拿到锁,但是锁已过期,尝试删除锁
      • 如果删除成功,再次尝试拿锁
      • 如果失败,说明锁可能已经续期了

    释放和续期锁:

    
       @Override
       public boolean release(String key, String token) {
           Query query = Query.query(Criteria.where("_id").is(key)
                                             .and("token").is(token));
           DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
           boolean released = deleted.getDeletedCount() == 1;
           if (released) {
               log.debug("Remove query successfully affected 1 record for key {} with token {}",
                         key, token);
           } else if (deleted.getDeletedCount() > 0) {
               log.error("Unexpected result from release for key {} with token {}, released {}",
                         key, token, deleted);
           } else {
               log.error("Remove query did not affect any records for key {} with token {}",
                         key, token);
           }
    
           return released;
       }
    
       @Override
       public boolean refresh(String key, String token,
                              long expiration) {
           Query query = Query.query(Criteria.where("_id").is(key)
                                             .and("token").is(token));
           Update update = Update.update("expireAt",
                                         System.currentTimeMillis() + expiration);
           UpdateResult updated =
               mongoTemplate.updateFirst(query, update, LockDocument.class);
    
           final boolean refreshed = updated.getModifiedCount() == 1;
           if (refreshed) {
               log.debug("Refresh query successfully affected 1 record for key {} " +
                         "with token {}", key, token);
           } else if (updated.getModifiedCount() > 0) {
               log.error("Unexpected result from refresh for key {} with token {}, " +
                         "released {}", key, token, updated);
           } else {
               log.warn("Refresh query did not affect any records for key {} with token {}. " +
                        "This is possible when refresh interval fires for the final time " +
                        "after the lock has been released",
                        key, token);
           }
    
           return refreshed;
       }
    

    使用

    
    private  LockService lockService;
    
    private void tryAcquireLockAndSchedule() {
            while (!this.stopSchedule) {
                // 尝试拿锁
                this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
                if (this.token != null) {
    				// 拿到锁
                } else {
                    // 等待LOCK_EXPIRATION, 再次尝试
                    Thread.sleep(LOCK_EXPIRATION);
                }
            }
        }
    
    • 先尝试拿锁,如果获取到token,说明拿锁成功
    • 否则可以sleep一段时间后再拿锁

    完整代码,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java


    感谢您的认真阅读。

    如果你觉得有帮助,欢迎点赞支持!

    不定期分享软件开发经验,欢迎关注作者, 一起交流软件开发:

  • 相关阅读:
    微信小程序 editor富文本
    vuex详解
    每日一练
    如何有效地进行代码 Review?
    vue实现瀑布流
    浅谈js防抖和节流-转载
    Java Kafka 消费积压监控
    用于文本内容的复制粘贴
    Java 键值对数据本地保存与读取
    Java ElasticSearch 操作
  • 原文地址:https://www.cnblogs.com/xiaoqi/p/mongodb-lock.html
Copyright © 2011-2022 走看看