zoukankan      html  css  js  c++  java
  • Zookeeper(一)客户端

    Zookeeper-客户端

    例子:

    // org.apache.zookeeper.ZooKeeperMain
    public class ZooKeeperMain {
        public static void main(String args[]) throws CliException, IOException, InterruptedException {
        	//1. 初始化zk配置,并建立连接
            ZooKeeperMain main = new ZooKeeperMain(args);
            //2. 一直等待控制台读入命令行 并执行
            main.run();
        }
    	public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
            //1.1 连接配置解析
            cl.parseOptions(args);
            System.out.println("Connecting to " + cl.getOption("server"));
            connectToZK(cl.getOption("server"));
        }
        //1.2 建立连接
        protected void connectToZK(String newHost) throws InterruptedException, IOException {
            //连接已经存在 关闭连接 重新创建
            if (zk != null && zk.getState().isAlive()) {
                zk.close();
            }
            host = newHost;
            boolean readOnly = cl.getOption("readonly") != null;
            if (cl.getOption("secure") != null) {
                System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
                System.out.println("Secure connection is enabled");
            }
            zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
        }
        //1.3 自定义监听器
        private class MyWatcher implements Watcher {
            public void process(WatchedEvent event) {
                if (getPrintWatches()) {
                    ZooKeeperMain.printMessage("WATCHER::");
                    ZooKeeperMain.printMessage(event.toString());
                }
            }
        }
    }
    

    问题

    1. zk怎么体现最终一致性:
    2. zk的监控在客户端和服务端的连接过程中起到什么作用:节点更新,服务端通知客户端,客户端调用回调方法处理
    3. zk对节点的原子操作是怎么体现的:版本控制,节点内部维护三种版本
    4. 客户端与服务端连接会话中的各个状态下 客户端处理什么样的事情

    基本功能

    1. 以树形结构存储数据,叶子节点可以存储数据

      • 文件系统
      • 配置管理
      • 命名服务
    2. 当某个节点的子节点变更,连接在这个节点的client可以实时监听到变化

      • 集群管理
    3. client对节点操作时,是原子操作

      • 远程锁:分布式锁

    基本术语

    • States客户端状态:
    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;
        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }
        public boolean isConnected() {
            return this == CONNECTED || this == CONNECTEDREADONLY;
        }
    }
    
    • Packet传给服务端的数据包
    //ClientCnxn内部定义的一个堆协议层的封装,用作zk中请求和响应的载体;
    static class Packet {
        
    }
    
    • ClientCnxnSocket底层与服务端通信类
    //真正与服务端连接的抽象类;有两个子类分别使用jdk.nio/netty.nio实现会话操作
    abstract class ClientCnxnSocket {
        abstract void connect(InetSocketAddress addr) throws IOException;
        //会从outgoingQueue中取出一个可发送的Packet对象,
        //同时生成一个客户端请求序号XID并将其设置到Packet请求头中去,
        //然后序列化后再发送,请求发送完毕后,会立即将该Packet保存到pendingQueue中,
        //以便等待服务端响应返回后进行相应的处理。
        abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
                ClientCnxn cnxn) throws IOException, InterruptedException;
    }
    
    • WatchedEvent监控事件
    //包含发生的事件,zookeeper当前状态信息,事件涉及的节点路径
    public class WatchedEvent {
        final private KeeperState keeperState;
        final private EventType eventType;
        private String path;
    }
    
    • Watcher:事件处理类的基本父类
    • KeeperState:Event事件中Zookeeper可能存在的所有状态
    • EventType:Zookeeper中各种Event类型
    • WatcherType:
    //内部包含两个类Event,WatchType
    public interface Watcher {
        
    }
    

    与服务端建立连接

    ZooKeeperAdmin
    //主要用于集群的管理任务,如重配置集群成员;
    @InterfaceAudience.Public
    public class ZooKeeperAdmin extends ZooKeeper {
    	//ZooKeeperAdmin构造器最终调用父类构造器
    	public ZooKeeperAdmin(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
            super(connectString, sessionTimeout, watcher, canBeReadOnly);
        }
    }
    
    Zookeeper:客户端
    //功能:1.初始化服务客户端连接服务端 1).创建客户端对象 2).启动客户端内部线程
    //	   2.提供操作数据功能 	      1).向服务端发送请求
    @InterfaceAudience.Public
    public class ZooKeeper implements AutoCloseable {
    	protected final ClientCnxn cnxn;
        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly, HostProvider aHostProvider)
                throws IOException {
            this(connectString, sessionTimeout, watcher, canBeReadOnly,
                    aHostProvider, null);
        }
        public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly, HostProvider aHostProvider,
                ZKClientConfig clientConfig) throws IOException {
            LOG.info("Initiating client connection, connectString=" + connectString
                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
            if (clientConfig == null) {
                clientConfig = new ZKClientConfig();
            }
            this.clientConfig = clientConfig;
            watchManager = defaultWatchManager();
            watchManager.defaultWatcher = watcher;
            //解析连接ip:port
            ConnectStringParser connectStringParser = new ConnectStringParser(
                    connectString);
            hostProvider = aHostProvider;
            //1. 创建管理连接的客户端 ChrootPath为客户端自定义的路径头
            cnxn = createConnection(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
            //2. 启动客户端内部线程        
            cnxn.start();
        }
        protected ClientCnxn createConnection(String chrootPath,
                HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
                ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
                boolean canBeReadOnly) throws IOException {
            return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
                    watchManager, clientCnxnSocket, canBeReadOnly);
        }
    }
    
    ClientCnxn
    //维护服务端和客户端之间的网络连接,并进行一系列的网络通信:维护一个可用服务器的列表,当某客户端需要时可透明的切换服务
    public class ClientCnxn {
        final SendThread sendThread;
        final EventThread eventThread;
        //客户端可以连接的服务端地址集合
        private final HostProvider hostProvider;
        //需要发送给服务端的数据包:最终通过SendThread调用clientCnxnSocket.doTransport发送给服务端
        private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
        //已经发送给服务端但还未得到响应的数据包集合
        private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
        
        public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
                long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
            this.zooKeeper = zooKeeper;
            this.watcher = watcher;
            this.sessionId = sessionId;
            this.sessionPasswd = sessionPasswd;
            this.sessionTimeout = sessionTimeout;
            this.hostProvider = hostProvider;
            this.chrootPath = chrootPath;
            connectTimeout = sessionTimeout / hostProvider.size();
            readTimeout = sessionTimeout * 2 / 3;
            readOnly = canBeReadOnly;
            sendThread = new SendThread(clientCnxnSocket);                                           eventThread = new EventThread();
            this.clientConfig = zooKeeper.getClientConfig();
            initRequestTimeout();
        }
        public void start() {
            // 0.建立连接会话 1.sasl验证 startConnect
            // 2.创建监听事件到事件队列中 3.保持心跳
            sendThread.start();
            // 处理事件队列中的事件
            eventThread.start();
        }
    }
    
    
    SendThread
    1. 维护了客户端与服务端之间的会话生命周期(通过一定周期频率内向服务端发送PING包检测心跳),如果会话周期内客户端与服务端出现TCP连接断开,那么就会自动且透明地完成重连操作。
    2. 管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。
    3. 将来自服务端的事件传递给EventThread去处理。
     //ClientCnxn内部类:为传出请求队列服务并生成心跳
    class SendThread extends ZooKeeperThread {
        private final ClientCnxnSocket clientCnxnSocket;
        private InetSocketAddress rwServerAddress = null;
        SendThread(ClientCnxnSocket clientCnxnSocket) {
            super(makeThreadName("-SendThread()"));
            state = States.CONNECTING;
            this.clientCnxnSocket = clientCnxnSocket;
            setDaemon(true);
        }
        @Override
        public void run() {
            //赋值
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            //更新时间
            clientCnxnSocket.updateNow();
            //更新上一次发送和接收的时间
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            //设置最大心跳ping间隔 10s
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    //一开始 客户端还没连接上服务端 尝试初始化sasl认证并且连接服务端
                    if (!clientCnxnSocket.isConnected()) {
                        // 与服务端连接断开时 不再重建会话 直接跳出循环
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            //从服务端可连接集合中获取下一个地址 如果全部尝试过 则等待1s
                            serverAddress = hostProvider.next(1000);
                        }
                        //clientCnxnSocket作为底层与服务端通信的类
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    //后来连接上了 说明认证也已经初始化好了 通过发送认证包给服务建立验证
                    if (state.isConnected()) {
                        // 确认是否需要发送认证失败事件
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    //向服务端发送当前客户端sasl认证初始化请求
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            //获得zk的sasl认证状态
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    //与服务端进行身份验证时发生错误 状态更改且需要发送认证事件
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    //验证通过
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }
                            //是否需要发送认证事件
                            if (sendAuthEvent) {
                                // 生成相应的事件 并放入事件队列中
                                eventThread.queueEvent(new WatchedEvent(
                                    Watcher.Event.EventType.None,
                                    authState,null));
                                if (state == States.AUTH_FAILED) {
                                    eventThread.queueEventOfDeath();
                                }
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        //防止丢失 所有有下一次ping
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() 			- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            //保持和服务端的心跳 发送ping
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
                    // 如果当前是读写模式 则寻找读写服务器 todo
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
    				//确保所有前提都满足 
                    //取出等待队列的头部发送给服务端并从队列中移除 并将其保存到pendingQueue中
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId())  + " : " + e.getMessage());
                        }
                        break;
                    } else {
                       //。。。。一堆抛出错误
                       //根据连接状态处理当前仍旧往队列中投放的事件
                       cleanAndNotifyState();
                    }
                }
            }
            synchronized (state) {
                //清除当前队列中所有等待的事件 不做处理
                cleanup();
            }
           	//当连接失效 主动关闭和服务端的连接
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                              Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                                                    Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
              "SendThread exited loop for session: 0x"  + Long.toHexString(getSessionId()));
        }
        //发送心跳
        private void sendPing() {
            lastPingSentNs = System.nanoTime();
            RequestHeader h = new RequestHeader(-2, OpCode.ping);
            queuePacket(h, null, null, null, null, null, null, null, null);
        }
        //和服务端创建连接会话
        private void startConnect(InetSocketAddress addr) throws IOException {
            saslLoginFailed = false;
            //如果之前连接过 则缓1s
            if(!isFirstConnect){
                try {
                    Thread.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
            //连接状态改为正在连接
            state = States.CONNECTING;
            String hostPort = addr.getHostString() + ":" + addr.getPort();
            MDC.put("myid", hostPort);
            //为当前线程设置线程名称
            setName(getName().replaceAll("\(.*\)", "(" + hostPort + ")"));
            //客户端连接是否需要认证 Y:新建认证 如果认证过 断开重新认证
            if (clientConfig.isSaslClientEnabled()) {
                try {
                    if (zooKeeperSaslClient != null) {
                        zooKeeperSaslClient.shutdown();
                    }
                    //初始化客户端sasl验证 sasl状态为初始化initial
                    zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
                                                                  clientConfig);
                } catch (LoginException e) {
                    //在SASL客户端初始化的过程中认证失败了,与和zk服务端连接过程出现的认证失败不同
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
                    //为当前认证失败创建新的监控任务
                    eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);
            //与服务端通信
            clientCnxnSocket.connect(addr);
        }
    }
    
    
    EventThread
    1. 负责客户端的事件处理,并触发客户端注册的Watcher监听。
    2. EventThread中的watingEvents队列用于临时存放那些需要被触发的Object,包括客户端注册的Watcher和异步接口中注册的回调器AsyncCallback。
    3. 同时,EventThread会不断地从watingEvents中取出Object,识别具体类型(Watcher或AsyncCallback),并分别调用process和processResult接口方法来实现对事件的触发和回调。
    //ClientCnxn内部类:无限处理等待队列中的监听事务
    class EventThread extends ZooKeeperThread {
        //等待处理的事件队列
        private final LinkedBlockingQueue<Object> waitingEvents =
        new LinkedBlockingQueue<Object>();
        @Override
        @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
        public void run() {
            try {
                  isRunning = true;
                  while (true) {
                     Object event = waitingEvents.take();
                      //eventOfDeath代表出现了身份认证失败 
                     if (event == eventOfDeath) {
                        wasKilled = true;
                     } else {
                         //核心处理
                        processEvent(event);
                     }
                     if (wasKilled)
                        synchronized (waitingEvents) {
                           if (waitingEvents.isEmpty()) {
                              isRunning = false;
                              break;
                           }
                        }
                  }
               }//省略日志代码
        }
    }
    
    

    由上述事件处理线程的run方法得出问题:

    1. 添加事件:队列中的事件waitingEvents是从哪里添加的

      EventThread内部的queueEvent,queueCallback,queuePacket,queueEventOfDeath

    //客户端访问服务端时使用:如操作节点数据等
    public void queueEvent(WatchedEvent event) {
        queueEvent(event, null);
    }
    private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
        if (event.getType() == EventType.None && sessionState == event.getState()) {
            return;
        }
        sessionState = event.getState();
        final Set<Watcher> watchers;
        if (materializedWatchers == null) {
            // 根据事件信息生成一系列的观察者:由zk实现
            watchers = watcher.materialize(event.getState(),
                    event.getType(), event.getPath());
        } else {
            watchers = new HashSet<Watcher>();
            watchers.addAll(materializedWatchers);
        }
        //将watcher集合和对应的事件组装 执行处理时 循环watchers处理
        WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
        waitingEvents.add(pair);
    }
    //添加异步回调事件:TODO 什么情况下会用到
    public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
        waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
    }
    //客户端连接出错等情况下使用 TODO
    @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
    public void queuePacket(Packet packet) {
        if (wasKilled) {
            synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet);
                else processEvent(packet);
            }
        } else {
            waitingEvents.add(packet);
        }
    }
    
    
    1. 处理事件:processEvent(event)

      事件类型有三种:WatcherSetEventPair,LocalCallback

      核心就是调用watcher处理

    WatcherSetEventPair pair = (WatcherSetEventPair) event;
    for (Watcher watcher : pair.watchers) {
         watcher.process(pair.event);
    }
    
    
  • 相关阅读:
    Java 蓝桥杯 算法训练 貌似化学
    Java 蓝桥杯 算法训练 貌似化学
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    Java 蓝桥杯 算法训练 字符串的展开 (JAVA语言实现)
    JAVA-蓝桥杯-算法训练-字符串变换
    Ceph:一个开源的 Linux PB 级分布式文件系统
    shell 脚本监控程序是否正在执行, 如果没有执行, 则自动启动该进程
  • 原文地址:https://www.cnblogs.com/hangzhi/p/10782382.html
Copyright © 2011-2022 走看看