zoukankan      html  css  js  c++  java
  • java基于mongodb实现分布式锁

    原理

    通过线程安全findAndModify 实现锁

    实现

    定义锁存储对象:

    /**
     * mongodb 分布式锁
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Document(collection = "distributed-lock-doc")
    public class LockDocument {
        @Id
        private String id;
        private long expireAt;
        private String token;
    }
    

    定义Lock API:

    public interface LockService {
    
        String acquire(String key, long expiration);
    
        boolean release(String key, String token);
    
        boolean refresh(String key, String token, long expiration);
    }
    

    获取锁:

     	@Override
        public String acquire(String key, long expiration) {
            Query query = Query.query(Criteria.where("_id").is(key));
            String token = this.generateToken();
            Update update = new Update()
                .setOnInsert("_id", key)
                .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
                .setOnInsert("token", token);
    
            FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                                     .returnNew(true);
            LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                           LockDocument.class);
            boolean locked = doc.getToken() != null && doc.getToken().equals(token);
    
            // 如果已过期
            if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
                DeleteResult deleted = this.mongoTemplate.remove(
                    Query.query(Criteria.where("_id").is(key)
                                        .and("token").is(doc.getToken())
                                        .and("expireAt").is(doc.getExpireAt())),
                    LockDocument.class);
                if (deleted.getDeletedCount() >= 1) {
                    // 成功释放锁, 再次尝试获取锁
                    return this.acquire(key, expiration);
                }
            }
    
            log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
                      key, token, locked);
            return locked ? token : null;
        }
    

    原理:

    • 先尝试upsert锁对象,如果成功且token一致,说明拿到锁
    • 否则加锁失败
    • 如果未拿到锁,但是锁已过期,尝试删除锁
      • 如果删除成功,再次尝试拿锁
      • 如果失败,说明锁可能已经续期了

    释放和续期锁:

    
       @Override
       public boolean release(String key, String token) {
           Query query = Query.query(Criteria.where("_id").is(key)
                                             .and("token").is(token));
           DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
           boolean released = deleted.getDeletedCount() == 1;
           if (released) {
               log.debug("Remove query successfully affected 1 record for key {} with token {}",
                         key, token);
           } else if (deleted.getDeletedCount() > 0) {
               log.error("Unexpected result from release for key {} with token {}, released {}",
                         key, token, deleted);
           } else {
               log.error("Remove query did not affect any records for key {} with token {}",
                         key, token);
           }
    
           return released;
       }
    
       @Override
       public boolean refresh(String key, String token,
                              long expiration) {
           Query query = Query.query(Criteria.where("_id").is(key)
                                             .and("token").is(token));
           Update update = Update.update("expireAt",
                                         System.currentTimeMillis() + expiration);
           UpdateResult updated =
               mongoTemplate.updateFirst(query, update, LockDocument.class);
    
           final boolean refreshed = updated.getModifiedCount() == 1;
           if (refreshed) {
               log.debug("Refresh query successfully affected 1 record for key {} " +
                         "with token {}", key, token);
           } else if (updated.getModifiedCount() > 0) {
               log.error("Unexpected result from refresh for key {} with token {}, " +
                         "released {}", key, token, updated);
           } else {
               log.warn("Refresh query did not affect any records for key {} with token {}. " +
                        "This is possible when refresh interval fires for the final time " +
                        "after the lock has been released",
                        key, token);
           }
    
           return refreshed;
       }
    

    使用

    
    private  LockService lockService;
    
    private void tryAcquireLockAndSchedule() {
            while (!this.stopSchedule) {
                // 尝试拿锁
                this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
                if (this.token != null) {
    				// 拿到锁
                } else {
                    // 等待LOCK_EXPIRATION, 再次尝试
                    Thread.sleep(LOCK_EXPIRATION);
                }
            }
        }
    
    • 先尝试拿锁,如果获取到token,说明拿锁成功
    • 否则可以sleep一段时间后再拿锁

    完整代码,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java


    感谢您的认真阅读。

    如果你觉得有帮助,欢迎点赞支持!

    不定期分享软件开发经验,欢迎关注作者, 一起交流软件开发:

  • 相关阅读:
    archlinux .bash_history
    Ubuntu环境下挂载新硬盘
    软碟通 UltraISO U启替代品 Win32DiskImager 无设备 无盘符 无u盘 无优盘 解决方案 之diskpart
    delphi Integer overflow
    MSBuild Tools offline
    delphi synedit免费的拼写检查器dll
    git 自定义命令行
    lua编译
    gcc ar
    Windows Subsystem for Linux (WSL)挂载移动硬盘U盘 卸载 c d 盘
  • 原文地址:https://www.cnblogs.com/xiaoqi/p/mongodb-lock.html
Copyright © 2011-2022 走看看