zoukankan      html  css  js  c++  java
  • 分布式系列七: zookeeper简单用法

    zookeeper是分布式开源框架, 是Google Chubby的一个实现, 主要作为分布式系统的协调服务. Dobbo等框架使用了其功能.

    zookeeper特性

    • 顺序一致性: 事务请求最终会严格按顺序执行
    • 原子性:
    • 可靠性:
    • 实时性:
    • 单一视图:

    安装

    使用windows的linux子系统时: cd /mnt/e/chromedownload/转到windows下载路径
    拷贝 cp /mnt/e/chromedownload/zookeeper.tar.gz /program/zookeeper.tar.gz
    转到 cd /program 如果没有的先mkdir program
    解压 tar -zxvf zookeeper.tar.gz
    转到 cd ZK_HOME/conf
    拷贝 cp zoo_sample.cfg zoo.cfg

    转到 cd ZK_HOME/bin
    启动 sh zkServer.sh start

    集群搭建

    • zoo.cfg中配置集群, 配置格式如 server.id=ip:port:port ; 有几台就配置几个.例如:
      server.1=192.168.1.145:2897:3181
      server.2=192.168.1.146:2897:3181
      server.3=192.168.1.147:2897:3181
    • zoo.cfg中的dataDir所配置的目录下新增myid文件, 值对应server.id的id
    • zoo.cfg中, 如果需要observer,则添加peerType=observer, 并且修改server.id=ip:port:port:observer

    zoo.cfg 配置

    • tickTime=2000 zk的方法最小时间单位
    • initTime=10 时长为10*tickTime, follow节点和leader节点同步的时间
    • syncLimit=5 时长为5*tickTime, leader和follow几点进行心跳检测的最大延迟时间
    • dataDir=/tmp/zookeeper 存储快照文件的目录
    • dataLogDir = /log/zookeeper 事务日志的存储路径, 默认在dataDir下
    • clientPort=2181 连接zookeeper的默认端口

    zookeeper几个概念

    • znode: zookeeper数据存储为树形结构, 深度层级没有限制, znode是数据存储节点,是zookeeper的最小存储单元. 分为1.持久化节点;2.持久化有序节点;3.临时节点;4.临时有序节点;
    • 临时节点, 是会话时生成的节点, 会话结束后节点会自动删除.

    客户端命令操作

    help 可以查看客户端支持的命令

    • create [-s] [-e] node : 创建节点 -s是有序 -e临时节点
    • get path [watch] 获取节点的信息
    • set path data [version] : 修改节点的值
    • delete path [version] : 删除节点, 当节点有子节点时无法删除

    [version] 乐观锁

    [Watcher] 提供了发布/订阅, 允许客户端向服务器端注册一个监听, 当服务端触发指定事件时会触发watcher,服务端向客户端发送一个通知.
    Watcher是一次性的, 触发一次后自动失效

    信息节点

    • stat path 可以查看节点的信息
    cversion=0 子节点的版本
    AclVersion=0 acl的版本号, 权限控制相关 
    dataVersion=1 数据的版本号
    cxid 创建的事务id
    mzxid 最后一次修改的事务id
    pzxid 子节点最后一次修改的事务id
    ephemeralOwner 临时会话的id
    dataLength 数据长度
    numChidren 子节点数量
    

    java开发

    引用依赖

     <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.4-beta</version>
    </dependency>
    

    java代码

    /**
     * 定义Watcher
     */
    public class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if(event.getState()== Event.KeeperState.SyncConnected){
                System.out.println("---->>>>>"+event.getType());
                System.out.println("---->>>>>"+event.getPath());
            }
        }
    }
    
    //测试
    public class MySession {
    
        private static final String CONN = "localhost:2181";
        private static Stat stat = new Stat();
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            ZooKeeper zooKeeper = new ZooKeeper(CONN,1000,new MyWatcher());
    
            // 创建
            zooKeeper.create("/xlx","this is a string".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            // 查询,注册watcher
            byte[] rst = zooKeeper.getData("/xlx",true,stat);
            System.out.println(new String(rst));
    
            //删除(只能删除永久节点)
            zooKeeper.delete("/xlx",-1);
    
            // 创建
            zooKeeper.create("/xlx","this is a string".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            // 查询
            byte[] rs = zooKeeper.getData("/xlx",true,stat);
            System.out.println(new String(rs));
    
            // 注册watcher
            zooKeeper.exists("/xlx/yy",true);
    
            // 创建
            zooKeeper.create("/xlx/yy","this is a sub child string".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            // 获取子节点
            List<String> children = zooKeeper.getChildren("/xlx", true);
            System.out.println(children);
    
            //删除(只能删除永久节点)
            zooKeeper.delete("/xlx/yy",-1);
    
            // 修改
            zooKeeper.setData("/xlx","this is a modified string".getBytes(),-1);
            byte[] rss = zooKeeper.getData("/xlx",true,stat);
            System.out.println(new String(rss));
    
            //删除(只能删除永久节点)
            zooKeeper.delete("/xlx",-1);
    
            // watcher 异步的, 这里停留段时间才可以查看到watcher打印的信息
            Thread.sleep(2000);
        }
    }
    

    三方API

    • zkClient
    • curator 这个用的较多,Netflix开源

    curator开发

    特点:

    • 抽象层次更高
    • 链式编程风格
    • 异步回调
    public class CuratorSession {
    
        private static final String CONN = "localhost:2181";
    
        public static void main(String[] args) throws Exception {
            // 创建
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONN, 2000, 5000, new ExponentialBackoffRetry(1000,3));
            curatorFramework.start();
            System.out.println(curatorFramework.getState());
            //另一种方式
            //curatorFramework = CuratorFrameworkFactory.builder().build();
    
            // 新增节点
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/curator/chd/dd","dfadfe".getBytes());
    
            //读取
            byte[] data = curatorFramework.getData().forPath("/curator/chd/dd");
            java.lang.String string = new java.lang.String(data);
            System.out.println(string);
    
            // 修改
            Stat stat = curatorFramework.setData().forPath("/curator/chd/dd","fdaefv".getBytes());
            System.out.println(stat);
    
            curatorFramework.setData().inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println(event);
                }
            }).forPath("/curator/chd/dd","fafdae".getBytes());
    
            //删除
            curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/chd");
    
            Thread.sleep(2000);
        }
    }
    

    zookeeper应用场景

    • 分布式锁

      可以使用redis,数据库,zookeeper等实现.
      排他锁(写锁), 共享锁(读锁)

    • 发布/订阅(配置中心)

      配置中心数据的特点: 1. 数据量比较小;2.各台服务器的配置内容一致;3.配置信息在运行时会发生变更;

      配置中心有两种实现方式: pull,push; zookeeper结合这两种方式实现配置中心. 具体为: 客户端向服务器注册自己关注的节点, 当节点发生变化时, 服务器向客户端发送watcher事件通知,再由客户端pull变化的节点数据.

    • 负载均衡

      leader处理请求, 并将请求路由到特定节点. 核心是负载均衡的算法.

    • leader选举

    • 命名服务

    • 分布式队列

    分布式锁实现(java api)

    原理: 当需要锁时, 客户端会向服务端的节点A写有序的子节点, 此时可能有其他的客户端也同时需要获取锁,因此节点A下可能会有若干子节点001,002,003...., 然后客户端获取父节点A的所有子节点,并取得其中最小的节点001, 如果001是当前客户端创建的, 则当前客户端获取到锁;如果001不是当前客户端创建, 比如当前客户端创建了003, 则取003的前一个节点,也就是002添加监控watcher, 当002监控事件为删除时, 创建了003的客户端得到一个通知, 说明前一个节点已经释放, 此时003可以获取到锁.而002节点则监控001, 也就是说每个节点监控其前一个节点, 前一个节点释放, 则表示当前节点可以获取到锁了.

    public class DistributeLock {
    
        private static final String ROOT_LOCKS="/LOCK";
        private ZooKeeper zooKeeper;
        private int sessionTimeout;
        private String lockID;
        private final static byte[] date={1,2};
        private CountDownLatch countDownLatch = new CountDownLatch(1);
    
        public DistributeLock() throws IOException, InterruptedException {
            this.zooKeeper = ZookeeperClient.getZookeeperClient();
            this.sessionTimeout = ZookeeperClient.SESSIONTIMEOUT;
        }
    
        public boolean lock(){
            try{
                lockID = zooKeeper.create(ROOT_LOCKS+"/",date, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName()+"->成功创建了lock节点["+lockID+"], 开始去竞争锁");
                List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS,true);
                SortedSet<String> sortedSet = new TreeSet<>();
                for(String children:childrenNodes){
                    sortedSet.add(ROOT_LOCKS+"/"+children);
                }
                String first = sortedSet.first();
                if (first.equals(lockID)){
                    System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockID+"]");
                    return true;
                }
    
                SortedSet<String> lessThanLockId = ((TreeSet<String>) sortedSet).headSet(lockID);
                if (!lessThanLockId.isEmpty()){
                    String prevLockID = lessThanLockId.last();
                    zooKeeper.exists(prevLockID,new LockWatcher(countDownLatch));
                    countDownLatch.await(sessionTimeout,TimeUnit.MILLISECONDS);
                    System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockID+"]");
                }
            }catch (Exception e){e.printStackTrace();}
            return false;
        }
    
        public boolean unlock(){
            System.out.println(Thread.currentThread().getName()+"->开始释放锁:["+lockID+"]");
            try {
                zooKeeper.delete(lockID,-1);
                System.out.println("节点["+lockID+"]成功被删除");
                return true;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            return false;
        }
    
    
        public static void main(String[] args) {
            final CountDownLatch latch=new CountDownLatch(10);
            Random random=new Random();
            for(int i=0;i<10;i++){
                new Thread(()->{
                    DistributeLock lock=null;
                    try {
                        lock=new DistributeLock();
                        latch.countDown();
                        latch.await();
                        lock.lock();
                        Thread.sleep(random.nextInt(500));
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        if(lock!=null){
                            lock.unlock();
                        }
                    }
                }).start();
            }
        }
    }
    

    这些代码写下来就算入门了, curator真正有用的使用场景还没接触到, 比如分布式锁,leader选举等, curator有示例程序, 可以在github上查看Curator源码.

  • 相关阅读:
    朴英敏: 用crash工具分析Linux内核死锁的一次实战【转】
    ext3,ext4,xfs和btrfs文件系统性能对比【转】
    STM32MP157——Remoteproc和RPMsg【转】
    使用edac工具来检测服务器内存故障.【转】
    面试题-python 什么是生成器(generator)?
    面试题-python 什么是迭代器(Iterator)?
    面试题-python 浅拷贝和深拷贝(copy模块)
    selenium+python自动化104-如何获取隐藏元素text文本
    面试题-websocket 接口如何测试?
    jmeter压测学习47-发soap请求测试webservice接口
  • 原文地址:https://www.cnblogs.com/walkinhalo/p/9740198.html
Copyright © 2011-2022 走看看