  • ZooKeeper heartbeats

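    Suppose hosts A and B exchange data over a TCP connection. If A's network cable is unplugged, B has no way to notice on its own. If A sends B a heartbeat at regular intervals, however, the state of the connection can be judged from the heartbeat replies.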
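
    The idea can be reduced to a few lines. The following is only a minimal sketch: HeartbeatMonitor and its methods are hypothetical names chosen for illustration and are not part of ZooKeeper. It records when the peer was last heard from and reports the connection as dead once the silence reaches a timeout. Timestamps are passed in explicitly so the behaviour is easy to follow.

```java
// Hypothetical helper for illustration only; not a ZooKeeper class.
final class HeartbeatMonitor {
    private final long timeoutMillis;
    private long lastHeard;

    HeartbeatMonitor(long timeoutMillis, long now) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeard = now;
    }

    // Call whenever any data, including a heartbeat reply, arrives from the peer.
    void onDataReceived(long now) {
        lastHeard = now;
    }

    // True once the peer has been silent for at least the timeout.
    boolean isPeerDead(long now) {
        return now - lastHeard >= timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3000, 0);
        monitor.onDataReceived(1000);                   // reply seen at t = 1000 ms
        System.out.println(monitor.isPeerDead(3500));   // 2500 ms of silence: false
        System.out.println(monitor.isPeerDead(4000));   // 3000 ms of silence: true
    }
}
```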

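    Take ZooKeeper as an example. The zk client records the time it last sent data (lastSend) and the time it last received data (lastHeard). Once connected, the client sends the server a heartbeat (ping) whenever it has sent nothing for readTimeout/2; the pings go out to the zk server together with the other commands. If the client has received nothing for longer than readTimeout (or connectTimeout while it is not yet connected), it treats this as a timeout, closes the connection to the server, and reconnects.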

    // zookeeper 3.3.3
    // void org.apache.zookeeper.ClientCnxn.SendThread.run()
    public void run() {
        long now = System.currentTimeMillis();
        long lastHeard = now;
        long lastSend = now;
        
        // zooKeeper here is the client object
        while (zooKeeper.state.isAlive()) {
            try {
                if (sockKey == null) {
                    // don't re-establish connection if we are closing
                    if (closing) {
                        break;
                    }
                    // connect to the zk server
                    startConnect();
                    lastSend = now;
                    lastHeard = now;
                }
                int idleRecv = (int) (now - lastHeard);
                int idleSend = (int) (now - lastSend);
                int to = readTimeout - idleRecv;
                if (zooKeeper.state != States.CONNECTED) {
                    to = connectTimeout - idleRecv;
                }
                // nothing received within the timeout: throw; the exception is handled in the catch block below
                if (to <= 0) {
                    throw new SessionTimeoutException(
                            "Client session timed out, have not heard from server in "
                            + idleRecv + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                if (zooKeeper.state == States.CONNECTED) {
                    int timeToNextPing = readTimeout/2 - idleSend;
                    // send a ping (heartbeat) and update lastSend
                    if (timeToNextPing <= 0) {
                        sendPing();
                        lastSend = now;
                        enableWrite();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }
    
                selector.select(to);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                // Everything below and until we get back to the select is
                // non blocking, so time is effectively a constant. That is
                // Why we just have to do this once, here
                now = System.currentTimeMillis();
                for (SelectionKey k : selected) {
                    SocketChannel sc = ((SocketChannel) k.channel());
                    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                        if (sc.finishConnect()) {
                            lastHeard = now;
                            lastSend = now;
                            primeConnection(k);
                        }
                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        if (outgoingQueue.size() > 0) {
                            // We have something to send so it's the same
                            // as if we do the send now.
                            lastSend = now;
                        }
                        if (doIO()) {
                            lastHeard = now;
                        }
                    }
                }
                if (zooKeeper.state == States.CONNECTED) {
                    if (outgoingQueue.size() > 0) {
                        enableWrite();
                    } else {
                        disableWrite();
                    }
                }
                selected.clear();
            } catch (Exception e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) {
                        // closing so this is expected
                        LOG.debug("An exception was thrown while closing send thread for session 0x"
                                + Long.toHexString(getSessionId())
                                + " : " + e.getMessage());
                    }
                    break;
                } else {
                    // this is ugly, you have a better way speak up
                    if (e instanceof SessionExpiredException) {
                        LOG.info(e.getMessage() + ", closing socket connection");
                    } else if (e instanceof SessionTimeoutException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else if (e instanceof EndOfStreamException) {
                        LOG.info(e.getMessage() + RETRY_CONN_MSG);
                    } else {
                        LOG.warn("Session 0x"
                                + Long.toHexString(getSessionId())
                                + " for server "
                                + ((SocketChannel)sockKey.channel())
                                    .socket().getRemoteSocketAddress()
                                + ", unexpected error"
                                + RETRY_CONN_MSG,
                                e);
                    }
                    // close the connection
                    cleanup();
                    if (zooKeeper.state.isAlive()) {
                        eventThread.queueEvent(new WatchedEvent(
                                Event.EventType.None,
                                Event.KeeperState.Disconnected,
                                null));
                    }
    
                    now = System.currentTimeMillis();
                    lastHeard = now;
                    lastSend = now;
                }
            }
        }
        cleanup();
        try {
            selector.close();
        } catch (IOException e) {
            LOG.warn("Ignoring exception during selector close", e);
        }
        if (zooKeeper.state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(
                    Event.EventType.None,
                    Event.KeeperState.Disconnected,
                    null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                 "SendThread exitedloop.");
    }
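
    The timing arithmetic in the loop above can be isolated into a pure function. This is a sketch only: PingSchedule, Decision and decide are hypothetical names, the client is assumed to be in the CONNECTED state, and the readTimeout of 20000 ms used in main is an arbitrary value picked for the example.

```java
// Hypothetical extraction of the timing logic in SendThread.run(); not ZooKeeper code.
final class PingSchedule {
    // Outcome of one pass through the timing checks.
    static final class Decision {
        final boolean timedOut;   // nothing heard for readTimeout or longer
        final boolean sendPing;   // nothing sent for readTimeout/2 or longer
        final int selectTimeout;  // how long select() may block, in ms

        Decision(boolean timedOut, boolean sendPing, int selectTimeout) {
            this.timedOut = timedOut;
            this.sendPing = sendPing;
            this.selectTimeout = selectTimeout;
        }
    }

    static Decision decide(long now, long lastHeard, long lastSend, int readTimeout) {
        int idleRecv = (int) (now - lastHeard);
        int idleSend = (int) (now - lastSend);
        int to = readTimeout - idleRecv;
        if (to <= 0) {
            return new Decision(true, false, 0);      // the original throws SessionTimeoutException here
        }
        int timeToNextPing = readTimeout / 2 - idleSend;
        if (timeToNextPing <= 0) {
            return new Decision(false, true, to);     // ping now, then wait out the rest of the read timeout
        }
        // Wake up for whichever comes first: the next ping or the read timeout.
        return new Decision(false, false, Math.min(to, timeToNextPing));
    }

    public static void main(String[] args) {
        Decision d = decide(4000, 0, 0, 20000);
        System.out.println(d.timedOut + " " + d.sendPing + " " + d.selectTimeout); // false false 6000
        d = decide(10000, 0, 0, 20000);
        System.out.println(d.timedOut + " " + d.sendPing + " " + d.selectTimeout); // false true 10000
        d = decide(20000, 0, 10000, 20000);
        System.out.println(d.timedOut + " " + d.sendPing + " " + d.selectTimeout); // true false 0
    }
}
```

    With readTimeout = 20000 ms, a client that stays idle pings after 10000 ms without sending and gives up after 20000 ms without hearing anything, so one lost ping reply does not immediately cause a timeout.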

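    The zk server also tracks sessions and closes those that have timed out. The logic lives in

    void org.apache.zookeeper.server.SessionTrackerImpl.run()

    Every request the zk server receives triggers touchSession, which extends that session's expiration time.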
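
    The server-side idea can be sketched as follows. This is a deliberately simplified illustration and not the actual SessionTrackerImpl, whose internal data structures are not shown in this article: SimpleSessionTracker and its methods are hypothetical names. Each session stores an expiration time, touchSession pushes it forward, and a periodic sweep removes sessions whose time has passed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified session tracker; not the real SessionTrackerImpl.
final class SimpleSessionTracker {
    private static final class Session {
        final int timeoutMillis;
        long expiresAt;

        Session(int timeoutMillis, long expiresAt) {
            this.timeoutMillis = timeoutMillis;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<Long, Session> sessions = new HashMap<>();

    void addSession(long sessionId, int timeoutMillis, long now) {
        sessions.put(sessionId, new Session(timeoutMillis, now + timeoutMillis));
    }

    // Called for every request; returns false if the session is unknown or already expired.
    boolean touchSession(long sessionId, long now) {
        Session s = sessions.get(sessionId);
        if (s == null) {
            return false;
        }
        s.expiresAt = now + s.timeoutMillis;
        return true;
    }

    // Periodic sweep: removes and returns the ids of sessions whose expiration time has passed.
    List<Long> expireSessions(long now) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Session> e = it.next();
            if (e.getValue().expiresAt <= now) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        SimpleSessionTracker tracker = new SimpleSessionTracker();
        tracker.addSession(1L, 30000, 0);
        tracker.addSession(2L, 30000, 0);
        tracker.touchSession(1L, 20000);                    // session 1 now expires at 50000
        System.out.println(tracker.expireSessions(30000));  // [2]
    }
}
```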

  • Original article: https://www.cnblogs.com/allenwas3/p/9163929.html