zoukankan      html  css  js  c++  java
  • zookeeper(四)Curator框架 的分布式锁

    其实在学之前我也有个疑虑,我为啥要学curator,撇开涨薪这些外在的东西,就单技术层面来讲,学curator能帮我做些什么?这就不得不从zookeeper说起,上篇我已经大篇幅讲了zk是做什么的了,但真正要靠zk去实现多服务器自动拉取更新的配置文件等功能是非常难的,如果没有curator,直接去写的话基本上能把你累哭,就好比连Mybatis或者jpa都没有,让你用原生的代码去写个网站一样,你可以把curator当做一个比较强大的工具,有了它操作zk不再是事,说这么多,是时候进入正题了:

    curator 官网:http://curator.apache.org

    使用curator去实现的几块内容:

     
    学习目录:
    1.使用curator建立与zk的连接
    2.使用curator添加/递归添加节点
    3.使用curator删除/递归删除节点
    4.使用curator创建/验证 ACL(访问权限列表)
    5.使用curator监听 单个/父 节点的变化(watch事件)
    ---------------------------------------------
    6.基于curator实现zookeeper分布式锁(需要掌握基本的多线程知识)
     
    前置条件:已掌握zookeeper的基本操作,对zookeeper有所了解,如果没有掌握请翻阅我前面的博客去学习.

    本节所需要引入的依赖有以下三个,建议直接全部引入即可:

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.12</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
            </dependency>
    

      1.通过curator建立与zk的连接

    需要准备连接zk的url,建议直接写成工具类,因为接下来会频繁用到,功能类似于jdbc.

    public class ZkClientUtil {
        private static final int BASE_SLEEP_TIME_MS = 5000; //定义失败重试间隔时间 单位:毫秒
        private static final int MAX_RETRIES = 3; //定义失败重试次数
        private static final int SESSION_TIME_OUT = 1000000; //定义会话存活时间,根据业务灵活指定 单位:毫秒
        private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和端口号
        private static final String NAMESPACE = "laohan_jianshen";
        //工作空间,可以不指定,建议指定,功能类似于项目包,之后创建的所有的节点都会在该工作空间下,方便管理
        
        public static CuratorFramework build(){
        //创建比较简单,链式编程,很爽,基本上指定点参数就OK了
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重试策略
            CuratorFramework client = CuratorFrameworkFactory
                    .builder()
                    .connectString(ZK_URI)
                    .retryPolicy(retryPolicy)
                    .namespace(NAMESPACE)
                    .sessionTimeoutMs(SESSION_TIME_OUT)
                    .build();
            return client;
        }
    }
    

      2.通过curator添加/递归添加节点

    //通过上一步获取到的client,直接启动该client,值得注意的是client在使用前必须先启动:
    client.start;
    client
    .create()//创建节点
    .withMode(CreateMode.xxx)//节点属性:永久节点/临时节点/有序节点 通过CreateMode.即可看到
    .withACL(ZooDefs.Ids.xxx)//节点访问权限,通过Ids.即可看到 默认是OPEN_ACL_UNSAFE(开放不安全权限)
    .forPath("/yourpath","yourdata".getBytes());//指明你的节点路径,数据可以不指定,数据必须是byte[]
    

      创建递归节点:

    //比如我想一次性创建/yourpath/a/b/c/1/2/3...这样的节点,如果按传统方法会累死你
    //curator可以一次性创建好,只需要在创建时添加creatingParentsIfNeeded即可.
    client
    .create()//创建节点
    .creatingParentsIfNeeded()//创建父节点,如果需要的话
     
    ...
    

      3.使用curator删除/递归删除节点

    client
    .delete() //删除
    .guaranteed()//保证一定帮你删了它
    .withVersion(0)//指定要删节点的版本号
    .forPath("/yourpath")//指定要删节点的路径
    

      递归删除:

    //比如我当前的节点结构是这样:/yourpath/a/b/c/1/2/3  我想删除a节点下面的所有目录
    //传统方法累死个人,现在只需要添加deletingChildrenIfNeeded即可
    client
    .delete() //删除
    .deletingChildrenIfNeeded()//如果它有儿子都给删了...
    

      4.使用curator创建/验证 ACL(访问权限列表)

    //为了保证安全,有时需要对节点的访问权限做一些限制,否则可能会引起重要信息泄露/篡改/删除等
    //节点ACL的创建方式有两种,一种是使用ZK提供的,一种是自定义的
    //1.ZK提供的,比较简单,拿来即用,在创建节点时指明withACL即可
    client
    .create()
    .withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明该节点是只读节点,还有其他属性,可以通过Ids.查看
    

      

    //创建自定义ACL,需要自己new Id(),并指明是否是加密的,然后账号和密码是多少,加密策略使用zk提供的:
    List<ACL> aclList = new ArrayList<ACL>();
    ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
    ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456")));
    aclList.add(acl1);
    aclList.add(acl2);
    //如此我就创建好了两种不同的权限账号,user只能对该节点有读的权限,但root用户对该节点有所有权限
    

      

    //ACL验证,创建好节点之后,可以在服务器的zk安装目录的bin目录下 连接客户端./zkCli
    //然后通过ls /该目录  查看是否可以访问 正常是不能访问的 会提示权限不够
    //下面我们通过curator去连接,要想访问该节点需要在创建client时就指明账号和密码:
    CuratorFramework client = CuratorFrameworkFactory
    .builder()
    .authorization("digest","root:123456".getBytes())//指明使用了加密,用户名和密码用:隔开,以byte[]输入
    //如此,接下来通过该client可以对刚刚创建的节点具有所有权限,如果登录的是user,则只具有读权限.
    

      5.通过curator创建单个节点及其父节点的watch事件

    由于zk的watch事件是只能被触发一次的,触发完即销毁监听,这显然不是我们想要的,在实际开发中更多的场景是需要对某个节点持续监听,所以这里我只介绍创建持续监听的单节点/父节点

    //对单个节点创建watch事件
    //定义NodeCache,指明被监听节点的路径:
    final NodeCache nodeCache = new NodeCache(client,"/yourpath");
    nodeCache.start(true);//开启
    nodeCache
    .getCurrentData()//可以获取该监听节点的数据
    .getPath();//可以获取该监听节点的路径
     
    

      

    //对指定父节点创建watch事件,只要其任何一个子节点,或子节点的子节点...发生变化,就会触发watch事件.
    //定义PathChildrenCache,指明要watch的目录
    final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath");
    //启动,启动策略有三种:同步,异步提交,异步 用的比较多的就是下面这种,用StartMode.可以查看到
    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    //对该节点创建监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                        //TODO 可以通过PathChildrenCacheEvent.拿到你想要的数据和路径等
        }
    });
    

      至此,curator的常用内容已学习完毕,建议每个都亲自操作一下,为之后的自动配置和分布式锁操作打下基础.

    ===================================================================

    package com.demo.zookeeper.curator.lock;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessLock;
    import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * num: 模拟 库存 10个
     * 流程: 创建10个线程 同时去减num, 会出现 问题
     * 解决:  curator 的 分布式锁解决方案 InterProcessLock 加锁 acquire() 解锁 release()
     */
    public class DistributedLockDemo2 {
        // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
        private final String lockPath = "/distributed-lock";
    
        // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
        // 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
        private String connectString;
        // Curator 客户端重试策略
        private RetryPolicy retry;
        // Curator 客户端对象
        private CuratorFramework client;
        // client2 用户模拟其他客户端
        private CuratorFramework client2;
        CountDownLatch countDownLatch = new CountDownLatch(10);
        // 数据
        private static int num = 10;
    
        // 初始化资源
        @Before
        public void init() throws Exception {
            // 设置 ZooKeeper 服务地址为本机的 2181 端口
            connectString = "127.0.0.1:2181";
            // 重试策略
            // 初始休眠时间为 1000ms, 最大重试次数为 3
            retry = new ExponentialBackoffRetry(1000, 3);
            // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
            client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
            //client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
            // 创建会话
            client.start();
            //client2.start();
        }
        @Test
        public void sharedLock() throws Exception {
            // 创建共享锁
            final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
            // lock2 用于模拟其他客户端
            final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);
    
            for (int i=0;i<10;i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            lock.acquire();
                            if(num >1){
                                num--;
                                System.out.println(Thread.currentThread().getName()+","+num);
                            }
                            lock.release();
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            countDownLatch.await();
            System.out.println("### num:" + num);
    
        }
        // 释放资源
        @After
        public void close() {
            CloseableUtils.closeQuietly(client);
        }
    }
    

     

  • 相关阅读:
    MAC下cocos2dx环境搭建
    eclipse混淆打包出错
    eclipseme升级
    MyEclipse 10 中增加插件
    j2me图片处理大全
    关于svn使用
    NFS相关
    BMP文件格式图解
    UDA1341TS
    OpenOCD初始化脚本(uboot)
  • 原文地址:https://www.cnblogs.com/GotoJava/p/13697207.html
Copyright © 2011-2022 走看看