zoukankan      html  css  js  c++  java
  • 利用consul在spring boot中实现最简单的分布式锁

    因为在项目实际过程中所采用的是微服务架构,考虑到承载量基本每个相同业务的服务都是多节点部署,所以针对某些资源的访问就不得不用到用到分布式锁了。

    这里列举一个最简单的场景,假如有一个智能售货机,由于机器本身的原因不能同一台机器不能同时出两个商品,这就要求在在出货流程前针对同一台机器在同一时刻出现并发

    创建订单时只能有一笔订单创建成功,但是订单服务是多节点部署的,所以就不得不用到分布式锁了。

    以上只是一种简单的业务场景,在各种大型互联网实际应用中,需要分布式锁的业务场景会更多,综合比较了业界基于各种中间件来实现的分布式锁方案,然后结合实际业务最终

    决定采用consul来实现,因为我们的项目中采用了consul做注册中心,并且consul天生可以保证一致性(这点类似zk),当然zk也能实现分布式锁,但是这里不对这点做过多讨论。

    redis虽然也能实现分布式锁,但是可能因为场景比较复杂,如果redis采用cluster部署的话,如果某一主节点出现故障的话,有一定几率会出现脑裂现象,这样就可能会让竞争者在

    并发时同时获得到锁,这样可能会破坏掉后面的业务,当然出现这种情况的概率很低,但是也不能完全排除,因为redis的根本不能保证强一致性导致的。

    好了,这里说的最简单的分布式锁的意思是,多个竞争者同一时间并发去获得锁时,获取失败的就直接返回了,获取成功的继续后续的流程,然后在合适的时间释放锁,并且为锁

    加了超时时间,防止获得到锁的进程或线程在未来得及释放锁时自己挂掉了,导致资源处于一直被锁定的状态无法得到释放。主要的实现逻辑就是这样,如果有人想实现获得锁失

    败的竞争者一直继续尝试获得,可以基于该示例进行修改,加上自旋逻辑就OK。

    以下是锁实现代码:

      1 package com.lyb.consullock;
      2 
      3 import com.ecwid.consul.v1.ConsulClient;
      4 import com.ecwid.consul.v1.agent.model.NewCheck;
      5 import com.ecwid.consul.v1.kv.model.PutParams;
      6 import com.ecwid.consul.v1.session.model.NewSession;
      7 import com.ecwid.consul.v1.session.model.Session;
      8 import lombok.Data;
      9 
     10 
     11 import java.time.LocalDateTime;
     12 import java.util.ArrayList;
     13 import java.util.List;
     14 
     15 
     16 public class DistributedLock{
     17     private ConsulClient consulClient;
     18 
     19     /**
     20      * 构造函数
     21      * @param consulHost 注册consul的client或服务端的Ip或主机名,或域名
     22      * @param consulPort 端口号
     23      */
     24     public DistributedLock(String consulHost,int consulPort){
     25         consulClient = new ConsulClient(consulHost,consulPort);
     26     }
     27 
     28     /**
     29      * 获得锁的方法
     30      * @param lockName 竞争的资源名
     31      * @param ttlSeconds 锁的超时时间,超过该时间自动释放
     32      * @return
     33      */
     34     public LockContext getLock(String lockName,int ttlSeconds){
     35         LockContext lockContext = new LockContext();
     36         if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60;
     37         String sessionId = createSession(lockName,ttlSeconds);
     38         boolean success = lock(lockName,sessionId);
     39         if(success == false){
     40             consulClient.sessionDestroy(sessionId,null);
     41             lockContext.setGetLock(false);
     42 
     43             return lockContext;
     44         }
     45 
     46         lockContext.setSession(sessionId);
     47         lockContext.setGetLock(true);
     48 
     49         return lockContext;
     50     }
     51 
     52     /**
     53      * 释放锁
     54      * @param sessionID
     55      */
     56     public void releaseLock(String sessionID){
     57         consulClient.sessionDestroy(sessionID,null);
     58     }
     59 
     60     private String createSession(String lockName,int ttlSeconds){
     61         NewCheck check = new NewCheck();
     62         check.setId("check "+lockName);
     63         check.setName(check.getId());
     64         check.setTtl(ttlSeconds+"s"); //该值和session ttl共同决定决定锁定时长
     65         check.setTimeout("10s");
     66         consulClient.agentCheckRegister(check);
     67         consulClient.agentCheckPass(check.getId());
     68 
     69         NewSession session = new NewSession();
     70         session.setBehavior(Session.Behavior.RELEASE);
     71         session.setName("session "+lockName);
     72         session.setLockDelay(1);
     73         session.setTtl(ttlSeconds + "s"); //和check ttl共同决定锁时长
     74         List<String> checks = new ArrayList<>();
     75         checks.add(check.getId());
     76         session.setChecks(checks);
     77         String sessionId = consulClient.sessionCreate(session,null).getValue();
     78 
     79         return sessionId;
     80     }
     81 
     82     private boolean lock(String lockName,String sessionId){
     83         PutParams putParams = new PutParams();
     84         putParams.setAcquireSession(sessionId);
     85 
     86         boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();
     87 
     88         return isSuccess;
     89     }
     90 
     91     /**
     92      * 竞争锁时返回的对象
     93      */
     94     @Data
     95     public class LockContext{
     96         /**
     97          * 获得锁成功返回该值,比便后面用该值来释放锁
     98          */
     99         private String session;
    100         /**
    101          * 是否获得到锁
    102          */
    103         private boolean isGetLock;
    104     }
    105 }

    pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.lyb</groupId>
        <artifactId>consul-lock</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>consul-lock</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-consul-discovery</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.8</version>
                <optional>true</optional>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    测试代码:

    package com.lyb.consullock;
    
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ConsulLockApplicationTests {
        @Autowired
        private ServiceConfig serviceConfig;
        @Test
        public void lockSameResourer() {
            //针对相同资源在同一时刻只有一个线程会获得锁
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            for (int a=0;a<20;a++){
                threadPool.submit(
                        () -> {
                            for (int i = 0;i < 100; i++) {
                                DistributedLock lock = new DistributedLock(
                                        serviceConfig.getConsulRegisterHost(),
                                        serviceConfig.getConsulRegisterPort());
    
                                DistributedLock.LockContext lockContext = lock.getLock("test lock", 10);
                                if (lockContext.isGetLock()) {
                                    System.out.println(Thread.currentThread().getName() + "获得了锁");
                                    try {
                                        TimeUnit.SECONDS.sleep(1);
                                        lock.releaseLock(lockContext.getSession());
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }else {
                                    //System.out.println(Thread.currentThread().getName() + "没有获得锁");
                                }
                            }
                        });
            }
    
            try {
                TimeUnit.MINUTES.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Test
        public void lockDiffResource(){
            //针对不通的资源所有线程都应该能获得锁
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            for (int a=0;a<20;a++){
                threadPool.submit(
                        () -> {
                            for (int i = 0;i < 100; i++) {
                                DistributedLock lock = new DistributedLock(
                                        serviceConfig.getConsulRegisterHost(),
                                        serviceConfig.getConsulRegisterPort());
    
                                DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10);
                                if (lockContext.isGetLock()) {
                                    System.out.println(Thread.currentThread().getName() + "获得了锁");
                                    try {
                                        TimeUnit.SECONDS.sleep(1);
                                        lock.releaseLock(lockContext.getSession());
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                }else {
                                    //System.out.println(Thread.currentThread().getName() + "没有获得锁");
                                    Assert.assertTrue(lockContext.isGetLock());
                                }
                            }
                        });
            }
    
            try {
                TimeUnit.MINUTES.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

     希望对大家有所帮助

    项目路径:

    https://github.com/wenwuxianren/consul-lock

  • 相关阅读:
    作业任务03
    作业任务02
    作业任务01
    Shell脚本编程01-shell编程与规范与变量
    Linux网络服务05-----DNS域名解析服务(二)
    Linux网络服务05-----DNS域名解析服务(一)
    Linux网络服务13----PXE 高效能批量网络装机
    网络基础知识
    Nginx 入门
    shell 脚本须知
  • 原文地址:https://www.cnblogs.com/wenwuxianren/p/11181786.html
Copyright © 2011-2022 走看看