zoukankan      html  css  js  c++  java
  • zookeeper学习笔记

    zookeeper安装与基本命令

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    要安装zookeeper首先需要安装jdk,可以去ZooKpper官网下载最新的版本。

    并解压到指定目录配置zoo.cfg , 在conf文件夹下复制一份新的zoo_sample.cfg并重新命名为zoo.cfg。

    安装步骤

    # cd /opt
    # wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz 
    # tar -xzvf zookeeper-3.4.8.tar.gz 
    # mv zookeeper-3.4.8 zookeeper
    # cd zookeeper/
    # cd conf/ 
    # mv zoo_sample.cfg zoo.cfg

    启动zk要到bin目录下,执行zkServer.sh start就可以了。

    停止当然是zkServer.sh stop

    查看状态zkServer.sh status

    链接zk使用zkCli.sh -server 127.0.0.1:2181

    输入help回车后查看帮助信息

    ZooKeeper -server host:port cmd args
        stat path [watch]
        set path data [version]
        ls path [watch]
        delquota [-n|-b] path
        ls2 path [watch]
        setAcl path acl
        setquota -n|-b val path
        history 
        redo cmdno
        printwatches on|off
        delete path [version]
        sync path
        listquota path
        rmr path
        get path [watch]
        create [-s] [-e] path data acl
        addauth scheme auth
        quit 
        getAcl path
        close 
        connect host:port

    创建节点以及值

    create /node1 value1

    查看节点

    使用ls指令查看当前ZK中所包含的内容ls /

    查看节点中的值

    get /node1

    更新节点中的值

    set /node2 value3

    删除节点

    delete /node2

    watch介绍

    watch表示监听事件,比如执行命令ls /node2

    然后我们新开个窗口连上zk,在node2下面新建个子节点,建完后马上之前加了watch的窗口就能收到新建的事件了。

    这种场景适合用在配置变更的时候各个子节点都需要重新加载配置。

    zoo.cfg配置信息详解

    # The number of milliseconds of each tick
    tickTime=2000 ##ZooKeeper的最小时间单元,单位毫秒(ms),默认值为3000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10 ##Leader服务器等待Follower启动并完成数据同步的时间,默认值10,表示tickTime的10倍
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5 ##Leader服务器和Follower之间进行心跳检测的最大延时时间,默认值5,表示tickTime的5倍
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/tmp/zookeeper ##ZooKeeper服务器存储快照文件的目录,必须配值,建议放置在var目录下
    # the port at which the clients will connect
    clientPort=2181 ## 服务器对外服务端口,默认值为2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1

    zookeeper权限控制 ACL操作

    CL全称为Access Control List(访问控制列表),用于控制资源的访问权限。Zookeeper利用ACL策略控制节点的访问权限,如节点数据读写、节点创建、节点删除、读取子节点列表、设置节点权限等。

    Zookeeper的ACL,scheme、id、permission,通常表示为:

    scheme:id:permission。
    • schema代表授权策略
    • id代表用户
    • permission代表权限

    permission分为下面几种:

    • CREATE(c):创建子节点的权限
    • READ(r):读取节点数据的权限
    • DELETE(d):删除节点的权限
    • WRITE(w):修改节点数据的权限
    • ADMIN(a):设置子节点权限的权限

    密码权限控制

    语法:digest:username:BASE64(SHA1(password)):权限信息
    首先我们创建一个节点

    create /test 10

    创建完成之后设置权限

    setAcl /test digest:zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=:crdwa

    然后来验证下权限是否可用

    [zk: localhost:2181(CONNECTED) 2] get /test
    Authentication is not valid : /test

    执行命令会发现没有权限操作

    进行认证操作后再次执行get操作就可以了

    addauth digest zhangsan:123456

    这边需要注意的是密码的生成方式,密码是加密的,不是明文的,我们需要借助于Zookeeper中的一个类来加密密码,如果是代码中操作就直接加密了,在shell中就得单独去加密了。

    java -Djava.ext.dirs=/Users/zhangsan/Documents/java/zookeeper-3.4.7/lib -cp /Users/zhangsan/Documents/java/zookeeper-3.4.7/zookeeper-3.4.7.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider zhangsan:123456

    输出加密内容如下:

    zhangsan:123456->zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=

    IP权限控制

    还有一种就是通过IP来控制节点的访问权限,一般不建议使用,因为IP发生变动的可能性比较大

    语法是:ip:IP信息:权限信息

    创建一个节点

    create /ip2 ip

    设置IP访问权限

    setAcl /ip2 ip:192.168.31.139:r

    Java连接zookeeper

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId
        <version>3.4.7</version>
    </dependency>
    /**
     * 连接ZK测试
     */
    public class ConnTest {
    
        public static void main(String[] args) {
            try {
                // 建立连接
                ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 1000 * 10, null);
                // 创建节点
                zooKeeper.create("/connTest", "connTest".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                byte[] result = zooKeeper.getData("/connTest", false, new Stat());
                System.out.println(new String(result));
                
                //zooKeeper.delete("/connTest", -1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    

    使用curator客户端更方便操作zookeeper

            <!--<dependency>-->
                <!--<groupId>org.apache.zookeeper</groupId>-->
                <!--<artifactId>zookeeper</artifactId>-->
                <!--<version>3.4.7</version>-->
            <!--</dependency>-->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.4.0</version>
            </dependency>    
    /**
     * zk客户端
     * 测试监听zk节点变化
     */
    public class CuratorTest {
    
        public static void main(String[] args) throws Exception {
            // 1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            // 2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(1000 * 10)
                    .retryPolicy(retryPolicy)
                    .build();
            // 3 开启连接
            cf.start();
            
    //        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/yjh/aa","aa内容".getBytes());
    //        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
    //        cf.setData().forPath("/yjh/aa", "修改aa内容".getBytes());
    //        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
    //        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/yjh");
            
    //        if (cf.checkExists().forPath("/tt") == null) {
    //            cf.create().creatingParentsIfNeeded().forPath("/tt","tt".getBytes());
    //        }
    //        byte[] data = cf.getData().usingWatcher(new Watcher() {  
    //            public void process(WatchedEvent event) {
    //                System.out.println("节点监听器 : " + event.getType().getIntValue() + "	" + event.getPath());  
    //            }  
    //        }).forPath("/tt");
    //        System.out.println(new String(data));
            
             ExecutorService pool = Executors.newFixedThreadPool(2);
                
             final NodeCache nodeCache = new NodeCache(cf, "/test", false);
             nodeCache.start(true);
             nodeCache.getListenable().addListener(
                new NodeCacheListener() {
                    public void nodeChanged() throws Exception {
                        System.out.println(nodeCache.getCurrentData().getPath() + "数据改变了, 新的数据是: " +
                            new String(nodeCache.getCurrentData().getData()));
                    }
                }, 
                pool
                );
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    }

    zookeeper分布式锁

    /**
     * zk分布式锁
     */
    public class LockTest {
        
        static int count = 2;
    
        public static void main(String[] args) {
            // 1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            // 2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(1000 * 10)
                    .retryPolicy(retryPolicy)
                    .build();
            // 3 开启连接
            cf.start();
            final InterProcessMutex lock = new InterProcessMutex(cf, "/mylock");
            final CountDownLatch latch = new CountDownLatch(1);
            ExecutorService pool = Executors.newFixedThreadPool(20);
            for (int i = 0; i < 20; i++) {
                pool.execute(() -> {
                    try {
                        System.err.println(1);
                        latch.await();
                        lock.acquire();
                        Thread.sleep(100);
                        //synchronized (LockTest.class) {
                            if (count > 0) {
                                count--;
                                lock.release();
                                System.out.println(count);
                            }
                            
                        //}
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            System.out.println("开始执行");
            latch.countDown();
            pool.shutdown();
        }
    
    }

    zookeeper作为服务注册中心

    首先编写一个服务注册中心类,监听服务的注册与停止

    /**
     * 测试服务注册中心
     * 监听服务注册与停止
     */
    public class ServiceClient {
    
        public static void main(String[] args) {
            // client.get("http://localhost/get");
            // client.get("http://localhost/get2");
            // client.get("nginx地址"); --->   get./get2
            
            // 从zk中获取服务地址列表,选择一个进行请求,本地执行负载均衡
            
            // 1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            // 2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(1000 * 10)
                    .retryPolicy(retryPolicy)
                    .build();
            // 3 开启连接
            cf.start();
    
            // 开始监听
            try {
                final PathChildrenCache childrenCache = new PathChildrenCache(cf, "/service", true);
                childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
                childrenCache.getListenable().addListener(
                    new PathChildrenCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                                throws Exception {
                                switch (event.getType()) {
                                case CHILD_ADDED:
                                    System.out.println("CHILD_ADDED: " + event.getData().getPath());
                                    break;
                                case CHILD_REMOVED:
                                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                                    break;
                                case CHILD_UPDATED:
                                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                                    break;
                                default:
                                    break;
                            }
                        }
                    }
                );
                List<String> urls = cf.getChildren().forPath("/service");
                for (String url : urls) {
                    System.out.println(url);
                }
                Thread.sleep(200000000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    另外编写两个服务类,用于模拟两个服务,分别往zookeeper创建节点,相当于注册服务,由上面的服务注册中心进行监听

    用户服务:

    /**
     * 测试服务注册中心
     * 模拟用户服务
     */
    public class UserServiceApplication {
    
        public static void main(String[] args) {
            // 1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            // 2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(1000 * 10)
                    .retryPolicy(retryPolicy)
                    .build();
            // 3 开启连接
            cf.start();
            try {
                cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.1", "".getBytes());
                Thread.sleep(200000000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    订单服务:

    /**
     * 测试服务注册中心
     * 模拟订单服务
     */
    public class OrderServiceApplication {
    
        public static void main(String[] args) {
            // 1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            // 2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("localhost:2181")
                    .sessionTimeoutMs(1000 * 10)
                    .retryPolicy(retryPolicy)
                    .build();
            // 3 开启连接
            cf.start();
            try {
                cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.2", "".getBytes());
                Thread.sleep(200000000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    然后分别启动上面这三个类,服务注册中心就可以监听用户服务和订单服务的注册和停止了。

    ps:编写一个简单的zookeeper操作工具类

    /**
     * Created by 唐哲
     * 2018-06-24 16:26
     * zk操作工具类
     */
    public class ZkUtils {
    
        private static final Integer ZK_SESSION_TIMEOUT = 10 * 1000;
        private static CountDownLatch latch = new CountDownLatch(1);
        private static ZkUtils instance = new ZkUtils();
        private static ZooKeeper zk;
    
        public synchronized static ZkUtils getInstance(String host, int port) {
            if (zk == null) {
                connect(host, port);
            }
            return instance;
        }
    
        private static void connect(String host, int port) {
            String connectString = host + ":" + port;
            try {
                zk = new ZooKeeper(connectString, ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("已经触发了" + event.getType() + "事件!");
                        // 判断是否已连接ZK, 连接后计数器递减
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                // 若计数器不为0, 则等待
                latch.await();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public String addNode(String nodeName) {
            Stat stat;
            try {
                stat = zk.exists(nodeName, false);
                if (stat == null) {
                    return zk.create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public String addNode(String nodeName, String data) {
            Stat stat;
            try {
                stat = zk.exists(nodeName, false);
                if (stat == null) {
                    return zk.create(nodeName, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public String addNode(String nodeName, String data, List<ACL> acl, CreateMode createMode) {
            Stat stat;
            try {
                stat = zk.exists(nodeName, false);
                if (stat == null) {
                    return zk.create(nodeName, data.getBytes(), acl, createMode);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        public void removeNode(String nodeName) {
            try {
                zk.delete(nodeName, -1);
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public void removeNode(String nodeName, int version) {
            try {
                zk.delete(nodeName, version);
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
        public String setData(String nodeName, String data) {
            Stat stat;
            try {
                stat = zk.exists(nodeName, false);
                if (stat != null) {
                    zk.setData(nodeName, data.getBytes(), -1);
                    return data;
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 监控数据节点变化
         */
        public void monitorDataUpdate(String nodeName) {
            try {
                zk.getData(nodeName, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        // 节点的值有修改
                        if(event.getType() == EventType.NodeDataChanged) {
                            System.out.println(nodeName + "修改了值" + event.getPath());
                            // 触发一次就失效,所以需要递归注册
                            monitorDataUpdate(nodeName);
                        }
                    }
                }, new Stat());
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            ZkUtils zkUtils = ZkUtils.getInstance("localhost", 2181);
            //zkUtils.removeNode("/test");
            
    //        String result = zkUtils.addNode("/test");
    //        System.out.println(result);
    //
    //        result = zkUtils.addNode("/test", "10");
    //        System.out.println(result);
            String result = zkUtils.setData("/test", "hello");
            System.out.println(result);
    
            zkUtils.monitorDataUpdate("/test");
    
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatch.await();
        }
    
    }
  • 相关阅读:
    ABAP语法篇1 DATA新用法
    SAP RFC和BAPI
    SAP标准屏幕中字段描述增强
    HoloLens开发手记 - 使用Windows设备控制台 Using Windows Device Portal
    HoloLens开发手记
    HoloLens开发手记
    HoloLens开发手记
    HoloLens开发手记
    HoloLens开发手记
    HoloLens开发手记
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9229688.html
Copyright © 2011-2022 走看看