zoukankan      html  css  js  c++  java
  • zookeeper原理之投票的网络通信流程

    通信流程图:

     

    接收数据 Notification 和发送 ToSend
    ToSender 
    Notification
    leader; 被推荐的服务器 sid 
    zxid; 被推荐的服务器当前最新的事务 id 
    peerEpoch; 被推荐的服务器当前所处的 epoch 
    electionepoch; 当前服务器所处的 epoch
    stat 当前服务器状态 
    sid 接收消息的服务器 sid(myid)
    leader; //被推荐的服务器 sid
    zxid; 被推荐的服务器最新事务 id
    peerEpoch; 被推荐的服务器当前所处的 epoch
    electionEpoch 选举服务器所处的 epoch
    stat; 选举服务器当前的状态
    sid; 选举服务器的 sid 
    通信过程源码分析
    每个 zk 服务启动后创建 socket 监听
    protected Election createElectionAlgorithm(int electionAlgorithm){
    	//….
    	case 3:
    		qcm = createCnxnManager();
    		QuorumCnxManager.Listener listener = 
    		qcm.listener;
    		if(listener != null){
    			listener.start();
    		}
    	// 启动监听listener 实现了线程,所以在 run 方法中可以看到构建ServerSocket 的请求,这里专门用来接收其他zkServer
    	// 的投票请求
    	// 这块后续再分析
    	@Override
    	public void run() {
    			int numRetries = 0;
    			InetSocketAddress addr;
    			while((!shutdown) && (numRetries < 3)){
    				try {
    					ss = new ServerSocket();
    				}
    			}
    		}
    	}
    
    FastLeaderElection.lookForLeader
    这个方法在前面分析过,里面会调用 sendNotifications 来发送投票请求
    public Vote lookForLeader() throws InterruptedException {
    	 //省略部分代码
    	 sendNotifications(); //这个方法,会把当前zk 服务器的信息添加到 sendqueue
    	 /*
    	 * Loop in which we exchange 
    	notifications until we find a leader
    	 */
    	 while ((self.getPeerState() == ServerState.LOOKING) &&
    	 //省略部分代码
    }
    
    FastLeaderElection.sendqueue
    sendQueue 这个队列的数据,是通过 WorkerSender 来进行获取并发送的。而这个 WorkerSender 线程,在构建 fastLeaderElection 时,会启动
    class WorkerSender extends ZooKeeperThread {
    	public void run() {
    		while (!stop) {
    			try {//从队列中获取 ToSend 对象
    				ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
    				if(m == null) continue;
    				process(m);
    				//省略部分代码
    				void process(ToSend m) {
    					ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
    					m.leader, m.zxid,
    					m.electionEpoch,
    					m.peerEpoch);
    					manager.toSend(m.sid, requestBuffer); // 这里就是调用 QuorumCnxManager
    														// 进行消息发送
    				}
    			}
    		}
    	}
    }
    QuorumCnxManager.toSend
    public void toSend(Long sid, ByteBuffer b) {
    
    		if (this.mySid == sid) { // 如果接受者是自己,直接放置到接收队列
    			b.position(0);
    			addToRecvQueue(new Message(b.duplicate(), sid));
    		} else { // 否则发送到对应的发送队列上
    			ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
    			// 判断当前的 sid 是否已经存在于发送队列,如果是,则直接把已经存在的数据发送出去
    			ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
    			if (bqExisting != null) {
    				addToSendQueue(bqExisting, b);
    			} else {
    				addToSendQueue(bq, b);
    			}
    			connectOne(sid); // 连接申请调用链 connectOne-->initiateConnection-
    								// ->startConnection , startConnection 就是发送方启动入口
    		}
    	}
    startConnection
    private boolean startConnection(Socket sock, Long sid) {
    		// 省略部分代码
    		if (sid > this.mySid) {
    			// 为了防止重复建立连接,只允许 sid 大的主动连接 sid 小的
    			closeSocket(sock);
    		} else {
    			// 构建一个发送线程和接收线程,负责针对当前连接的数据传递,后续的逻辑比较简单,就不做分析
    			SendWorker sw = new SendWorker(sock, sid);
    			RecvWorker rw = new RecvWorker(sock, din, sid, sw);
    			sw.setRecv(rw);
    		}
    	}
    SendWorker 会监听对应 sid 的阻塞队列,启动的时候回如果队列为空时会重新发送一次最前最后的消息,以防上一次处理是服务器异常退出,造成上一条消息未处理成功;然后就是不停监听队里,发现有消息时调用send 方法RecvWorker:RecvWorker 不停监听 socket 的 inputstream,读取消息放到消息接收队列中,消息放入队列中,qcm 的流程就完毕了。
    QuorumCnxManager.Listener
    listener 监听到客户端请求之后,开始处理消息
    public void run() {
    		// 省略部分代码
    		while (!shutdown) {
    			Socket client = ss.accept();
    			setSockOpts(client);
    			LOG.info("Received connection request" + client.getRemoteSocketAddress());
    			if (quorumSaslAuthEnabled) {
    				receiveConnectionAsync(client);
    			} else {
    				receiveConnection(client); // 接收客户端请求
    			}
    		}
    	}
    
    QuorumCnxManager.receiveConnection
    public void receiveConnection(final Socket sock) {
    		DataInputStream din = null;
    		try {
    			// 获取客户端的数据包
    			din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    			handleConnection(sock, din);// 调用 handle 进行处理
    		} catch (IOException e) {
    			LOG.error("Exception handling connection, addr: {}, closing server connection",
    					sock.getRemoteSocketAddress());
    			closeSocket(sock);
    		}
    	}
    
    handleConnection
    private void handleConnection(Socket sock, DataInputStream din)throws IOException {
    			 Long sid = null;
    			 try {
    				 //获取客户端的 sid,也就是 myid
    				 sid = din.readLong();
    			 	if (sid < 0) { 
    			 		sid = din.readLong();
    			 		if (sid < this.mySid) {
    			 			//为了防止重复建立连接,只允许 sid 大的主动连接 sid 小的
    			 			SendWorker sw = senderWorkerMap.get(sid);
    			 			if (sw != null) {
    			 				sw.finish();//关闭连接
    			 			}
    			 			LOG.debug("Create new connection to server: " + sid);
    			 			closeSocket(sock);//关闭连接
    			 			connectOne(sid);//向 sid 发起连接
    			 		} else {//同样,构建一个 SendWorker 和RecvWorker 进行发送和接收数据
    			 			SendWorker sw = new 
    			 			SendWorker(sock, sid);
    			 			RecvWorker rw = new 
    			 			RecvWorker(sock, din, sid, sw);
    			 			sw.setRecv(rw);
    			 		}
    			 	}
    			 }
    		}
    
  • 相关阅读:
    PHP is_float()、 is_double()、is_real()函数
    自动驾驶关键技术分解和流程
    自动驾驶行业内时间表和技术解析
    自动驾驶架构与实现路径
    ADAS单目摄像头行驶区域环境光检测图片标注
    多目标检测整合算法
    道路场景语义分割算法
    TTC测距算法
    TSR交通标志检测与识别
    Mobileye_EyeQ4功能和性能
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13540997.html
Copyright © 2011-2022 走看看