zoukankan      html  css  js  c++  java
  • 单机器搭建 zk 集群

    在一台机器上配置 2 节点的 zk 集群,zk1 和 zk2 的 serverid 分别为 1 和 2,本机 ip 是 192.168.40.1

    zk1 相关配置:

    dataDir=E:/test/zk_project/data
    dataLogDir=E:/test/zk_project/log
    clientPort=2181
    server.1=192.168.40.1:2888:3888
    server.2=192.168.40.1:2889:3889

    zk2 相关配置:

    dataDir=E:/test/zk_project_r/data
    dataLogDir=E:/test/zk_project_r/log
    clientPort=2182
    server.1=192.168.40.1:2888:3888
    server.2=192.168.40.1:2889:3889

    配置集群的关键是 ip 和端口。以 zk1 为例:2888 端口是数据同步端口,zk1 成为 leader 后,follower 连接这个端口传输数据;3888 端口是选举端口,集群中的其他节点连接到这个端口发送投票信息。由于两个节点部署在同一台机器上,zk2 必须使用另一组端口(2889/3889)以及不同的 clientPort 和数据目录。myid 文件放在各自的 dataDir 目录中,文件内容就是该节点的 server id(zk1 为 1,zk2 为 2)。

    /**
     * Describes one server of the ensemble: its id plus the two socket
     * addresses configured for it in a {@code server.N=host:port:port} line.
     */
    public static class QuorumServer {
        /** Server id (the N of {@code server.N}). */
        public long id;

        /**
         * Address followers connect to once this server has become leader;
         * for zk1 in the example above that is 192.168.40.1:2888.
         */
        public InetSocketAddress addr;

        /**
         * Address used for leader election; for zk1 in the example above
         * that is 192.168.40.1:3888.
         */
        public InetSocketAddress electionAddr;

        /** Learner type of this server; initialised to {@code LearnerType.PARTICIPANT}. */
        public LearnerType type = LearnerType.PARTICIPANT;

        /**
         * @param id           server id
         * @param addr         address used after this server becomes leader
         * @param electionAddr address used for leader election
         */
        public QuorumServer(long id, InetSocketAddress addr,
                InetSocketAddress electionAddr) {
            this.id = id;
            this.addr = addr;
            this.electionAddr = electionAddr;
        }
    }

    下面分别定位使用这两个端口的代码。

    连接选举地址的代码:

    // void org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(long sid)
    /**
     * Opens an election connection to server {@code sid}, unless
     * {@code senderWorkerMap} already holds an entry for it.
     *
     * <p>The election address is looked up once in {@code self.quorumPeers}
     * and that same address is used both for the connect call and in the
     * warning messages, so the logged address is always the one actually
     * dialed. An unknown {@code sid} is logged and ignored.
     *
     * <p>An {@link IOException} while connecting is logged and swallowed;
     * an {@link UnresolvedAddressException} is logged and rethrown.
     *
     * @param sid id of the server to connect to
     */
    synchronized void connectOne(long sid){
        if (senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        if (!self.quorumPeers.containsKey(sid)) {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
        final InetSocketAddress electionAddr = self.quorumPeers.get(sid).electionAddr;

        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Opening channel to server " + sid);
            }
            Socket sock = new Socket();
            setSockOpts(sock);
            // Connect to the address resolved above rather than performing a
            // second, unchecked lookup through another view of the peers.
            sock.connect(electionAddr, cnxTO);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connected to server " + sid);
            }
            initiateConnection(sock, sid);
        } catch (UnresolvedAddressException e) {
            // Sun doesn't include the address that causes this
            // exception to be thrown, also UAE cannot be wrapped cleanly
            // so we log the exception in order to capture this critical
            // detail.
            LOG.warn("Cannot open channel to " + sid
                    + " at election address " + electionAddr, e);
            throw e;
        } catch (IOException e) {
            // NOTE(review): sock is not closed here; whether that is needed
            // depends on what setSockOpts/initiateConnection do - confirm.
            LOG.warn("Cannot open channel to " + sid
                    + " at election address " + electionAddr,
                    e);
        }
    }

    连接 leader 的代码:

    // void org.apache.zookeeper.server.quorum.Learner.connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException
    /**
     * Connects {@code sock} to the leader at {@code addr} and sets up the
     * archive streams used to talk to it.
     *
     * <p>Up to five connection attempts are made, one second apart. Each
     * attempt uses a fresh socket whose read timeout is
     * {@code tickTime * initLimit}; the connect timeout is
     * {@code tickTime * syncLimit}. The exception from the fifth failed
     * attempt is rethrown.
     *
     * @param addr address of the leader
     * @throws IOException          if the final attempt fails, or if the
     *                              socket streams cannot be obtained
     * @throws InterruptedException if interrupted while waiting between attempts
     */
    protected void connectToLeader(InetSocketAddress addr)
    throws IOException, ConnectException, InterruptedException {
        final int lastAttempt = 4;

        sock = new Socket();
        sock.setSoTimeout(self.tickTime * self.initLimit);

        int attempt = 0;
        boolean connected = false;
        while (!connected && attempt <= lastAttempt) {
            try {
                sock.connect(addr, self.tickTime * self.syncLimit);
                sock.setTcpNoDelay(nodelay);
                connected = true;
            } catch (IOException failure) {
                if (attempt == lastAttempt) {
                    LOG.error("Unexpected exception",failure);
                    throw failure;
                }
                LOG.warn("Unexpected exception, tries="+attempt+
                        ", connecting to " + addr,failure);
                // A socket whose connect failed cannot be reused; start over
                // with a new one configured the same way.
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
            if (!connected) {
                Thread.sleep(1000);
                attempt++;
            }
        }

        BufferedInputStream bufferedInput =
                new BufferedInputStream(sock.getInputStream());
        leaderIs = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }

    当前 zk 监听选举端口的代码:

    // void org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run()
    /**
     * Listener loop for the election port: binds a server socket and hands
     * every accepted connection to {@code receiveConnection}.
     *
     * <p>When {@code getQuorumListenOnAllIPs()} is true the socket is bound
     * to the wildcard address on this server's election port; otherwise it
     * is bound to this server's configured election address.
     *
     * <p>On an {@link IOException} the socket is closed and, after a one
     * second pause, the whole bind/accept sequence is retried. The loop
     * gives up after three failures in a row (the counter is reset after
     * every accepted connection) or when {@code shutdown} is set.
     */
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        while((!shutdown) && (numRetries < 3)){
            try {
                ss = new ServerSocket();
                // Must be set before bind() to have a defined effect.
                ss.setReuseAddress(true);
                if (self.getQuorumListenOnAllIPs()) {
                    int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    addr = self.quorumPeers.get(self.getId()).electionAddr;
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(self.quorumPeers.get(self.getId()).electionAddr
                        .toString());
                ss.bind(addr);
                while (!shutdown) {
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    receiveConnection(client);
                    // A successful accept proves the listener is healthy again.
                    numRetries = 0;
                }
            } catch (IOException e) {
                LOG.error("Exception while listening", e);
                numRetries++;
                try {
                    // ss may never have been assigned if creating the socket
                    // itself failed; guard so the retry logic is not cut
                    // short by a NullPointerException.
                    if (ss != null) {
                        ss.close();
                    }
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. " +
                              "Ignoring exception", ie);
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            // Reached only when the retry budget was exhausted.
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + self.quorumPeers.get(self.getId()).electionAddr);
        }
    }

    当前 zk 成为 leader 后,监听数据同步端口(以 zk1 为例即 2888)的代码:

    /**
     * Creates the leader and binds the server socket that learners connect to.
     *
     * <p>The socket is always created unbound so that SO_REUSEADDR can be
     * enabled <em>before</em> binding; setting it on an already-bound socket
     * has no defined effect. When {@code getQuorumListenOnAllIPs()} is true
     * the socket is bound to the wildcard address on the quorum port,
     * otherwise to the configured quorum address.
     *
     * @param self the quorum peer this leader belongs to
     * @param zk   the leader's ZooKeeper server
     * @throws IOException if the socket cannot be created or bound; a
     *                     {@link BindException} is logged before being rethrown
     */
    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) {
                // An InetSocketAddress built from a port alone uses the
                // wildcard address, i.e. listens on every local interface.
                ss.bind(new java.net.InetSocketAddress(
                        self.getQuorumAddress().getPort()));
            } else {
                ss.bind(self.getQuorumAddress());
            }
        } catch (BindException e) {
            if (self.getQuorumListenOnAllIPs()) {
                LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
            } else {
                LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
            }
            throw e;
        }
        this.zk=zk;
    }
  • 相关阅读:
    drf-通过drf-extensions扩展来实现缓存
    social_django第三方登录 没有token解决方法
    python-项目日志配置使用
    drf-支付宝支付
    git 相关命令
    django第三方登录与邮箱验证流程
    django项目部署
    数组中的方法
    滚动到页面底部,更新数据
    图片卷边
  • 原文地址:https://www.cnblogs.com/allenwas3/p/9460347.html
Copyright © 2011-2022 走看看