zoukankan      html  css  js  c++  java
  • Zookeeper—学习笔记(一)

    1、Zookeeper基本功能

    (增 删 改 查;注册,监听)

    两点:

      1、放数据(少量)。

      2、监听节点。

     注意:

      Zookeeper中的数据不同于数据库中的数据,没有表,没有记录,没有字段;

      Zookeeper中的数据是key-value对,key可以有子key

      value为二进制数据。

    2、应用场景

    2.1、服务器上下线动态感知

    2.2、配置文件管理

     

     3、Zookeeper本身就是一个HA集群

    Zookeeper自身就是一个十分可靠的分布式系统。

    这个分布式系统只有一个程序,进程:QuorumpeerMain,只不过这个进程在工作的时候有多种不同状态

    3.1、zookeeper集群结构示意图

    3.2、白话谈选举过程-zookeeper的整体运行机制

      leader和follower通信端口,集群内部工作端口2888

       选举端口3888

      假如集群中共的节点按照 myid 1,2,3,4,5 的顺序一次启动;id为1的节点最先启动,它启动之后想集群中的2888端口发出消息,此时没有leader在2888端口回应1号节点,1号节点就知道了,此时集群中是没有leader的,然后1号节点发起选举,不停的往3888端口发选举消息,并且告诉大家投他自己,也就是1号;此时id为2的节点启动,同样的第一件事也是向2888广播消息,没有人回应,知道没有leader,此时的1号节点不断的在3888宣传自己,2号节点收到3888端口的消息,发现集群中有个兄弟节点发起了,投票且投1号,2号节点查看自己的id后发现,我的id是2比1大啊,我投我自己,然后2号节点向3888端口不停的广播投2号;此时1号节点在自己的3888端口收到消息发现,有人投2号且id比我的大,那我也都2号;此时1号节点和2号节点都不停的往3888发起选举且投2号;就在这时候3号点启动,同样第一件事给集群2888广播消息,没有人回应,但是会收到3888端口的选举信息,经查看发现id都比我小(1<3 2<3),我要投自己3;往3888广播选举3号的消息;然后1号节点和2号节点收到消息后,发现又来了一个大的节点,那我们都投3吧,此时超过了半数( 3 > 5 / 2)节点,3号节点将自己的转台切换为leader,成功上位;此后4号节点和5号节点已启动就发现已经有leader了,自动变为follower;

      有没有发现;节点发起选举时只会选举自己(自私),当发现有id大于自身的节点也参与选举时,他会无私的支持最大者(无私)。

      如果是运行的过程中leader挂了,在重新投票的过程中,投票信息会带有节点的数据版本信息,只有最新数据的且id较大者,会被选为新的leader。  

     3.3、安装Zookeeper集群

    上传安装包,解压

    修改conf/zoo.cfg

    # The number of milliseconds of each tick

    tickTime=2000

    # The number of ticks that the initial

    # synchronization phase can take

    initLimit=10

    # The number of ticks that can pass between

    # sending a request and getting an acknowledgement

    syncLimit=5

    # the directory where the snapshot is stored.

    # do not use /tmp for storage, /tmp here is just

    # example sakes.

    dataDir=/root/zkdata

    # the port at which the clients will connect

    clientPort=2181

    # Set to "0" to disable auto purge feature

    #autopurge.purgeInterval=1

    server.1=hdp-01:2888:3888

    server.2=hdp-02:2888:3888

    server.3=hdp-03:2888:3888

    配置文件修改完后,将安装包拷贝给hdp-02 和 hdp-03

    接着,到hdp-01上,新建数据目录/root/zkdata,并在目录中生成一个文件myid,内容为1

    接着,到hdp-02上,新建数据目录/root/zkdata,并在目录中生成一个文件myid,内容为2

    接着,到hdp-03上,新建数据目录/root/zkdata,并在目录中生成一个文件myid,内容为3

    启动zookeeper集群:

    3.3.1、自动脚本

    #!bin/bash
    for host in hdp-01 hdp-02 hdp-03
    do
    echo "${host}:${1}ing"
    ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"
    done
    
    sleep 2
    
    for host in hdp-01 hdp-02 hdp-03
    do
    ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"
    done
    chmod +x zkmanager.sh
    ssh $host会进入这台机器的主目录(/root)下下执行文件中的命令啊,在启动过程中会在该目录下生成Zookeeper的日志。zookeeper.out,,可以查看日志信息。

    使用

    ./zkmanager start
    ./zkmanager stop

    4、zkCli客户端

    默认链接localhost

    bin/zkCli.sh 

    连接指定主机

    bin/zkCli.sh -server hpd-01 -p 2181

    4.1、管理数据

    4.1.1、ls-查看节点

    ls /
    ls
    /zookeeper

    4.1.2、get-查看数据

    get /zookeeper

    4.1.3、create-创建节点

    命令行只能存放字符串数据,没有办法存放二进制数据

    "hellozk"之后的数据全部是该节点的元数据信息,目的是为了维护数据版本的一致性。

    create /aa "hellozk"

    4.1.4、set-修改节点的值

    数据版本会增加1

    set /aa hellospark

     

    4.1.5、rmr-递归删除数据节点

    rmr /aa

    4.2、监听节点

    注意:注册的监听器在正常收到一次所监听的事件后,就失效。

    4.2.1、get注册监听

    get注册监听,只有当监听节点的数据发生变化,才会出发,监听节点添加新节点不会出发注册的监听器;

    且一次监听注册只起一次作用,触发之后,失效。除非再次注册。

    监听/aa 节点的数值变化,一旦/aa节点的值发生了变化(假如值被hdp-03上的zkCli修改了),向zk注册watch事件的客户端(假如是hdp-01上的zkCli)就会收到zk的通知,则hdp-01上的zkCli就会收到zk发回的通知,当然zkCli收到通知后只是输出到了控制台。

    get /aa watch

     

    会看到,状态(表示和服务器链接状态良好),以及事件的类型type(数据节点的值发生了变化),以及哪一个节点

    4.2.2、ls-注册子节点变化事件

    ls是获取子节点,监听子节点事件用 ls /aa watch

    hdp-01的zkCli注册ls 监听

    ls /aa watch

     hdp-02的zkCli对aa节点添加字节点,hdp-01会收到zk的事件通知。

    create /aa/xx 132

    5、zookeeper数据存储机制

    5.1、数据存储形式

    zookeeper中对用户的数据采用kv形式存储

    只是zk有点特别:

    key:是以路径的形式表示的,那就以为着,各key之间有父子关系,比如

    / 是顶层key

    用户建的key只能在/ 下作为子节点,比如建一个key: /aa  这个key可以带value数据

     也可以建一个key:   /bb

    也可以建key: /aa/xx

    zookeeper中,对每一个数据key,称作一个znode

    综上所述,zk中的数据存储形式如下:

    5.2、znode类型

    zookeeper中的znode有多种类型:

    1、PERSISTENT  持久的:创建者就算跟集群断开联系,该类节点也会持久存在与zk集群中

    2、EPHEMERAL  短暂的:创建者一旦跟集群断开联系,zk就会将这个节点删除

    3、SEQUENTIAL  带序号的:这类节点,zk会自动拼接上一个序号,而且序号是递增的 

     

    组合类型:

    PERSISTENT  :持久不带序号

    EPHEMERAL  :短暂不带序号

    PERSISTENT  且 SEQUENTIAL   :持久且带序号

    EPHEMERAL  且 SEQUENTIAL  :短暂且带序号

    6、java客户端

    6.1、创建zk客户端

    timeOut指定会话超时时间,即客户端关闭连接后,会话还可以保持多久;
    watcher是一个接口,即为当客户端收到zk事件通知时候要进行什么逻辑操作。
    //构造一个了解Zookeeper的客户端对象
    ZooKeeper zk = new
    ZooKeeper(“hdp-01:2181,hdp-02:2181,hdp-03:2181”,timeOut,watcher)

    6.2、增删改查

    public class ZookeeperClientDemo {
        ZooKeeper zk = null;
        @Before
        public void init()  throws Exception{
            // 构造一个连接zookeeper的客户端对象
            zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
        }
        
        
        @Test
        public void testCreate() throws Exception{
    
            // 参数1:要创建的节点路径  参数2:数据  参数3:访问权限  参数4:节点类型
            String create = zk.create("/eclipse", "hello eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            
            System.out.println(create);
            
            zk.close();
            
        }
        
        
        @Test
        public void testUpdate() throws Exception {
            
            // 参数1:节点路径   参数2:数据    参数3:所要修改的版本,-1代表任何版本
            zk.setData("/eclipse", "我爱你".getBytes("UTF-8"), -1);
            
            zk.close();
            
        }
        
        
        @Test    
        public void testGet() throws Exception {
            // 参数1:节点路径    参数2:是否要监听    参数3:所要获取的数据的版本,null表示最新版本
            byte[] data = zk.getData("/eclipse", false, null);
            System.out.println(new String(data,"UTF-8"));
            
            zk.close();
        }
        
        
        
        @Test    
        public void testListChildren() throws Exception {
            // 参数1:节点路径    参数2:是否要监听   
            // 注意:返回的结果中只有子节点名字,不带全路径
            List<String> children = zk.getChildren("/cc", false);
            
            for (String child : children) {
                System.out.println(child);
            }
            
            zk.close();
        }
        
        
        @Test
        public void testRm() throws InterruptedException, KeeperException{
            
            zk.delete("/eclipse", -1);
            
            zk.close();
        }
        
        
        
    
    }
    View Code

    6.2.1、创建节点create

    byte[] 数据 要求不能大于1M

    ACL是访问权限Access Control List:什么样的人可以访问这个数据节点,一般是内部一套系统使用,这里就选择开放权限

    String path = zk.create(String path,byte[] data,List<ACL> acl,CreateMode createMode);

     

    节点类型

    6.2.2、修改数据setData

    version指明要修改哪个版本的数据,如果用户不关心,哪个版本,可以设置成-1,表示修改全部版本。

    返回该节点数据的元数据Stat

     

    6.2.3、查找数据getData

    watch:表示要不要监听

    Stat:用元数据来表示要回去哪个版本,最新版本用null表示

    6.2.4、查找子节点getChildren

    返回所有子节点的名字,不带全路径,只有子节点名字

     

    6.2.5、删除节点delete

    -1表示删除所有版本

    6.3、注册监听

    注意:注册的监听器在正常收到一次所监听的事件后,就失效。

    zk会另外起一个线程,去等待zk发来的通知,并作出我们设置的watcher逻辑(若没有设置watcher逻辑,而是是指了boolean true,则直接输出收到的通知)

    Watcher是一个接口:客户端收到监听事件后的回调逻辑

     6.3.1、get监听节点数据变化

          @Before
        public void init() throws Exception {
            // 构造一个连接zookeeper的客户端对象
            zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
        }
    
          @Test
        public void testGetWatch() throws Exception {
    
            byte[] data = zk.getData("/mygirls", true, null); // 监听节点数据变化
            
            List<String> children = zk.getChildren("/mygirls", true); //监听节点的子节点变化事件
    
            System.out.println(new String(data, "UTF-8"));
    
            Thread.sleep(Long.MAX_VALUE);
    
        }

     注意:注册的监听器在正常收到一次所监听的事件后,就失效。可以在new Zookeeper(创建客户端)的时候,指定默认的回调逻辑同事在回调逻辑中继续注册监听,以后直接在getData时写true就可以了,这样就可以一直监听。

    注意:new Zookeeper的时候,会出发连接成功事件(发生路径null,type类型none

    可以用转太判断来避开初始事件

     

    构造Zookeeper的时候,添加回调逻辑,后续getData是监听节点数据是注册监听就只要设置true,那么就会调用对象构造时候的回调逻辑(默认逻辑)。

    public class ZookeeperWatchDemo {
    
        ZooKeeper zk = null;
    
        @Before
        public void init() throws Exception {
            // 构造一个连接zookeeper的客户端对象
            zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
    
                @Override
                public void process(WatchedEvent event) {
    
                    if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) {
                        System.out.println(event.getPath()); // 收到的事件所发生的节点路径
                        System.out.println(event.getType()); // 收到的事件的类型
                        System.out.println("赶紧换照片,换浴室里面的洗浴套装....."); // 收到事件后,我们的处理逻辑
    
                        try {
                            zk.getData("/mygirls", true, null);
    
                        } catch (KeeperException | InterruptedException e) {
                            e.printStackTrace();
                        }
                    }else if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged){
                        
                        System.out.println("子节点变化了......");
                    }
    
                }
            });
        }
    
        @Test
        public void testGetWatch() throws Exception {
    
            byte[] data = zk.getData("/mygirls", true, null); // 监听节点数据变化
            
            List<String> children = zk.getChildren("/mygirls", true); //监听节点的子节点变化事件
    
            System.out.println(new String(data, "UTF-8"));
    
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }
    View Code

      6.3.2、监听节点的子节点变化事件

     

    6.4、Zookeeper客户端工作线程

    sendThread

    eventThread

    Zookeeper中的eventThread是守护线程

     守护线程:  A是B的守护线程,若B线程结束,不管A线程有没有执行完毕,A线程都会退出。(主人已挂,仆人陪葬)

    thread.setDaemon(true);

     7、实例服务器上下线,动态感知

    这是一个简单的分布式系统:

    业务功能:模拟业务,这里只是简单的时间查询;客户端去请求服务器获得时间信息,而且服务器动态上下线时候,客户端会实时感知。

    服务器:TimeQueryServer

    // 构造zk客户端连接// 注册服务器信息// 启动业务线程开始处理业务
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    public class TimeQueryServer {
        ZooKeeper zk = null;
        
        // 构造zk客户端连接
        public void connectZK() throws Exception{
            
            zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);          
        }
        
        // 注册服务器信息
        public void registerServerInfo(String hostname,String port) throws Exception{
            
            // 先判断注册节点的父节点是否存在,如果不存在,则创建
            Stat stat = zk.exists("/servers", false);
            if(stat==null){
                zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            
            // 注册服务器数据到zk的约定注册节点下【短暂+序号】
            String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            
            System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);   
        }
        
        public static void main(String[] args) throws Exception {
            
            TimeQueryServer timeQueryServer = new TimeQueryServer();
            
            // 构造zk客户端连接
            timeQueryServer.connectZK();
            
            // 注册服务器信息
            timeQueryServer.registerServerInfo(args[0], args[1]);
            
            // 启动业务线程开始处理业务
            new TimeQueryService(Integer.parseInt(args[1])).start();  
        }
    }

    业务类:TimeQueryService

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Date;
    
    public class TimeQueryService extends Thread{
        
        int port = 0;
        public TimeQueryService(int port){
            
            this.port = port;
        }        
        @Override
        public void run() {
            
            try {
           // ServerSocket监听port端口 ServerSocket ss
    = new ServerSocket(port); System.out.println("业务线程已绑定端口"+port+"准备接受消费端请求了....."); while(true){
              // 拿到客户端请求socket Socket sc
    = ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }

    客户端(消费者:):Consumer

    // 构造zk连接对象// 查询在线服务器列表
    // 处理业务(向一台服务器发送时间查询请求)
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    
    public class Consumer {
    
        // 定义一个list用于存放最新的在线服务器列表
        private volatile ArrayList<String> onlineServers = new ArrayList<>();
    
        // 构造zk连接对象
        ZooKeeper zk = null;
    
        // 构造zk客户端连接;注册默认监听后的回调逻辑
        public void connectZK() throws Exception {
          
            zk = new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {
    
                @Override
                public void process(WatchedEvent event) {
              // 链接状态正常,且为节点的子节点变化事件
    if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { try { // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查询在线服务器列表 public void getOnlineServers() throws Exception {      // 查询服务器列表,再次开启监听 List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>();      // 遍历节点的子节点 for (String child : children) {
           // 获取节点数据,此时的业务没有设置监听数值变化的需求
    byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查询了一次zk,当前在线的服务器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑选一台当前在线的服务器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次请求挑选的服务器为:" + server);           // 客户端socket请求 Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream();// 网络输出流:向服务端发送请求。 InputStream in = socket.getInputStream();// 网络输入流:接受服务端发回的请求。 out.write("haha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read = in.read(buf); System.out.println("服务器响应的时间为:" + new String(buf, 0, read)); out.close(); in.close(); socket.close(); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 构造zk连接对象 consumer.connectZK(); // 查询在线服务器列表 consumer.getOnlineServers(); // 处理业务(向一台服务器发送时间查询请求) consumer.sendRequest(); } }
  • 相关阅读:
    android studio下生成jni头文件
    ndk编译android的lame库
    hbuilder在android手机里用chrome调试,只显示了设备名称,却没有inspect按钮
    mac下升级terminal/终端的subversion版本方法
    mac下编译optool方法
    ubuntu16.04 64位server安装php7
    Ubuntu系统启用Apache Mod_rewrite模块
    cakephp之查询
    Ubuntu 中搭建 LAMP 及 php 开发工具
    [JS]视频总结-第四部分_JavaScript案例-定时器的使用
  • 原文地址:https://www.cnblogs.com/arjenlee/p/9628753.html
Copyright © 2011-2022 走看看