zoukankan      html  css  js  c++  java
  • ZooKeeper分布式

    1:zk的相关特性

    1、一致性:数据一致性,数据按顺序分批入库。
    2、原子性:事务要么都成功,要么都失败,不会局部化。
    3、单一视图:客户端连接集群中的任一zk节点,数据都是一致的。
    4、可靠性:每次对zk的操作状态都保存在服务器中。
    5、实时性:客户端可以读取zk服务端的最新数据

    2:linux上环境变量在/etc/profile 中编辑过后需要执行source /etc/profile 重新导入刚刚的配置

    3:Zookeeper中zoo.cfg配置

    1、tickTime:用于计算的时间单元。比如session超时:N*tickTime.
    2、initLimit:用于集群,允许从节点连接并同步到 master节点的初始化连接时间,以tickTime 的倍数来表示.
    3、syncLimit:用于集群, master主节点与 从节点 之间发送消息,请求和应答 时间长度(心跳机制)
    4、dataDir:必须配置。
    5、dataLogDir:日志目录。
    6、clientPort:连接服务器的端口,默认2181

    4:基本数据类型

    1、是一个树型结构,类似前端开发中的tree.js组件。
    2、每个节点称之为znode,它可以有子节点,也可以有数据。
    3、每个节点分为临时节点和永久节点,临时节点会在客户端断开后消失。
    4、每个zk节点都各自的版本号,可以通过命令行来显示节点信息。
    5、每个节点数据发生变化,那么节点的版本号会累加(乐观锁)。
    6、删除、修改过期节点,版本号不匹配则会报错。
    6、每个zk节点存储的数据不宜过大,几K即可。
    7、节点可以设置acl,可以通过权限来控制用户的访问

    5:Linux的ZK客户端命令行学习:

    ./zkCli.sh可以启动客户端,使用help命令查看命令详解,使用Ctrl+C可以退出客户端:

    zkCli.sh -server [ip]:[port] 连接zk

    查看服务器状态

    zkServer.sh status
    启停服务器
    zkServer.sh start

    ZooKeeper -server host:port cmd args

    stat path [watch] //stat命令用于查看节点的状态信息

    set path data [version] //set命令用于设置节点的数据

    ls path [watch] //ls命令用于获取路径下的节点信息,注意路径为绝对路径

    delquota [-n|-b] path //delquota命令用于删除配额,-n为子节点个数,-b为节点数据长度

    ls2 path [watch] //ls2命令是ls命令的增强版,比ls命令多输出本节点信息

    setAcl path acl // setAcl命令用于设置节点Acl,Acl由三部分构成:1为scheme,2为user,3为permission,一般情况下表示为scheme:id:permissions

    setquota -n|-b val path //setquota命令用于设置节点个数以及数据长度的配额

    history //history用于列出最近的命令历史,可以和redo配合使用

    redo cmdno //redo命令用于再次执行某个命令

    printwatches on|off //printWatchers命令用于设置和显示监视状态,值为on或则off

    delete path [version] //delete命令用于删除节点,如delete /nodeDelete

    sync path //sync命令用于强制同步,由于请求在半数以上的zk server上生效就表示此请求生效,那么就会有一些zk server上的数据是旧的。sync命令就是强制同步所有的更新操作。

    listquota path //查看指定znode的配额

    rmr path //递归删除

    get path [watch] //get命令用于获取节点的信息,注意节点的路径必须是以/开头的绝对路径。如get /

    create [-s] [-e] path data acl //create命令用于创建节点,其中-s为顺序充点,-e临时节点

    addauth scheme auth //addauth命令用于节点认证,使用方式:如addauth digest username:password

    quit //退出客户端

    getAcl path //获取节点的Acl,如getAcl /node1

    close //close命令用于关闭与服务端的链接

    connect host:port //连接zk服务端,与close命令配合使用可以连接或者断开zk服务端

    返回信息的具体含义:

    cZxid = 0x0 //节点创建时的zxid

    ctime = Thu Jan 01 08:00:00 CST 1970 //节点创建时间

    mZxid = 0x0 //节点最近一次更新时的zxid

    mtime = Thu Jan 01 08:00:00 CST 1970 //节点最近一次更新的时间

    pZxid = 0x2c //子节点的id

    cversion = 10 //子节点数据更新次数

    dataVersion = 0 //本节点数据更新次数

    aclVersion = 0 //节点ACL(授权信息)的更新次数

    ephemeralOwner = 0x0 //如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0

    dataLength = 0 //节点数据长度,本例中为hello world的长度

    numChildren = 10 //子节点个数

    6:zk的作用体现:

    1、master节点选举,主节点挂了以后,从节点就会接手工作,并且保证这个节点是唯一的,这也就是所谓的首脑模式,从而保证我们的集群是高可用的。

    2、统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算的特别多。

    3、发布与订阅,类似消息队列MQ(amq,rmq),dubbo发布者把数据存在znode上,订阅者会读取这个数据。

    4、提供分布式锁,分布式环境中不同进程之间争夺资源,类似多线程中的锁。

    5、集群管理,集群中保证数据的强一致性。

    7::Zookeeper-watcher机制

    1、针对每个节点的操作,都会有一个监督者->wathcer。
    2、当监控的某个对象(znode)发生了变化。则触发wathcer事件。
    3、zk中的wathcer是一次性的,触发后立即销毁。
    4、父节点、子节点 增删改查都能够触发其wathcer
    5、针对不同类型的操作,触发的wathcer事件是不同的。子节点的创建事件,子节点的删除事件,子节点数据变化事件
    6、ls 为父节点设置watcher,创建子节点触发:NodeChildChanged
    7、ls 为父节点设置watcher,删除子节点触发:NodeChildChanged
    8、ls 为父节点设置watcher,修改子节点不触发事件

    get和stat的watch机制都是把当前节点作为父节点的

    8:Apache Curator客户端的使用

    需要依赖一下

    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    </dependency>
    public class CuratorOperator {

    public CuratorFramework client = null;
    public static final String zkServerPath = "192.168.1.110:2181";

    /**
    * 实例化zk客户端
    */
    public CuratorOperator() {
    /**
    * 同步创建zk示例,原生api是异步的
    *
    * curator链接zookeeper的策略:ExponentialBackoffRetry
    * baseSleepTimeMs:初始sleep的时间
    * maxRetries:最大重试次数
    * maxSleepMs:最大重试时间
    */
    // RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

    /**
    * curator链接zookeeper的策略:RetryNTimes
    * n:重试的次数
    * sleepMsBetweenRetries:每次重试间隔的时间
    */
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

    /**
    * curator链接zookeeper的策略:RetryOneTime
    * sleepMsBetweenRetry:每次重试间隔的时间
    */
    // RetryPolicy retryPolicy2 = new RetryOneTime(3000);

    /**
    * 永远重试,不推荐使用
    */
    // RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)

    /**
    * curator链接zookeeper的策略:RetryUntilElapsed
    * maxElapsedTimeMs:最大重试时间
    * sleepMsBetweenRetries:每次重试间隔
    * 重试时间超过maxElapsedTimeMs后,就不再重试
    */
    // RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

    client = CuratorFrameworkFactory.builder()
    .connectString(zkServerPath)
    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
    .namespace("workspace").build();
    client.start();
    }

    /**
    *
    * @Description: 关闭zk客户端连接
    */
    public void closeZKClient() {
    if (client != null) {
    this.client.close();
    }
    }

    public static void main(String[] args) throws Exception {
    // 实例化
    CuratorOperator cto = new CuratorOperator();
    boolean isZkCuratorStarted = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

    // 创建节点
    String nodePath = "/super/imooc";
    byte[] data = "superme".getBytes();
    cto.client.create().creatingParentsIfNeeded()
    .withMode(CreateMode.PERSISTENT)
    .withACL(Ids.OPEN_ACL_UNSAFE)
    .forPath(nodePath, data);

    // 更新节点数据
    // byte[] newData = "batman".getBytes();
    // cto.client.setData().withVersion(0).forPath(nodePath, newData);

    // 删除节点
    // cto.client.delete()
    // .guaranteed() // 如果删除失败,那么在后端还是继续会删除,直到成功
    // .deletingChildrenIfNeeded() // 如果有子节点,就删除
    // .withVersion(0)
    // .forPath(nodePath);
    不会删除workspace


    // 读取节点数据
    // Stat stat = new Stat();
    // byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
    // System.out.println("节点" + nodePath + "的数据为: " + new String(data));
    // System.out.println("该节点的版本号为: " + stat.getVersion());


    // 查询子节点
    // List<String> childNodes = cto.client.getChildren()
    // .forPath(nodePath);
    // System.out.println("开始打印子节点:");
    // for (String s : childNodes) {
    // System.out.println(s);
    // }
    不会打印workspace

    // 判断节点是否存在,如果不存在则为空
    // Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
    // System.out.println(statExist);


    // watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
    // cto.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
    // cto.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

    // 为节点添加watcher
    // NodeCache: 监听数据节点的变更,会触发事件
    // final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
    // // buildInitial : 初始化的时候获取node的值并且缓存
    // nodeCache.start(true);
    // if (nodeCache.getCurrentData() != null) {
    // System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
    // } else {
    // System.out.println("节点初始化数据为空...");
    // }
    // nodeCache.getListenable().addListener(new NodeCacheListener() {
    // public void nodeChanged() throws Exception {
    // if (nodeCache.getCurrentData() == null) {
    // System.out.println("空");
    // return;
    // }
    // String data = new String(nodeCache.getCurrentData().getData());
    // System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
    // }
    // });


    // 为子节点添加watcher
    // PathChildrenCache: 监听数据节点的增删改,会触发事件
    String childNodePathCache = nodePath;
    // cacheData: 设置缓存节点的数据状态
    final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
    /**
    * StartMode: 初始化方式
    * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
    * NORMAL:异步初始化
    * BUILD_INITIAL_CACHE:同步初始化
    */
    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

    List<ChildData> childDataList = childrenCache.getCurrentData();
    System.out.println("当前数据节点的子节点数据列表:");
    for (ChildData cd : childDataList) {
    String childData = new String(cd.getData());
    System.out.println(childData);
    }

    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
    System.out.println("子节点初始化ok...");
    }

    else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
    String path = event.getData().getPath();
    if (path.equals(ADD_PATH)) {
    System.out.println("添加子节点:" + event.getData().getPath());
    System.out.println("子节点数据:" + new String(event.getData().getData()));
    } else if (path.equals("/super/imooc/e")) {
    System.out.println("添加不正确...");
    }

    }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
    System.out.println("删除子节点:" + event.getData().getPath());
    }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    System.out.println("修改子节点路径:" + event.getData().getPath());
    System.out.println("修改子节点数据:" + new String(event.getData().getData()));
    }
    }
    });

    Thread.sleep(100000);

    cto.closeZKClient();
    boolean isZkCuratorStarted2 = cto.client.isStarted();
    System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
    }

    public final static String ADD_PATH = "/super/imooc/d";

    }
     
    public class MyCuratorWatcher implements CuratorWatcher {

    @Override
    public void process(WatchedEvent event) throws Exception {
    System.out.println("触发watcher,节点路径为:" + event.getPath());
    }

    }
    public class MyWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
    System.out.println("触发watcher,节点路径为:" + event.getPath());
    }
    }

    9:zk设置配置文件

    public class Client1 {

    public CuratorFramework client = null;
    public static final String zkServerPath = "192.168.1.110:2181";

    public Client1() {
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    client = CuratorFrameworkFactory.builder()
    .connectString(zkServerPath)
    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
    .namespace("workspace").build();
    client.start();
    }

    public void closeZKClient() {
    if (client != null) {
    this.client.close();
    }
    }

    // public final static String CONFIG_NODE = "/super/imooc/redis-config";
    public final static String CONFIG_NODE_PATH = "/super/imooc";
    public final static String SUB_PATH = "/redis-config";
    public static CountDownLatch countDown = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
    Client1 cto = new Client1();
    System.out.println("client1 启动成功...");

    final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
    childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

    // 添加监听事件
    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    // 监听节点变化
    if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    String configNodePath = event.getData().getPath();
    if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
    System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);

    // 读取节点数据
    String jsonConfig = new String(event.getData().getData());
    System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);

    // 从json转换配置
    RedisConfig redisConfig = null;
    if (StringUtils.isNotBlank(jsonConfig)) {
    redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
    }

    // 配置不为空则进行相应操作
    if (redisConfig != null) {
    String type = redisConfig.getType();
    String url = redisConfig.getUrl();
    String remark = redisConfig.getRemark();
    // 判断事件
    if (type.equals("add")) {
    System.out.println("监听到新增的配置,准备下载...");
    // ... 连接ftp服务器,根据url找到相应的配置
    Thread.sleep(500);
    System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");
    // ... 下载配置到你指定的目录
    Thread.sleep(1000);
    System.out.println("下载成功,已经添加到项目中");
    // ... 拷贝文件到项目目录
    } else if (type.equals("update")) {
    System.out.println("监听到更新的配置,准备下载...");
    // ... 连接ftp服务器,根据url找到相应的配置
    Thread.sleep(500);
    System.out.println("开始下载配置文件,下载路径为<" + url + ">");
    // ... 下载配置到你指定的目录
    Thread.sleep(1000);
    System.out.println("下载成功...");
    System.out.println("删除项目中原配置文件...");
    Thread.sleep(100);
    // ... 删除原文件
    System.out.println("拷贝配置文件到项目目录...");
    // ... 拷贝文件到项目目录
    } else if (type.equals("delete")) {
    System.out.println("监听到需要删除配置");
    System.out.println("删除项目中原配置文件...");
    }

    // TODO 视情况统一重启服务
    }
    }
    }
    }
    });

    countDown.await();

    cto.closeZKClient();
    }

    }
    public class RedisConfig {

    private String type; // add 新增配置 update 更新配置 delete 删除配置
    private String url; // 如果是add或update,则提供下载地址
    private String remark; // 备注

    public String getType() {
    return type;
    }
    public void setType(String type) {
    this.type = type;
    }
    public String getUrl() {
    return url;
    }
    public void setUrl(String url) {
    this.url = url;
    }
    public String getRemark() {
    return remark;
    }
    public void setRemark(String remark) {
    this.remark = remark;
    }
    }
    RedisConfig.json相关代码
    {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
    {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
    {"type":"delete","url":"","remark":"delete"}

    10:zookeeper集群搭建

    	zk集群, 主从节点, 心跳机制(选举模式)
    
    	zookeeper集群搭建注意点:
    	在dataDir创建并配置数据文件 myid 内容为1/2/3 对应 server.1/2/3
    	通过 ./zkCli.sh -server [ip]:[port] 检测集群是否配置成功
    	例:
    		修改conf文件夹下的zoo.cfg文件加入:
    			server.1=192.168.56.105:2888:3888
    			server.2=192.168.56.105:2889:3889
    			server.3=192.168.56.105:2890:3890
    		再到dataDir里添加myid文件内容为1, 第二台集群则myid配置为2
    
    • 在连接zk超时的时候,不支持自动重连,需要手动操作
    • Watch注册一次就会失效,需要反复注册
    • 不支持递归创建节点

    Apache curator:

    • Apache 的开源项目
    • 解决Watch注册一次就会失效的问题
    • 提供的 API 更加简单易用
    • 提供更多解决方案并且实现简单,例如:分布式锁
    • 提供常用的ZooKeeper工具类
    • 编程风格更舒服,

     

    12

  • 相关阅读:
    [DOJ练习] 无向图的邻接矩阵表示法验证程序
    [DOJ练习] 求无向图中某顶点的度
    [邻接表形式]有向图的建立与深度,广度遍历
    [DOJ练习] 有向图的邻接表表示法验证程序(两种写法)
    [Java 学习笔记] 异常处理
    [总结]单源最短路(朴素Dijkstra)与最小生成树(Prim,Kruskal)
    时间选择插件jquery.timepickr
    页面值传入后台出现中文乱码
    CheckTreecheckbox树形控件
    JQuery EasyUI DataGrid
  • 原文地址:https://www.cnblogs.com/zyy1688/p/10443287.html
Copyright © 2011-2022 走看看