zoukankan      html  css  js  c++  java
  • zookeeper分布式锁

    首先搭建zookeeper集群docker-compose.yml

    version: '2'
    networks:
      zk:
    services:
      zk1:
        image: zookeeper:3.4
        container_name: zk1
        networks:
            - zk
        ports:
            - "21811:2181"
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888
      zk2:
        image: zookeeper:3.4
        container_name: zk2
        networks:
            - zk
        ports:
            - "21812:2181"
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=zk1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zk3:2888:3888
      zk3:
        image: zookeeper:3.4
        container_name: zk3
        networks:
            - zk
        ports:
            - "21813:2181"
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=0.0.0.0:2888:3888

    docker-compose up -d 创建并启动
    检查状态

    [root@localhost test]# docker exec -it zk1 bash ./bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /conf/zoo.cfg
    Mode: follower
    [root@localhost test]# docker exec -it zk2 bash ./bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /conf/zoo.cfg
    Mode: follower
    [root@localhost test]# docker exec -it zk3 bash ./bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /conf/zoo.cfg
    Mode: leader

    下面我们来程序验证:

    我们用现成的curator来操作zk分布式锁

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.GetDataBuilder;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Time;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class DistributedLock {
    
    
            public static Logger log = LoggerFactory.getLogger(DistributedLock.class);
            private InterProcessMutex interProcessMutex;  //可重入排它锁
            private String lockName;  //竞争资源标志
            private String root = "/distributed/lock/";//根节点
            private static CuratorFramework curatorFramework;
            private static String ZK_URL = "127.0.0.1:21811,127.0.0.1:21812,127.0.0.1:21813";
            static{
                curatorFramework= CuratorFrameworkFactory.newClient(ZK_URL,new ExponentialBackoffRetry(1000,3));
                curatorFramework.start();
            }
    
            /**
             * 实例化
             * @param lockName
             */
            public DistributedLock(String lockName){
                try {
                    this.lockName = lockName;
                    interProcessMutex = new InterProcessMutex(curatorFramework, root + lockName);
                }catch (Exception e){
                    log.error("initial InterProcessMutex exception="+e);
                }
            }
    
            /**
             * 获取锁
             */
            public void acquireLock(){
                int flag = 0;
                try {
                    //重试N次,每次最大等待1s
                    while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)){
                        flag++;
                        if(flag>5){  //重试次
                            break;
                        }
                    }
                } catch (Exception e) {
                    log.error("distributed lock acquire exception="+e);
                }
                if(flag>5){
                    log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  busy"+ new Date().getTime());
                }else{
                    log.info("Thread:"+Thread.currentThread().getId()+" acquire distributed lock  success"+ new Date().getTime());
                }
            }
    
            /**
             * 释放锁
             */
            public void releaseLock(){
                try {
                    if(interProcessMutex != null && interProcessMutex.isAcquiredInThisProcess()){
                        interProcessMutex.release();
                        curatorFramework.delete().inBackground().forPath(root+lockName);
                        //byte[] data = curatorFramework.getData().forPath(root + lockName);
                        log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  success"+ new Date().getTime());
                    }
                }catch (Exception e){
                    log.info("Thread:"+Thread.currentThread().getId()+" release distributed lock  exception="+e);
                }
            }
        }

    接下来我们开2个线程来竞争分布式锁:

    public class TestLock {
    
        public static void main(String[] args) throws InterruptedException {
            String lockName = "lock1";
            DistributedLock lockFoo = new DistributedLock(lockName);
            //lockFoo.acquireLock();
            //lockFoo.releaseLock();
            //
            System.out.println("主线程ID是:" + Thread.currentThread().getId());
            Thread thread1 = new MyThread("thread1",lockFoo);
            Thread thread2 = new MyThread("thread2",lockFoo);
            thread1.start();
            Thread.sleep(1000);
            thread2.start();
    
        }
    
    
    }
    
    /**
     * 自定义线程
     */
    class MyThread extends Thread {
        /*线程名称*/
        private String name;
        private DistributedLock lockFoo;
    
        public MyThread(String name,DistributedLock lockFoo) {
            this.name = name;
            this.lockFoo = lockFoo;
        }
    
        @Override
        public void run() {
    
            if(this.name.equals("thread1")) {
                this.lockFoo.acquireLock();
            }
    
            System.out.println("名称" + name + "的线程ID是:" + Thread.currentThread().getId());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(this.name.equals("thread2")) {
                this.lockFoo.releaseLock();
            }
        }
    }

    程序中thread1获取了锁后,thread2解锁失败。

  • 相关阅读:
    CentOS6设置密码过期时间
    scp
    windows查看进程
    mysql5.7密码问题
    mysql主从切换摘要
    mysql慢日志管理
    Linux学习(一)
    Linux学习(一)
    数据结构与算法(1)-算法时间复杂度
    数据结构与算法(1)-算法时间复杂度
  • 原文地址:https://www.cnblogs.com/zenghansen/p/12506028.html
Copyright © 2011-2022 走看看