zoukankan      html  css  js  c++  java
  • Zookeeper--分布式锁和消息队列

    在java并发包中提供了若干锁的实现,它们是用于单个java虚拟机进程中的;而分布式锁能够在一组进程之间提供互斥机制,保证在任何时刻只有一个进程可以持有锁。

    分布式环境中多个进程的锁则可以使用Zookeeper来实现。

    下面这种方法是使用顺序节点实现共享锁,流程如下:

    对于lock()操作,首先让所有参与争锁的客户端都在/_locks目录下创建临时顺序节点,然后获取该路径下的所有节点,如果客户端创建的节点序列号最小则获得锁。否则开始监视它前一个节点并进入等待状态。

    对于unlock()操作,将自身创建的节点删除。此时后一个节点的监控将被触发,对应的客户端退出等待状态,取得锁。

    下面是一个简单的示例:

    ---

    /**
     * 分布式锁
     */
    public class DisLock implements Watcher {
    
        public static final String LOCK_ROOT = "/__locks__";
    
        private ZooKeeper zk;
    
        //锁名称,标识竞争的是哪个锁
        private String lockName;
    
        //当前创建的节点路径
        private String path;
    
        //前一个节点的路径
        private String prePath;
    
        //是否获取锁
        private boolean acquired;
    
        //构造函数,连接zk,检查父节点存在
        public DisLock(String lockName) throws KeeperException, InterruptedException, IOException {
            this.lockName = "/" + lockName;
            this.zk = new ZooKeeper("localhost:2181", 30000, this);
            Assert.notNull(zk, "zookeeper is null");
            if (zk.exists(LOCK_ROOT, false) == null) {
                zk.create(LOCK_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
            if (zk.exists(LOCK_ROOT + this.lockName, false) == null) {
                zk.create(LOCK_ROOT + this.lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
        }
    
        public void lock() {
            if (tryLock()) {
                return;
            } else {
                waitLock();
    
            }
        }
    
        //尝试获取锁
        public boolean tryLock() {
            if (acquired) {
                return true;
            }
            try {
                //创建临时节点,自动编号
                path = zk.create(LOCK_ROOT + lockName + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
    
                List<String> ls = zk.getChildren(LOCK_ROOT + lockName, false);
                Collections.sort(ls);
                if (path.equals(LOCK_ROOT + lockName + "/" + ls.get(0))) {
                    acquired = true;
                    return true;
                }
    
                for (int i = 0; i < ls.size(); i++) {
                    if (path.equals(LOCK_ROOT + lockName + "/" + ls.get(i))) {
                        prePath = LOCK_ROOT + lockName + "/" + ls.get(i - 1);
                        break;
                    }
                }
                return false;
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
    
        //释放锁
        public void unlock() {
            if (!acquired) {
                return;
            }
            try {
                zk.delete(path, -1);
                acquired = false;
                //System.out.println(Thread.currentThread().getName() + " free lock");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
        //设置监控并等待锁,前一个节点被删除后退出等待,得到锁
        private synchronized void waitLock() {
            try {
                Stat s = zk.exists(prePath, true);
                if (s == null) {
                    //等到锁,返回
                    acquired = true;
                    return;
                }
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            waitLock();
            return;
        }
    
        //监视节点变化,被监控节点被删除时激活等待锁的线程
        @Override
        public synchronized void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                //System.out.println("触发:"+watchedEvent.getPath());
                this.notify();
            }
        }
    }

    ---

    其中waitLock和process方法需要加synchronized关键字,以便使用wait和notify方法。

    测试方法:

    Zookeeper的另一种应用场景是处理FIFO消息队列,利用zk的自动编号,存储数据和数据一致性能力,模拟生产者-消费者模型。

    创建3个消费者从zk中获取数据,此时需要使用分布式锁,加锁获取数据的部分。出于模拟生产者和消费者都在不同的进程,所有不共享zk等对象。

    示例代码如下:

    生产者:

    public class Producer extends Thread {
    
        private ZooKeeper zk;
    
        private Random ran = new Random();
    
        private static AtomicInteger count = new AtomicInteger(0);
    
        Producer() throws IOException {
            this.zk = new ZooKeeper("localhost:2181", 30000, null);
        }
    
        void produce(String str) throws KeeperException, InterruptedException {
            String name = zk.create(ZkQueue.root + "/element", str.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
            //System.out.println(getName() + " create: " + str);
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    String msg = "msg" + count.getAndIncrement();
                    produce(msg);
                    Thread.sleep(ran.nextInt(1000));
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
            }
        }
    }

    ---循环向zk中写入递增的信息,中间延迟随机毫秒

    消费者:

    public class Consumer extends Thread {
    
        private ZooKeeper zk;
    
        private DisLock lock;
    
        Consumer() throws IOException, KeeperException, InterruptedException {
            lock = new DisLock("queue");
            this.zk = new ZooKeeper("localhost:2181", 30000, null);
        }
    
        private boolean consume() throws KeeperException, InterruptedException {
            lock.lock();
            try {
                List<String> list = zk.getChildren(root, true);
                if (list.isEmpty()) {
                    return true;
                }
                Collections.sort(list);
                String first = list.get(0);
                byte[] b = zk.getData(root + "/" + first, false, null);
                zk.delete(root + "/" + first, -1);
                String str = new String(b);
                System.out.println(getName() + " get:" + str);
            } finally {
                lock.unlock();
            }
            return false;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    if (consume()) {
                        Thread.sleep(1000);
                    }
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
            }
        }
    }

    ---循环从zk中取节点然后删除节点,对整个过程加锁

    启动类:

    public class ZkQueue implements Watcher{
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.printf("---->%s %s
    ",watchedEvent.getPath(),watchedEvent.getType());
        }
    
        private static ZooKeeper zk;
    
        public static final String root = "/zkqueue";
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
    
            zk = new ZooKeeper("localhost:2181", 30000, null);
    
            ZkUtils.delete(zk, "/__locks__/queue");//此处需保证该路径下没有子节点,否则可能获取到为最小
    
            ZkUtils.delete(zk, root);
            zk.create(root, "queue".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            //模拟2个生产者
            new Producer().start();
            new Producer().start();
    
            //模拟3个消费者
            new Consumer().start();
            new Consumer().start();
            new Consumer().start();
    
            Scanner s = new Scanner(System.in);
            s.nextLine();
    
            ZkUtils.delete(zk, root);
    
            // 关闭连接
            zk.close();
        }
    
    }

    ---

    输出结果:

    Thread-2 get lock
    Thread-2         get:msg0
    Thread-2 free lock
    Thread-3 get lock
    Thread-3         get:msg2
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg3
    Thread-3 get lock
    Thread-2 free lock
    Thread-3         get:msg4
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg5
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg6
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg7
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg8
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg1
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg9
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg10
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg11
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg12
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg13
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg14
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg15
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg16
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg17
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg18
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg19
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg20
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg21
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg22
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg23
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg24
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg25
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg26
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg27
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg28
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg29
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg30
    Thread-3 free lock
    Thread-2 get lock
    Thread-2 free lock
    Thread-4 get lock
    Thread-4 free lock
    Thread-3 get lock
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg31
    Thread-2 free lock
    Thread-2 get lock
    Thread-2         get:msg32
    Thread-2 free lock
    Thread-2 get lock
    Thread-2         get:msg33
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg34
    Thread-4 free lock
    Thread-3 get lock
    Thread-3         get:msg35
    Thread-3 free lock
    Thread-2 get lock
    Thread-2         get:msg36
    Thread-2 free lock
    Thread-4 get lock
    Thread-4 free lock
    Thread-3 get lock
    Thread-3 free lock
    Thread-2 get lock
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg37
    Thread-4 free lock
    Thread-2 get lock
    Thread-2         get:msg38
    Thread-2 free lock
    Thread-3 get lock
    Thread-3         get:msg39
    Thread-3 free lock
    Thread-4 get lock
    Thread-4         get:msg40
    Thread-4 free lock
    Thread-2 get lock
    Thread-2         get:msg41
    Thread-2 free lock
    Thread-3 get lock
    Thread-3 free lock
    Thread-4 get lock
    Thread-4 free lock
    Thread-2 get lock
    Thread-2 free lock
    Thread-4 get lock
    Thread-4         get:msg42
    Thread-4 free lock
    Thread-2 get lock
    Thread-2         get:msg43
    Thread-2 free lock
    Thread-3 get lock
    Thread-3         get:msg44
    Thread-3 free lock
    Thread-4 get lock
    Thread-4 free lock
    Thread-2 get lock
    Thread-2 free lock
    Thread-3 get lock
    Thread-3 free lock
    Thread-4 get lock
    Thread-4         get:msg45
    Thread-4 free lock
    ....
    View Code

    ---

    可见3个消费者线程随机获取到锁,数据在锁中被递增取出,没有重复和遗漏 

    end

  • 相关阅读:
    龙年新作:水印文字添加工具源码摘要
    C语言关键字 浪里白条:goto
    继续聊WPF——自定义命令
    CSS3新的鼠标样式介绍
    C语言深入理解 常量与变量
    XCode 4 不能运行的解决办法
    Runtime专题:详解IOS开发应用之并发Dispatch Queues
    C语言关键字 乱世枭雄:static与extern
    一步步带你做vue后台管理框架(一)——介绍框架
    怎么在谷歌浏览器中安装.crx扩展名的离线Chrome插件?
  • 原文地址:https://www.cnblogs.com/luangeng/p/7398174.html
Copyright © 2011-2022 走看看