zoukankan      html  css  js  c++  java
  • CuratorFramework使用

    CuratorFramework
    Framework
    是ZooKeeper Client更高的抽象API
    自动连接管理:
    1. 当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明
    2. 监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法
    3. Curator recipes自动移除监控

    更清晰的API: 简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口
    提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等

    CuratorFrameworkFactory类提供了两个方法, 一个工厂方法newClient, 一个构建方法build. 使用工厂方法newClient可以创建一个默认的实例, 而build构建方法可以对实例进行定制. 当CuratorFramework实例构建完成, 紧接着调用start()方法, 在应用结束的时候, 需要调用close()方法. CuratorFramework是线程安全的. 在一个应用中可以共享同一个zk集群的CuratorFramework.

    CuratorFramework API采用了连贯风格的接口(Fluent Interface). 所有的操作一律返回构建器, 当所有元素加在一起之后, 整个方法看起来就像一个完整的句子. 比如下面的操作:

    client.create().forPath("/head")
    client.delete().inBackground().forPath("/head")
    client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child")
    client.getData().watched().inBackground().forPath("/test")

    方法说明:
    create(): 发起一个create操作. 可以组合其他方法 (比如mode 或background) 最后以forPath()方法结尾
    delete(): 发起一个删除操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
    checkExists(): 发起一个检查ZNode 是否存在的操作. 可以组合其他方法(watch 或background) 最后以forPath()方法结尾
    getData(): 发起一个获取ZNode数据的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
    setData(): 发起一个设置ZNode数据的操作. 可以组合其他方法(version 或background) 最后以forPath()方法结尾
    getChildren(): 发起一个获取ZNode子节点的操作. 可以组合其他方法(watch, background 或get stat) 最后以forPath()方法结尾
    inTransaction(): 发起一个ZooKeeper事务. 可以组合create, setData, check, 和/或delete 为一个操作, 然后commit() 提交

    通知(Notification)
    通过ClientListener实现。 通过CuratorFramework实例里的addListener()注册listener. listener实现以下方法:
    - eventReceived() 当一个后台操作完成或者指定的watch被触发时该方法被调用

    CuratorEvent
    CuratorEvent(在以前版本为ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装, 而事件对象的内容跟事件类型相关, 事件类型可通过getType()获得。下面是对应关系:

    名称空间(Namespace)

    因为一个zk集群会被多个应用共享, 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选). 这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root. 使用代码如下:

    CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
     …
    client.create().forPath("/test", data);
    // 实际上节点路径为: "/MyApp/test"

    临时连接
    Curator还提供了临时的CuratorFramework: CuratorTempFramework,意思是在一个容易失败的网络如WAN中,向zooKeeper的单一请求。 一定时间不活动后连接会被关闭,只提供了有限的api. 临时 CuratorFramework基于Camille Fournier的一篇文章:http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html.

    创建builder时不是调用build()而是调用buildTemp()。 3分钟不活动连接就被关闭,也可以指定不活动的时间。 它只提供了下面几个方法:

    /**
         * Stop the client
         */
        public void     close();
    
        /**
         * Start a transaction builder
         *
         * @return builder object
         * @throws Exception errors
         */
        public CuratorTransaction inTransaction() throws Exception;
    
        /**
         * Start a get data builder
         *
         * @return builder object
         * @throws Exception errors
         */
        public TempGetDataBuilder getData() throws Exception;

    创建实例
    两种方法,newClient()或build(),实例如下,来自于官网

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *   http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    package framework;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class CreateClientExamples
    {
        public static CuratorFramework createSimple(String connectionString)
        {
            // these are reasonable arguments for the ExponentialBackoffRetry. The first
            // retry will wait 1 second - the second will wait up to 2 seconds - the
            // third will wait up to 4 seconds.
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    
            // The simplest way to get a CuratorFramework instance. This will use default values.
            // The only required arguments are the connection string and the retry policy
            return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        }
    
        public static CuratorFramework  createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs)
        {
            // using the CuratorFrameworkFactory.builder() gives fine grained control
            // over creation options. See the CuratorFrameworkFactory.Builder javadoc
            // details
            return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc. etc.
                .build();
        }
    }

    CRUD操作,来自于官网

    package com.colobu.zkrecipe.framework;
    
    import java.util.List;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.framework.api.CuratorListener;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Watcher;
    
    public class CrudExample {
    
        public static void main(String[] args) {
    
        }
    
        public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given ZNode with the given data
            client.create().forPath(path, payload);
        }
    
        public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given EPHEMERAL ZNode with the given data
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
        }
    
        public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given
            // data using Curator protection.
            return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
        }
    
        public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
            // set data for the given node
            client.setData().forPath(path, payload);
        }
    
        public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this is one method of getting event/async notifications
            CuratorListener listener = new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    // examine event for details
                }
            };
            client.getCuratorListenable().addListener(listener);
            // set data for the given node asynchronously. The completion
            // notification
            // is done via the CuratorListener.
            client.setData().inBackground().forPath(path, payload);
        }
    
        public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
            // this is another method of getting notification of an async completion
            client.setData().inBackground(callback).forPath(path, payload);
        }
    
        public static void delete(CuratorFramework client, String path) throws Exception {
            // delete the given node
            client.delete().forPath(path);
        }
    
        public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
            // delete the given node and guarantee that it completes
            client.delete().guaranteed().forPath(path);
        }
    
        public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
            /**
             * Get children and set a watcher on the node. The watcher notification
             * will come through the CuratorListener (see setDataAsync() above).
             */
            return client.getChildren().watched().forPath(path);
        }
    
        public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher) throws Exception {
            /**
             * Get children and set the given watcher on the node.
             */
            return client.getChildren().usingWatcher(watcher).forPath(path);
        }
    }

    事务

    package com.colobu.zkrecipe.framework;
    
    import java.util.Collection;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.api.transaction.CuratorTransaction;
    import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
    import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
    
    public class TransactionExample {
    
        public static void main(String[] args) {
    
        }
    
        public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception {
            // this example shows how to use ZooKeeper's new transactions
            Collection<CuratorTransactionResult> results = client.inTransaction().create().forPath("/a/path", "some data".getBytes())
                    .and().setData().forPath("/another/path", "other data".getBytes())
                    .and().delete().forPath("/yet/another/path")
                    .and().commit(); // IMPORTANT! The transaction is not submitted until commit() is called
    
            for (CuratorTransactionResult result : results) {
                System.out.println(result.getForPath() + " - " + result.getType());
            }
            return results;
        }
    
        /*
         * These next four methods show how to use Curator's transaction APIs in a
         * more traditional - one-at-a-time - manner
         */
        public static CuratorTransaction startTransaction(CuratorFramework client) {
            // start the transaction builder
            return client.inTransaction();
        }
    
        public static CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction) throws Exception {
            // add a create operation
            return transaction.create().forPath("/a/path", "some data".getBytes()).and();
        }
    
        public static CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction) throws Exception {
            // add a delete operation
            return transaction.delete().forPath("/another/path").and();
        }
    
        public static void commitTransaction(CuratorTransactionFinal transaction) throws Exception {
            // commit the transaction
            transaction.commit();
        }
    }

    自己写的练习
    1) 新建 maven项目,添加依赖,添加curator-recipes即可

    <dependency >
          <groupId >org.apache.curator </ groupId>
          <artifactId > curator-recipes</ artifactId >
          <version >2.6.0 </ version>
        </dependency >

    2) 采用CuratorFrameworkFactory.newClient() 或者 build() 方式 创建CuratorFramework实例,如下所示:

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                   CuratorFramework client;
    
             //      client= CuratorFrameworkFactory.newClient(CONNECTSTRING, retryPolicy);
    
                   client=CuratorFrameworkFactory. builder().
                               connectString( CONNECTSTRING )
                               .connectionTimeoutMs(30000)
                               .sessionTimeoutMs(30000)
                               .canBeReadOnly( false ).retryPolicy(retryPolicy)
                               .namespace( NAMESPACE )
                               .defaultData( null )
                               .build();

    3) client.start() 启动,结束时调用client.close() 关闭
    4) 通过fluent方式进行相关操作,如下增删改查:

     client.create().forPath( PATH, "hello world" .getBytes());
         byte [] bs=client.getData().forPath( PATH);
         System. out.println( "新建的节点,data为: " + new String(bs));
    
         client.setData().forPath( PATH ,"hello china".getBytes());
         // 由于是在background模式下获取的data,此时的 bs可能为null 
        byte [] bs2=client.getData().watched().inBackground().forPath( PATH);
        System. out.println( "新修改的节点,data为: " + new String(bs2!=null ? bs2 : new byte[0]));
    
       client.delete().forPath( PATH );

    完整代码如下:

    package curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.zookeeper.data.Stat;
    
    public class CuratorClientExample {
            public static String CONNECTSTRING= "127.0.0.1:2181";
            public static String PATH= "/crud";
            public static String NAMESPACE= "fortest";
            public static void main(String[] args) {
    
                   RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                   CuratorFramework client;
    
             //      client= CuratorFrameworkFactory.newClient(CONNECTSTRING, retryPolicy);
    
                   client=CuratorFrameworkFactory. builder().
                               connectString( CONNECTSTRING )
                               .connectionTimeoutMs(30000)
                               .sessionTimeoutMs(30000)
                               .canBeReadOnly( false ).retryPolicy(retryPolicy)
                               .namespace( NAMESPACE )
                               .defaultData( null )
                               .build();
                   client.start();
    
                   try {
                               client.create().forPath( PATH ,"hello world".getBytes());
                                byte [] bs=client.getData().forPath( PATH);
                               System. out .println("新建的节点,data为: " + new String(bs));
    
                               client.setData().forPath( PATH ,"hello china".getBytes());
                                // 由于是在background模式下获取的data,此时的 bs可能为null 
                                byte [] bs2=client.getData().watched().inBackground().forPath( PATH);
                               System. out .println("新修改的节点,data为: " + new String(bs2!=null ? bs2 : new byte[0]));
    
                               client.delete().forPath( PATH );
                               Stat stat=client.checkExists().forPath( PATH );
                                // Stat 就是对zonde所有属性的一个映射, stat=null表示节点不存在! 
                               System. out .println(stat);
                        } catch (Exception e) {
                                // TODO Auto-generated catch block
                               e.printStackTrace();
                        } finally {
                               CloseableUtils. closeQuietly(client);
                        }    
            }
    }

    参考资料:
    - http://curator.apache.org/curator-framework/index.html
    - http://supben.iteye.com/blog/2094077
    - http://macrochen.iteye.com/blog/1366136

  • 相关阅读:
    多数据源报表解析之简单多源报表
    8.5.4 Optimizing InnoDB Redo Logging 优化InnoDB Redo Logging
    8.5.2 Optimizing InnoDB Transaction Management
    8.5.1 Optimizing Storage Layout for InnoDB Tables
    Linux_RHEL7_YUM
    Linux_RHEL7_YUM
    Python基本语法_函数_返回值
    Python基本语法_函数_返回值
    8.4 Optimizing Database Structure 优化数据库结构
    8.3.7 InnoDB and MyISAM Index Statistics Collection InnoDB 和MyISAM 索引统计信息收集
  • 原文地址:https://www.cnblogs.com/a-du/p/9892108.html
Copyright © 2011-2022 走看看