zoukankan      html  css  js  c++  java
  • zookeeper 分布式锁

    1,为什么需要分布式锁

    在分布式集群环境中,多个程序同时在跑,就存在多个线程对同一个资源的共享并且操作问题。因此会存在线程安全问题。

    2,在单个JVM 程序中,存在线程安全问题的时候,我们一般可以用synchronized 同步代码块,或者使用Lock 锁等方式,使得在多线程环境下,同一时刻只能由一个线程去操作共享资源。

    3,在分布式多个JVM 环境中,我们一般使用分布式锁,使得在同一时刻,只能由集群中的某一台JVM 服务获取到锁,同时有操作的权力,其他JVM 服务只能等锁释放之后,再次去获取锁,获得锁的服务才能去操作。

    4,zookeeper 如何实现分布式锁

    zookeeper 的数据结构中,节点Znode的类型可以分为:临时(ephemeral)和  永久(persistent)。

    通过zookeeper 实现分布式锁,主要是通过临时节点来实现。临时节点的生命周期是一个连接会话,即客户端连接zookeeper服务器,创建了临时节点,当连接中断,则这个节点就会被删除。同时zookeeper的节点名称只能唯一。

    所以实现思路:

    集群的JVM同时创建一个临时节点,创建成功的JVM 获得锁,操作业务逻辑,操纵完成,释放锁

    package com.zklock;
    
    public interface Lock {
    
        public void lock();
    
        public void unlock();
    
    }
    package com.zklock;
    
    import org.I0Itec.zkclient.ZkClient;
    
    public abstract class ZookeeperAbstractLock implements Lock {
    
        // zk连接地址
        private static final String CONNECTSTRING = "127.0.0.1:2181";
        // 创建zk连接,protected 让子类去实现
        protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
        // 分布式锁的临时节点,protected 让子类去实现
        protected static final String PATH = "/lock";
    
        // 获取锁
        public void lock() {
            if (getLock()) {
                // 获取到了锁
                System.out.println("######获取锁成功######");
            } else {
                // 没有获取到锁的JVM 进行等待
                waitLock();
                // 等拥有锁的JVM释放之后,继续获取锁
                lock();
            }
    
        }
    
        abstract void waitLock();
    
        
        abstract boolean getLock();
    
        // 释放锁
        public void unlock() {
            if (zkClient != null) {
                zkClient.close();
                System.out.println("######锁已经释放######");
            }
        }
    
    }

    具体实现

    package com.zklock;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.I0Itec.zkclient.IZkDataListener;
    
    public class ZookeeperLock extends ZookeeperAbstractLock {
    
        private CountDownLatch countDownLatch = null;
    
        @Override
        void waitLock() {
    
            // zookeeper 数据监听的接口
            IZkDataListener izkDataListener = new IZkDataListener() {
                public void handleDataChange(String arg0, Object arg1) throws Exception {
    
                }
    
                public void handleDataDeleted(String path) throws Exception {
                    // 一直在监听,节点有变化的时候原子量就是-1
                    if (countDownLatch != null) {
    
                        countDownLatch.countDown();
                    }
                }
    
            };
            // 已经有JVM 获取到锁,现在需要等待锁释放,所以zkClient会去监听
            zkClient.subscribeDataChanges(PATH, izkDataListener);
            if (zkClient.exists(PATH)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    // 一直进行等待,等待监听的路径有变化
                    countDownLatch.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            // 删除监听
            zkClient.unsubscribeDataChanges(PATH, izkDataListener);
    
        }
    
        @Override
        boolean getLock() {
            // 获取锁
            if (zkClient != null) {
                try {
                    zkClient.createEphemeral(PATH);
                    return true;
                } catch (Exception e) {
                    System.out.println(e);
                    System.out.println("#####创建临时节点失败####");
                    return false;
                }
            }
            return false;
        }
    }

    测试:

    package com.zklock;
    
    
    public class Test implements Runnable{
        
        ZookeeperLock zookeeperLock  = new ZookeeperLock();
        
        public static void main(String[] args) {
            for(int i = 0; i < 50; i++){
                Test test = new Test();
                new Thread(test).start();
            }
        }
    
        public void run() {
            try {
                zookeeperLock.lock();
                System.out.println("id= "+System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                zookeeperLock.unlock();
            }
        }
    
    }
  • 相关阅读:
    超时检测
    非阻塞IO
    阻塞IO
    IO的概念
    http_server实例代码
    套接字中的recv与send的注意事项
    tcp流式套接字和udp数据报套接字编程区别
    TCP的粘包
    socket创建UDP服务端和客户端
    面向连接与面向非连接的传输服务区别
  • 原文地址:https://www.cnblogs.com/pickKnow/p/11338579.html
Copyright © 2011-2022 走看看