zoukankan      html  css  js  c++  java
  • Consul的分布式锁实现

    构建分布式系统的时候,经常需要控制对共享资源的互斥访问,就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。
    分布式锁实现

    • 基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:

    • acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false
      release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true

    基本流程
    在这里插入图片描述

    代码实现 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.didispace</groupId>
        <artifactId>consul-distributed-lock</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <version.consul-api>1.2.1</version.consul-api>
            <version.slf4j>1.7.21</version.slf4j>
            <version.slf4j-log4j>1.7.21</version.slf4j-log4j>
            <version.log4j>1.2.17</version.log4j>
            <version.maven-compile-plugin>3.5.1</version.maven-compile-plugin>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>com.ecwid.consul</groupId>
                <artifactId>consul-api</artifactId>
                <version>${version.consul-api}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${version.slf4j}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${version.slf4j-log4j}</version>
            </dependency>
    
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${version.log4j}</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.10</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${version.maven-compile-plugin}</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    代码实现

    package com.didispace.lock.consul;
    
    import com.ecwid.consul.v1.ConsulClient;
    import com.ecwid.consul.v1.kv.model.PutParams;
    import lombok.extern.slf4j.Slf4j;
    
    import java.time.LocalDateTime;
    
    /**
     * 基于Consul的互斥锁
     *
     */
    @Slf4j
    public class Lock extends BaseLock {
    
        private static final String prefix = "lock/";  // 同步锁参数前缀
    
        /**
         * @param consulClient
         * @param lockKey       同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询
         * @param checkTtl      对锁Session的TTL
         */
        public Lock(ConsulClient consulClient, String lockKey, CheckTtl checkTtl) {
            super(consulClient, prefix + lockKey, checkTtl);
        }
    
        /**
         * 获取同步锁
         *
         * @param block            是否阻塞,直到获取到锁为止,默认尝试间隔时间为500ms。
         * @return
         */
        public Boolean lock(boolean block) throws InterruptedException {
            return lock(block, 500L, null);
        }
    
    
        /**
         * 获取同步锁
         *
         * @param block            是否阻塞,直到获取到锁为止
         * @param timeInterval     block=true时有效,再次尝试的间隔时间
         * @param maxTimes         block=true时有效,最大尝试次数
         * @return
         */
        public Boolean lock(boolean block, Long timeInterval, Integer maxTimes) throws InterruptedException {
            if (sessionId != null) {
                throw new RuntimeException(sessionId + " - Already locked!");
            }
            sessionId = createSession("lock-" + this.keyPath);
            int count = 1;
            while(true) {
                PutParams putParams = new PutParams();
                putParams.setAcquireSession(sessionId);
                if(consulClient.setKVValue(keyPath, "lock:" + LocalDateTime.now(), putParams).getValue()) {
                    return true;
                } else if(block) {
                    if(maxTimes != null && count >= maxTimes) {
                        return false;
                    } else {
                        count ++;
                        if(timeInterval != null)
                            Thread.sleep(timeInterval);
                        continue;
                    }
                } else {
                    return false;
                }
            }
        }
    
        /**
         * 释放同步锁
         *
         * @return
         */
        public Boolean unlock() {
            if(checkTtl != null) {
                checkTtl.stop();
            }
    
            PutParams putParams = new PutParams();
            putParams.setReleaseSession(sessionId);
            boolean result = consulClient.setKVValue(keyPath, "unlock:" + LocalDateTime.now(), putParams).getValue();
    
            destroySession();
            return result;
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    测式代码

    import com.didispace.lock.consul.CheckTtl;
    import com.didispace.lock.consul.Lock;
    import com.ecwid.consul.v1.ConsulClient;
    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    
    import java.util.Random;
    
    /**
     * 测试
     */
    
    public class TestLock {
    
        @Test
        public void testLock() throws Exception  {
            ConsulClient consulClient = new ConsulClient();
            CheckTtl checkTtl = new CheckTtl("lock-1", consulClient);
            new Thread(new LockRunner(1, new CheckTtl("lock-1", consulClient))).start();
            new Thread(new LockRunner(2, new CheckTtl("lock-2", consulClient))).start();
            new Thread(new LockRunner(3, new CheckTtl("lock-3", consulClient))).start();
            new Thread(new LockRunner(4, new CheckTtl("lock-4", consulClient))).start();
            new Thread(new LockRunner(5, new CheckTtl("lock-5", consulClient))).start();
            Thread.sleep(30000L);
        }
    
    
    }
    
    @Slf4j
    @AllArgsConstructor
    class LockRunner implements Runnable {
    
        private int flag;
        private CheckTtl checkTtl;
    
        @Override
        public void run() {
            Lock lock = new Lock(new ConsulClient(), "lock-key", checkTtl);
            try {
                // 获取分布式互斥锁(参数含义:阻塞模式、每次尝试获取锁的间隔500ms、尝试n次)
                if (lock.lock(true, 500L, null)) {
                    log.info("Thread {} start!", flag);
                    // 处理业务逻辑
                    Thread.sleep(new Random().nextInt(5000));
                    log.info("Thread {} end!", flag);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
    
        }
    }
     
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    源码下载

  • 相关阅读:
    Atitit.安全性方案规划设计4gm  v1 q928
    Atitit ati licenseService    设计原理
    Atitit.js图表控件总结
    Atitit. null错误的设计 使用Optional来处理null
    System.Web.Mvc 命名空间
    provider: SQL Network Interfaces, error: 26 Error Locating Server/Instance Specified
    Visual Studio 2010 实用功能总结
    My First J2ME
    Java开发利器ideaIU最新版本10.5的keygen
    Happy New Year for 2012
  • 原文地址:https://www.cnblogs.com/ExMan/p/13903672.html
Copyright © 2011-2022 走看看