zoukankan      html  css  js  c++  java
  • curator框架的使用以及实现分布式锁等应用与zkclient操作zookeeper,简化复杂原生API

    打开zookeeper集群

    先体会一下原生API有多麻烦(可略过):

    //地址
        static final String ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
        //session超时时间
        static final int SESSION_OUTTIME = 2000;//ms
        /**
         * 信号量,阻塞程序执行,用于等待zookeeper连接(异步的)成功,发送成功信号
         */
        static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
        public static void main(String[] args) throws Exception {
            //创建zk客户端(异步连接)   Watcher:监听连接
            ZooKeeper zooKeeper = new ZooKeeper(ADDR, SESSION_OUTTIME, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //获取事件的状态
                    Event.KeeperState keeperState = watchedEvent.getState();
                    Event.EventType eventType = watchedEvent.getType();
                    //如果是建立连接
                    if(Event.KeeperState.SyncConnected == keeperState){
                        if(Event.EventType.None == eventType){
                            //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                            connectedSemaphore.countDown();
                            System.out.println("zk 建立连接");
                        }
                    }
                }
            });
            //进行阻塞,等待zk连接成功
            connectedSemaphore.await();
            //连接成功执行操作:
            //同步     路径(不允许递归创建), 数据  权限, 类型(PERSISTENT:持久) :临时节点只是一次session有效,用来实现分布式锁
            String name = zooKeeper.create("/testRoot", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            String name1 = zooKeeper.create("/testRoot/child02", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            //异步   回调函数   , 回调的参数
            zooKeeper.create("/testRoot/child01", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                    new AsyncCallback.StringCallback() {
                        @Override   //服务端响应吗: 0 成功   传入的path   参数  实际创建的path
                        public void processResult(int i, String s, Object o, String s1) {
                            System.out.println(i+" "+s+" "+o.toString()+" "+s1); }},"param");
            //删除 (路径必须是叶子节点,即没有下一级节点,不支持递归)
            //zooKeeper.delete("/testRoot/child01", -1);//版本号:-1表示删除所有的,像git一样,修改一次版本号加一,可删除指定版本号
            //同样支持异步,和创建一样传入回调
            //判断节点是否存在
            System.out.println(zooKeeper.exists("/testRoot/child01", false));//也可传入watch,异步回调等
            //获取
            byte[] data = zooKeeper.getData("/testRoot", false, null);
    		System.out.println(new String(data));
    		System.out.println(zooKeeper.getChildren("/testRoot", false));
            //修改
            zooKeeper.setData("/testRoot", "modify data root".getBytes(), -1);
    		byte[] data1 = zooKeeper.getData("/testRoot", false, null);
    		System.out.println(new String(data1));
    
    
            Thread.sleep(3000);//休眠防止异步
            zooKeeper.close();//释放资源
    

      

    zkclient:

      /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;//ms
    
        public static void main(String[] args) {
            // 创建zkclient实例
            ZkClient zkc=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000);
            //增
            zkc.create("/test","测试zkclient",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时
            zkc.create("/test/heyi","heyi",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时
            zkc.createEphemeral("/haha","haha");//创建临时节点
            zkc.createPersistent("/test/yaozhen","姚振");//创建持久节点,设置值必须有父节点
            zkc.createPersistent("/320/houzheng",true);//支持递归创建,但是不能设置值,否则会报异常
            //删
            zkc.delete("/test/yaozhen");
            zkc.deleteRecursive("/320");//递归删除,会删除节点下的子节点
            //改
            zkc.writeData("/test","新内容");
            //查
            boolean exists = zkc.exists("/test");
            System.out.println(exists);
            zkc.readData("/jim",true);//如果节点为空返回null
            String readData = zkc.readData("/test");
            System.out.println(readData);
            //获取子节点和阅读子节点数据
            List<String> list = zkc.getChildren("/test");
            for(String p : list){
                System.out.println(p);
                String rp = "/test/" + p;
                String data = zkc.readData(rp);
                System.out.println("节点为:" + rp + ",内容为: " + data);
            }
            int i = zkc.countChildren("/test");//查询子节点数量
            System.out.println(i);
    

      相比之下zkclient简直太简单了!

    watcher也特别简单:

    子节点监听:

    // 创建zkclient实例
            ZkClient zkClient=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000);
            System.out.println("-----------");
            //监听子节点变化(不监听数据变化,只监听节点的新增删除)  IZkChildListener实现类
            zkClient.subscribeChildChanges("/father",(x,y)->{//x:父路径, y:所有子节点路径集合
                System.out.println("parentPath: " + x);
                System.out.println("currentChilds: " + y);
            });
            Thread.sleep(2000);
            zkClient.createPersistent("/father");//父节点添加删除也会监听
            Thread.sleep(1000);
            zkClient.createPersistent("/father/son","儿子");
            Thread.sleep(1000);
            zkClient.createPersistent("/father/daughter","女儿");
            Thread.sleep(1000);
            zkClient.delete("/father/son");
            Thread.sleep(1000);
            zkClient.deleteRecursive("/father");
    

      

    节点数据监听:

            //监听节点数据变化,监听之前节点必须存在,不监听子节点
            zkClient.createPersistent("/father", "1234");
            zkClient.subscribeDataChanges("/father", new IZkDataListener() {
                @Override   //节点数据变化
                public void handleDataChange(String s, Object o) throws Exception {
                    System.out.println("变更的节点为:" + s + ", 变更内容为:" + o);
                }
                @Override  //节点删除
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("删除的节点为:" + s);
                }
            });
            Thread.sleep(3000);
            zkClient.writeData("/father", "father");//-1:最新版本的数据
            Thread.sleep(1000);
            zkClient.delete("/father");//节点删除野人不监听
            Thread.sleep(1000);
    

      

    状态监听:

            //监听服务连接状态,可手动启动关闭zookeeper查看触发
            zkClient.subscribeStateChanges(new IZkStateListener() {
                @Override
                public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                    if(state==Watcher.Event.KeeperState.SyncConnected){
                        System.out.println("连接zookeeper成功");
                    }else if(state==Watcher.Event.KeeperState.Disconnected){
                        System.out.println("zookeeper断开");
                    }else
                        System.out.println("other"+state);
                }
                @Override //连接关闭,过了session的设置时间,再连接session就会重置,触发监听
                public void handleNewSession() throws Exception {
                    System.out.println("newsession");
                }
            });
            Thread.sleep(Integer.MAX_VALUE);//用不关闭线程,一直监听
    

      

    curator框架:

     基本增删改查:

    //重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //通过工厂建立连接
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)//重试策略
                    .build();
            curatorFramework.start();//一定要开启连接
            //增  creatingParentsIfNeeded:同时创建父节点  withMode:指定节点类型(不加withMode默认为持久类型节点)
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath("/super/son","儿子".getBytes());//路径、数据内容
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath("/father/son","儿子".getBytes());//路径、数据内容
            //删
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/super");//递归删除子节点
            //改
            curatorFramework.setData().forPath("/father/son","新儿子".getBytes());
            //查
            byte[] bytes = curatorFramework.getData().forPath("/father/son");
            System.out.println(bytes.toString());
            //查找子节点
            curatorFramework.getChildren().forPath("/father").forEach(x-> System.out.println(x));
            //判断节点是否存在
            Stat stat = curatorFramework.checkExists().forPath("/super");
            System.out.println(stat);//不存在,返回bull
            //使用inBackground 绑定回调函数
            ExecutorService pool = Executors.newCachedThreadPool();
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .inBackground((cf,curatorEvent)->{//BackgroundCallback实现类
                        System.out.println("状态码"+curatorEvent.getResultCode());//0表示成功
                        System.out.println("type:"+curatorEvent.getType());//CREATE
                    },pool).forPath("/320/yaozhen","姚振".getBytes());
            //使用线程池做回调
            Thread.sleep(Integer.MAX_VALUE);//等待,方便查看打印输出
    

      

    监听:

    //重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //通过工厂建立连接
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)//重试策略
                    .build();
            curatorFramework.start();
            //建立一个cache缓存,监听节点
            final NodeCache nodeCache = new NodeCache(curatorFramework,"/father");
            nodeCache.start(true);//第一次启动的时候就会立刻在Zookeeper上读取对应节点的数据内容,并保存在Cache中
            //触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作
            nodeCache.getListenable().addListener(()->{ //NodeCacheListener实现类
                System.out.println("路径为:" + nodeCache.getCurrentData().getPath());
                System.out.println("数据为:" + new String(nodeCache.getCurrentData().getData()));
                System.out.println("状态为:" + nodeCache.getCurrentData().getStat());
                System.out.println("---------------------------------------");
            });
    
            Thread.sleep(1000);
            curatorFramework.create().forPath("/father", "123".getBytes());
    
            Thread.sleep(1000);
            curatorFramework.setData().forPath("/father", "456".getBytes());
    
            Thread.sleep(1000);
            curatorFramework.delete().forPath("/father");
    
            Thread.sleep(Integer.MAX_VALUE);
    

      监听节点

    //重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //通过工厂建立连接
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)//重试策略
                    .build();
            curatorFramework.start();
            //监听子节点:建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
            PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/father", true);
            //在初始化的时候就进行缓存监听
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            //监听子节点的新增,修改,删除
            cache.getListenable().addListener((cf,event)-> {//PathChildrenCacheListener实现类
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("添加子节点 :" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("更新子节点 :" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点 :" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            });
            //创建本身节点不发生变化
            curatorFramework.create().forPath("/father", "init".getBytes());
    
            //添加子节点
            Thread.sleep(1000);
            curatorFramework.create().forPath("/father/c1", "c1内容".getBytes());
            Thread.sleep(1000);
            curatorFramework.create().forPath("/father/c2", "c2内容".getBytes());
    
            //修改子节点
            Thread.sleep(1000);
            curatorFramework.setData().forPath("/father/c1", "c1更新内容".getBytes());
    
            //删除子节点
            Thread.sleep(1000);
            curatorFramework.delete().forPath("/father/c2");
    
            //删除本身节点
            Thread.sleep(1000);
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/father");
    
            Thread.sleep(Integer.MAX_VALUE);
    

      监听子节点

    分布式锁:

            //重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //通过工厂建立连接
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)//重试策略
                    .build();
            curatorFramework.start();
            //使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的
            final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test");
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    try {
                        countDownLatch.await();//线程等待一起执行
                        lock.acquire();//分布式锁,数据同步
                        //处理业务
                        j--;
                        System.out.println(j);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {//释放锁
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                },"t"+i).start();
            }
            Thread.sleep(1000);
            countDownLatch.countDown();//模拟十个线程一起并发.指定一起执行
        }
        static int j=10;
    

      这里模拟了一下十个线程并发,其实和ReentrantLock效果一样的,但是在分布式中也可以实现同步,而ReentrantLock就不行了

    分布式计数器:

    和分布式锁其实都是同理:

    /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;//ms
        public static void main(String[] args) throws Exception {
            //重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //通过工厂建立连接
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)//重试策略
                    .build();
            curatorFramework.start();
            //分布式计数器
            DistributedAtomicInteger counter=new DistributedAtomicInteger(curatorFramework,"/super",new RetryNTimes(3,100));
            //初始化
            counter.forceSet(0);
            AtomicValue<Integer> value = counter.increment();//原子自增
            System.out.println("原值为"+value.preValue());
            System.out.println("更改后的值为"+value.postValue());
            System.out.println("状态"+value.succeeded());
        }
    

      

    分布式线程屏障同步:

    for (int i = 0; i < 5; i++) {//模拟5个客户端
                new Thread(()->{
                    try {
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                        CuratorFramework cf = CuratorFrameworkFactory.builder()
                                .connectString(CONNECT_ADDR)
                                .retryPolicy(retryPolicy)
                                .build();
                        cf.start();
                        //双重同步DistributedDoubleBarrier: 指定5个线程,到了5个会自动触发
                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
                        Thread.sleep(1000 * (new Random()).nextInt(3));
                        System.out.println(Thread.currentThread().getName() + "已经准备");
                        barrier.enter();//准备,等待成员到达5个再一起往下执行
                        System.out.println("同时开始运行...");
                        Thread.sleep(1000 * (new Random()).nextInt(3));
                        System.out.println(Thread.currentThread().getName() + "运行完毕");
                        barrier.leave();//再次等待准备退出,5个都是leave状态时再一起退出
                        System.out.println("同时退出运行...");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                },"t" + i).start();
            }
    

      和java中CyicrBarrier一模一样,只是支持分布式

    static DistributedBarrier barrier;
        public static void main(String[] args) throws Exception {
            for(int i = 0; i < 5; i++){
                new Thread(()-> {
                        try {
                            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                            CuratorFramework cf = CuratorFrameworkFactory.builder()
                                    .connectString(CONNECT_ADDR)
                                    .sessionTimeoutMs(SESSION_OUTTIME)
                                    .retryPolicy(retryPolicy)
                                    .build();
                            cf.start();
                            barrier = new DistributedBarrier(cf, "/test");
                            System.out.println(Thread.currentThread().getName() + "设置barrier!");
                            barrier.setBarrier();//设置
                            barrier.waitOnBarrier();	//等待Barrier释放,准备一起执行
                            System.out.println("---------开始执行程序----------");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                },"t" + i).start();
            }
            Thread.sleep(10000);
            if(barrier != null){
                System.out.println("启动所有线程");
                barrier.removeBarrier();//删除屏障
            }
        }
    

      最后一个是只实现同时执行的,很奇怪,实际练习中开始执行程序这就是不执行,线程也一直卡在折了,启动这块都打印了!还得再研究下!

  • 相关阅读:
    R_Studio中对xls文件学生总成绩统计求和
    R_Studio读取xls文件
    Unity3D_(游戏)甜品消消乐03_游戏UI设计
    Unity3D_(游戏)甜品消消乐02_游戏核心算法
    Unity3D_(游戏)甜品消消乐01_游戏基础界面
    Unity3D_(游戏)双人3D坦克_简易版
    Unity3D_(游戏)卡牌04_游戏界面
    万年历查询地址 当天日期(带周)
    meta name="viewport" content="width=device-width,initial-scale=1.0" 解释
    Chrome调试(debugger)总是进入paused in debugger状态
  • 原文地址:https://www.cnblogs.com/houzheng/p/9773949.html
Copyright © 2011-2022 走看看