zoukankan      html  css  js  c++  java
  • Zookeeper+Curator 分布式锁

    本来想着基于zk临时节点,实现一下分布式锁,结果发现有curator框架。PS:原声API真的难用,连递归创建path都没有?

    配置curator maven的时候,md配置了好几个小时,最后发现集中定义依赖版本号 我本来都是写数字的,结果到了zookeeper.version ,我竟然写了 <zookeeper.version>zookeeper-3.4.7</zookeeper.version> 把英文也写上去了 可能是从maven-repository copy过来的 很郁闷。

    curator提供的可重入分布式锁看起来也没什么可封装的,因为它和ReentrantLock确实很。在需要的地方,new一个,再调用对象的方法就好了。

    这个锁就是 InterProcessMutex 类,其构造方法需要我们传入当前CuratorFramework对象,还有要锁定的节点。对了 这个节点是临时节点,再客户端断开连接后,锁不会一直存在,但也不会立即就失去锁,因为ZK需要根据缺省的时间判断你是真的断开了还是某种网络原因。

    首先说明它是一把可重入锁。注意在当前线程的for循环中,他们都是用的是同一把锁,同把锁才可重入。

        public void fun() {
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
            CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
            InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
            for (int i = 0; i < 10; i++) {
                try {
                    lock.acquire();
                    System.out.println("yes");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        //lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

    其次模拟分布式环境,在十个线程中各获取锁(锁相同的path),并执行1s的任务,可以发现,多线程被锁同步。

    public void fun() throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
            CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
            InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
    
                        try {
                            lock.acquire();
                            System.out.println("yes");
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                lock.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                }).start();
            }
            System.in.read();
        }

    acquire支持传递等待超时时间,返回值是boolean类型。代表超时时间内是否成功获取到锁。

    public void fun() throws Exception {
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-curator.xml");
            CuratorFramework curatorFramework = (CuratorFramework) context.getBean("curatorFramework");
            System.out.println(curatorFramework);
            InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test/ws");
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
    
                        try {
                            if (lock.acquire(1000, TimeUnit.MILLISECONDS)) {
                                System.out.println("yes");
                                Thread.sleep(1000);
                            } else {
                                System.out.println("no");
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                lock.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                }).start();
            }
            System.in.read();
        }

    最后附上spring配置:

        <!-- 重连配置 -->
        <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
            <constructor-arg index="0" value="1000"/>
            <constructor-arg index="1" value="3"/>
        </bean>
    
        <bean id="curatorFramework" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient"
              init-method="start">
            <constructor-arg index="0" value="server:port,server:port,server:port"/>
            <constructor-arg index="1" ref="retryPolicy"/>
        </bean>
  • 相关阅读:
    哈希表及其应用分析
    程序员常用的查找算法
    程序猿必备排序算法及其时间复杂度分析
    递归和回溯求解8皇后问题
    链表种类及其常用操作
    为什么要使用稀疏矩阵??
    微服务项目持续集成部署流程简介
    微服务项目的docker自动化部署流程
    (高考标准分)数据拟合==>多项式方程==>excel公式算成绩(标准分)
    awk用名称对应关系批量重命名
  • 原文地址:https://www.cnblogs.com/tdws/p/5874686.html
Copyright © 2011-2022 走看看