zoukankan      html  css  js  c++  java
  • zookeeper编程入门系列之zookeeper实现分布式进程监控和分布式共享锁(图文详解)

    本博文的主要内容有

        一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

        二、zookeeper编程入门系列之zookeeper实现分布式进程监控

       三、zookeeper编程入门系列之zookeeper实现分布式共享锁

      我这里采用的是maven项目,这个很简单,不会的博友,见我下面写的这篇博客

    Zookeeper项目开发环境搭建(EclipseMyEclipse + Maven)

      这里,推荐用下面的eclipse版本(当然你若也有myeclipse,请忽视我这句话)

     

     

    Group Id:zhouls.bigdata

    Artifact Id:zkDemo

    Package:zhouls.bigdata.zkDemo

       将默认的jdk,修改为jdk1.7

      修改默认的pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>zhouls.bigdata</groupId>
      <artifactId>zkDemo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>zkDemo</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>

       修改后的pom.xml为

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>zhouls.bigdata</groupId>
      <artifactId>zkDemo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>zkDemo</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <!-- 此版本的curator操作的zk是3.4.6版本 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.10.0</version>
            </dependency>
    
        </dependencies>
    </project>

     

       junit是单元测试。

     

      也许,大家的jdk1.7会报错误,那就改为jdk1.8。

     一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

       写一个TestCurator.java

      怎么通过Curator连接到zookeeper官网,其实是有固定的。

       这打开需要好几分钟的时间,里面会有示范代码,教我们怎么连接zookeeper。

      我这里的zookeeper集群是master(192.168.80.145)、slave1(192.168.80.146)和slave2(192.168.80.147)。

     

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] 

     

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] 

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] 

       现在,我想通过Curator来连接到zookeeper集群,并在里面创建临时节点。

       这里,永久节点为monitor、临时节点为test123。(当然,大家可以自行去命名)(同时,大家也可以通过命令行方式来创建,我这里就是以代码api形式来创建了)

     

     

        比如,这样,monitor是父节点(作为永久节点),test123是临时节点。

         而现在,是monitor都没有,它不会给我们一次性创建完。

     

       除非,大家在命令行里先创建好monitor节点,之后,然后上述代码可以操作成功。否则,就需如下修改代码。

    package zhouls.bigdata.zkDemo;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void testName() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/test123");//指定节点名称
                
    
        }
    
    }

     

       可以看到成功monitor生成,其实啊,/monitor/test123节点也是有的。(只是中间又消失了)

      为什么会中间消失了呢?是因为,test123是临时节点。创建完之后,它就会消失了。

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    []

      那么,我想看,怎么用代码来实现呢?

      增加以下代码

       此时的代码是TestCurator.java

    package zhouls.bigdata.zkDemo;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void testName() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/test123");//指定节点名称
            while (true) {
                        ;
            }
        
    
        }
    
    }

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    [test123]
    [zk: localhost:2181(CONNECTED) 3] 

      然后,我这边,把代码,来停掉,则它就会消失了。

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    []
    [zk: localhost:2181(CONNECTED) 3] ls /monitor
    [test123]
    [zk: localhost:2181(CONNECTED) 4] ls /monitor
    []
    [zk: localhost:2181(CONNECTED) 5] 

      好的,那么,现在,又有一个疑问出来了,在往monitor节点里,注册节点如test123,那么,我怎么知道是哪一台的呢?则此时,需要做如下修改

       此刻的代码如下TestCurator.java

    package zhouls.bigdata.zkDemo;
    
    import java.net.InetAddress;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void testName() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/" + ip);//指定节点名称
            while (true) {
                        ;
            }
            
    
        }
    
    }

     

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    []
    [zk: localhost:2181(CONNECTED) 3] ls /monitor
    [test123]
    [zk: localhost:2181(CONNECTED) 4] ls /monitor
    []
    [zk: localhost:2181(CONNECTED) 5] ls /monitor
    [169.254.28.160]
    [zk: localhost:2181(CONNECTED) 6] 

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    [test123]
    [zk: localhost:2181(CONNECTED) 3] ls /monitor
    [169.254.28.160]
    [zk: localhost:2181(CONNECTED) 4] 

    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 1] ls /
    [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /monitor
    [test123]
    [zk: localhost:2181(CONNECTED) 3] ls /monitor
    [169.254.28.160]
    [zk: localhost:2181(CONNECTED) 4] 

      这个ip怎么不是我集群里的ip呢?是哪里的???

      原来是这里的

       因为,我是在test测试,所以,拿到的是windows本地的ip地址。

      如果,放在mian去测试,则就是拿到集群里的ip地址了。

     

     

      至此,我们是用临时节点的这个特性,来监控程序有没有运行的。并不是说临时节点就是来只做这个事!!!

    package zhouls.bigdata.zkDemo;
    
    import java.net.InetAddress;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void testName() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/" + ip);//指定节点名称
            while (true) {
                        ;
            }
            
    
            //或者
            
    //        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
    //                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
    //                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
    //                .forPath("/monitor/ + ip");// 指定节点名称
    //        while (true) {
    //            ;
    //        }
    
        }
    
    }

      可以将这个,写入到入口类或构造函数里。每次开始前都调用执行。以此来监控程序是否还在运行,非常重要!

    二、zookeeper编程入门系列之zookeeper实现分布式进程监控

     

       思路: 即在/下,先注册一个监视器,即monitor节点(为永久节点)

              然后,监视monitor节点下面的所有子节点(为临时节点)

       概念见

    Zookeeper概念学习系列之zookeeper实现分布式进程监控

      先执行

       然后执行

      ZkNodeWacter.java

    package zhouls.bigdata.zkDemo;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    /**
     * 这个监视器需要一直在后台运行,所以相当于是一个死循环的进程
     * @author zhouls
     *
     */
    public class ZkNodeWacter implements Watcher {
        CuratorFramework client;
        List<String> childrenList = new ArrayList<String>();
        public ZkNodeWacter() {
            //在启动监视器的时候,链接到zk
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;
            int connectionTimeoutMs = 3000;
            client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
            
            
            //监视monitor节点下面的所有子节点(为临时节点)
            try {
                //在monitor目录上注册一个监视器,这个监视器只能使用一次
                childrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
            } catch (Exception e) {
                e.printStackTrace();
            }
            
            
        }
        
        
        /**
         * 当monitor节点下面的子节点发生变化的时候,这个方法会被调用到
         */
        public void process(WatchedEvent event) {
            System.out.println("我被调用了:"+event);
            
            try {
                //重复注册监视器
                List<String> newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
                //先遍历原始的子节点list
                for (String ip : childrenList) {
                    if(!newChildrenList.contains(ip)){
                        System.out.println("节点消失:"+ip);
                    }
                }
                
                for (String ip : newChildrenList) {
                    if(!childrenList.contains(ip)){
                        System.out.println("新增节点:"+ip);
                    }
                }
                childrenList = newChildrenList;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        
        public static void main(String[] args) {
            ZkNodeWacter spiderWacter = new ZkNodeWacter();
            spiderWacter.start();//表示需要开启一个监视器
        }
    
    
        private void start() {
            while(true){
                ;
            }
        }
    
    }

      TestCurator.java

    package zhouls.bigdata.zkDemo;
    
    import java.net.InetAddress;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void testName() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/" + ip);//指定节点名称
            while (true) {
                        ;
            }
            
    
            
            
    //        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
    //                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
    //                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
    //                .forPath("/monitor/ + ip");// 指定节点名称
    //        while (true) {
    //            ;
    //        }
    
        }
    
    }

    三、zookeeper编程入门系列之zookeeper实现分布式共享锁

       这里,一般,都是创建临时有序子节点,怎么来创建,不难

       说到协调,我首先想到的是北京很多十字路口的交通协管,他们手握着小红旗,指挥车辆和行人是不是可以通行。如果我们把车辆和行人比喻成运行在计算机中的单元(线程),那么这个协管是干什么的?很多人都会想到,这不就是锁么?对,在一个并发的环境里,我们为了避免多个运行单元对共享数据同时进行修改,造成数据损坏的情况出现,我们就必须依赖像锁这样的协调机制,让有的线程可以先操作这些资源,然后其他线程等待。对于进程内的锁来讲,我们使用的各种语言平台都已经给我们准备很多种选择。

     

      

       TestCurator.java

    package zhouls.bigdata.zkDemo;
    
    import java.net.InetAddress;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void test1() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/" + ip);//指定节点名称
            while (true) {
                ;
            }
            
        }
    
        
        
    
            @Test
            public void test2() throws Exception {
                // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                String connectString = "master:2181,slave1:2181,slave2:2181";
                int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
                int connectionTimeoutMs = 3000;// 获取链接的超时时间
                CuratorFramework client = CuratorFrameworkFactory.newClient(
                        connectString, sessionTimeoutMs, connectionTimeoutMs,
                        retryPolicy);
                client.start();// 开启客户端
    
                InetAddress localhost = InetAddress.getLocalHost();
                String ip = localhost.getHostAddress();
                
                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/");// 指定节点名称
                while (true) {
                    ;
                }
    
        }
    
    }

      DistributedLock.java

    package zhouls.bigdata.zkDemo;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
          DistributedLock lock = null;
        try {
            lock = new DistributedLock("127.0.0.1:2181","test");
            lock.lock();
            //do something...
        } catch (Exception e) {
            e.printStackTrace();
        } 
        finally {
            if(lock != null)
                lock.unlock();
        }
        
        //lock.closeZk();//在cleanup方法中添加
     *
     */
    public class DistributedLock implements Lock, Watcher{
        private ZooKeeper zk;
        private String root = "/locks";//
        private String lockName;//竞争资源的标志
        private String waitNode;//等待前一个锁
        private String myZnode;//当前锁
        private CountDownLatch latch;//计数器
        private int sessionTimeout = 30000;//30秒
        private int waitTimeout = 30000;//等待节点失效最大时间 30秒
        private List<Exception> exception = new ArrayList<Exception>();
        
        /**
         * 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用
         * @param zkConnString 127.0.0.1:2181
         * @param lockName 竞争资源标志,lockName中不能包含单词lock
         */
        public DistributedLock(String zkConnString, String lockName){
            this.lockName = lockName;
            // 创建一个与服务器的连接
             try {
                zk = new ZooKeeper(zkConnString, sessionTimeout, this);
                Stat stat = zk.exists(root, false);
                if(stat == null){
                    // 创建根节点
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
                }
            } catch (IOException e) {
                exception.add(e);
            } catch (KeeperException e) {
                exception.add(e);
            } catch (InterruptedException e) {
                exception.add(e);
            }
        }
    
        /**
         * zookeeper节点的监视器
         */
        public void process(WatchedEvent event) {
            if(this.latch != null) {  
                this.latch.countDown();  
            }
        }
        /**
         * 获取锁
         */
        public void lock() {
            if(exception.size() > 0){
                throw new LockException(exception.get(0));
            }
            try {
                if(this.tryLock()){
                    System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                    return;
                }
                else{
                    waitForLock(waitNode, waitTimeout);//等待获取锁
                }
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            } 
        }
    
        /**
         * 尝试获取锁
         */
        public boolean tryLock() {
            try {
                String splitStr = "_lock_";
                if(lockName.contains(splitStr))
                    throw new LockException("lockName can not contains \u000B");
                //创建临时有序子节点
                myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                System.err.println(myZnode + " is created ");
                //取出所有子节点
                List<String> subNodes = zk.getChildren(root, false);
                //取出所有lockName的锁
                List<String> lockObjNodes = new ArrayList<String>();
                for (String node : subNodes) {
                    String _node = node.split(splitStr)[0];
                    if(_node.equals(lockName)){
                        lockObjNodes.add(node);
                    }
                }
                //对所有节点进行默认排序,从小到大
                Collections.sort(lockObjNodes);
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                    //如果是最小的节点,则表示取得锁
                    return true;
                }
                //如果不是最小的节点,找到比自己小1的节点
                String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
                //获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标)
                waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            }
            return false;
        }
        
        public boolean tryLock(long time, TimeUnit unit) {
            try {
                if(this.tryLock()){
                    return true;
                }
                return waitForLock(waitNode,time);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /**
         * 等待获取锁
         * @param lower :等待的锁
         * @param waitTime 最大等待时间
         * @return
         * @throws InterruptedException
         * @throws KeeperException
         */
        private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
            Stat stat = zk.exists(root + "/" + lower,true);
            //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if(stat != null){
                System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
                this.latch = new CountDownLatch(1);
                this.latch.await(waitTime, TimeUnit.MILLISECONDS);
                this.latch = null;
            }
            return true;
        }
    
    
        /**
         * 取消锁监控
         */
        public void unlock() {
            try {
                System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);
                zk.delete(myZnode,-1);
                myZnode = null;
                //zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 关闭zk链接
         */
        public void closeZk(){
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
        public Condition newCondition() {
            return null;
        }
        /**
         * 自定义异常信息
         * @author lenovo
         *
         */
        public class LockException extends RuntimeException {
            private static final long serialVersionUID = 1L;
            public LockException(String e){
                super(e);
            }
            public LockException(Exception e){
                super(e);
            }
        }
    }

       如有两个线程, 两个线程要同时到mysql中更新一条数据, 对数据库中的数据进行累加更新。由于在分布式环境下, 这两个线程可能存在于不同的机器上的不同jvm进程中, 所以这两个线程的关系就是垮主机跨进程, 使用java中的synchronized锁是搞不定的。

      概念,见

    Zookeeper概念学习系列之zookeeper实现分布式共享锁

      这里的节点也可以为lock。

      先执行以下的test3,再执行test4

     

     

     

    [zk: localhost:2181(CONNECTED) 9] ls /       
    [monitor, hbase, zookeeper, admin, lock, consumers, config, storm, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 10] ls /lock
    [169.254.28.160]
    [zk: localhost:2181(CONNECTED) 11] 

      然后,再执行test4

     

     

     

       然后,再执行下test4,试试,看看有什么变化

     

      可以看到,在增加。

     

      总的代码是TestCurator.java

    package zhouls.bigdata.zkDemo;
    
    import java.net.InetAddress;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.junit.Test;
    
    /**
     * 
     * @author zhouls
     *
     */
    public class TestCurator {
    
        @Test
        public void test1() throws Exception {
            // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
            int connectionTimeoutMs = 3000;// 获取链接的超时时间
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 开启客户端
    
            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/" + ip);//指定节点名称
            while (true) {
                ;
            }
            
        }
    
        
        
    
            @Test
            public void test2() throws Exception {
                // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                String connectString = "master:2181,slave1:2181,slave2:2181";
                int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
                int connectionTimeoutMs = 3000;// 获取链接的超时时间
                CuratorFramework client = CuratorFrameworkFactory.newClient(
                        connectString, sessionTimeoutMs, connectionTimeoutMs,
                        retryPolicy);
                client.start();// 开启客户端
    
                InetAddress localhost = InetAddress.getLocalHost();
                String ip = localhost.getHostAddress();
                
                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/monitor/");// 指定节点名称
                while (true) {
                    ;
                }
    
        }
            
            
            
            @Test
            public void test3() throws Exception {
                // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                String connectString = "master:2181,slave1:2181,slave2:2181";
                int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
                int connectionTimeoutMs = 3000;// 获取链接的超时时间
                CuratorFramework client = CuratorFrameworkFactory.newClient(
                        connectString, sessionTimeoutMs, connectionTimeoutMs,
                        retryPolicy);
                client.start();// 开启客户端
    
                InetAddress localhost = InetAddress.getLocalHost();
                String ip = localhost.getHostAddress();
                
                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                    .withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点
                    .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                    .forPath("/lock/" + ip);//指定节点名称
                while (true) {
                    ;
                }
                
            }
            
            
            
            
            @Test
            public void test4() throws Exception {
                // 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                String connectString = "master:2181,slave1:2181,slave2:2181";
                int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失
                int connectionTimeoutMs = 3000;// 获取链接的超时时间
                CuratorFramework client = CuratorFrameworkFactory.newClient(
                        connectString, sessionTimeoutMs, connectionTimeoutMs,
                        retryPolicy);
                client.start();// 开启客户端
    
                InetAddress localhost = InetAddress.getLocalHost();
                String ip = localhost.getHostAddress();
                
                client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
                .forPath("/lock/");// 指定节点名称
                while (true) {
                    ;
                }
    
        }
            
            
            
    
    }

      DistributedLock.java

    package zhouls.bigdata.zkDemo;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
          DistributedLock lock = null;
        try {
            lock = new DistributedLock("127.0.0.1:2181","test");
            lock.lock();
            //do something...
        } catch (Exception e) {
            e.printStackTrace();
        } 
        finally {
            if(lock != null)
                lock.unlock();
        }
        
        //lock.closeZk();//在cleanup方法中添加
     *
     */
    public class DistributedLock implements Lock, Watcher{
        private ZooKeeper zk;
        private String root = "/locks";//
        private String lockName;//竞争资源的标志
        private String waitNode;//等待前一个锁
        private String myZnode;//当前锁
        private CountDownLatch latch;//计数器
        private int sessionTimeout = 30000;//30秒
        private int waitTimeout = 30000;//等待节点失效最大时间 30秒
        private List<Exception> exception = new ArrayList<Exception>();
        
        /**
         * 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用
         * @param zkConnString 127.0.0.1:2181
         * @param lockName 竞争资源标志,lockName中不能包含单词lock
         */
        public DistributedLock(String zkConnString, String lockName){
            this.lockName = lockName;
            // 创建一个与服务器的连接
             try {
                zk = new ZooKeeper(zkConnString, sessionTimeout, this);
                Stat stat = zk.exists(root, false);
                if(stat == null){
                    // 创建根节点
                    zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
                }
            } catch (IOException e) {
                exception.add(e);
            } catch (KeeperException e) {
                exception.add(e);
            } catch (InterruptedException e) {
                exception.add(e);
            }
        }
    
        /**
         * zookeeper节点的监视器
         */
        public void process(WatchedEvent event) {
            if(this.latch != null) {  
                this.latch.countDown();  
            }
        }
        /**
         * 获取锁
         */
        public void lock() {
            if(exception.size() > 0){
                throw new LockException(exception.get(0));
            }
            try {
                if(this.tryLock()){
                    System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                    return;
                }
                else{
                    waitForLock(waitNode, waitTimeout);//等待获取锁
                }
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            } 
        }
    
        /**
         * 尝试获取锁
         */
        public boolean tryLock() {
            try {
                String splitStr = "_lock_";
                if(lockName.contains(splitStr))
                    throw new LockException("lockName can not contains \u000B");
                //创建临时有序子节点
                myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                System.err.println(myZnode + " is created ");
                //取出所有子节点
                List<String> subNodes = zk.getChildren(root, false);
                //取出所有lockName的锁
                List<String> lockObjNodes = new ArrayList<String>();
                for (String node : subNodes) {
                    String _node = node.split(splitStr)[0];
                    if(_node.equals(lockName)){
                        lockObjNodes.add(node);
                    }
                }
                //对所有节点进行默认排序,从小到大
                Collections.sort(lockObjNodes);
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                    //如果是最小的节点,则表示取得锁
                    return true;
                }
                //如果不是最小的节点,找到比自己小1的节点
                String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
                //获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标)
                waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
            } catch (KeeperException e) {
                throw new LockException(e);
            } catch (InterruptedException e) {
                throw new LockException(e);
            }
            return false;
        }
        
        public boolean tryLock(long time, TimeUnit unit) {
            try {
                if(this.tryLock()){
                    return true;
                }
                return waitForLock(waitNode,time);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /**
         * 等待获取锁
         * @param lower :等待的锁
         * @param waitTime 最大等待时间
         * @return
         * @throws InterruptedException
         * @throws KeeperException
         */
        private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
            Stat stat = zk.exists(root + "/" + lower,true);
            //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
            if(stat != null){
                System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
                this.latch = new CountDownLatch(1);
                this.latch.await(waitTime, TimeUnit.MILLISECONDS);
                this.latch = null;
            }
            return true;
        }
    
    
        /**
         * 取消锁监控
         */
        public void unlock() {
            try {
                System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);
                zk.delete(myZnode,-1);
                myZnode = null;
                //zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
        
        /**
         * 关闭zk链接
         */
        public void closeZk(){
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void lockInterruptibly() throws InterruptedException {
            this.lock();
        }
    
        public Condition newCondition() {
            return null;
        }
        /**
         * 自定义异常信息
         * @author lenovo
         *
         */
        public class LockException extends RuntimeException {
            private static final long serialVersionUID = 1L;
            public LockException(String e){
                super(e);
            }
            public LockException(Exception e){
                super(e);
            }
        }
    }

      这个代码里,大家可以改为自己的集群,如我的是master:2181,slave1:2181,slave2:2181

  • 相关阅读:
    BluetoothGetRadioInfo 函数
    BluetoothFindRadioClose 函数
    BluetoothFindNextRadio 函数
    BluetoothFindFirstRadio 函数
    BLUETOOTH_DEVICE_INFO 函数
    BluetoothSetServiceState 函数
    Bluetooth Functions
    union联合体学习
    置顶
    Toppo: 1
  • 原文地址:https://www.cnblogs.com/zlslch/p/7242381.html
Copyright © 2011-2022 走看看