Zookeeper 源码(七)请求处理
以单机启动为例讲解 Zookeeper 是如何处理请求的。先回顾一下单机时的请求处理链。
// 单机包含 3 个请求链:PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
请求的调用链如下:
PrepRequestProcessor.processRequest() <- ZooKeeperServer.submitRequest() <- ZooKeeperServer.processPacket() <- NettyServerCnxn.receiveMessage() <- CnxnChannelHandler.processMessage() <- CnxnChannelHandler.messageReceived()
RequestProcessor 接口
public interface RequestProcessor {
public static class RequestProcessorException extends Exception {
public RequestProcessorException(String msg, Throwable t) {
super(msg, t);
}
}
// 处理请求
void processRequest(Request request) throws RequestProcessorException;
// 关闭当前及子处理器,处理器可能是线程
void shutdown();
}
一、PrepRequestProcessor
PrepRequestProcessor 是服务器的请求预处理器,能够识别出当前客户端是否是事务请求,对于事务请求,进行一系列预处理,如创建请求事务头,事务体,会话检查,ACL 检查等。
(1) PrepRequestProcessor 构造函数
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// 已提交请求队列
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
// 下个处理器
private final RequestProcessor nextProcessor;
// Zookeeper 服务器
ZooKeeperServer zks;
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
// 初始化线程
super("ProcessThread(sid:" + zks.getServerId() + " cport:"
+ zks.getClientPort() + "):", zks.getZooKeeperServerListener());
this.nextProcessor = nextProcessor;
this.zks = zks;
}
}
说明:类的核心属性有 submittedRequests 和 nextProcessor,前者表示已经提交的请求,而后者表示提交的下个处理器。
(2) RequestProcessor 接口实现
// 接收请求
public void processRequest(Request request) {
submittedRequests.add(request);
}
// 关闭线程
public void shutdown() {
LOG.info("Shutting down");
submittedRequests.clear();
submittedRequests.add(Request.requestOfDeath);
nextProcessor.shutdown();
}
既然请求都提交到 submittedRequests 中了,必然有地方消费 submittedRequests,下面看一下线程的处理过程。
(3) run(核心)
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) { // 请求类型为 PING
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (Request.requestOfDeath == request) { // 结束线程
break;
}
pRequest(request); // 处理请求(核心)
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
说明:run 函数是对 Thread 类 run 函数的重写,其核心逻辑相对简单,即不断从队列中取出 request 进行处理,其会调用 pRequest 函数,while 自旋这样做的好处是充分利用 CPU,避免线程频繁切换线程。
二、SyncRequestProcessor
在分析了 PrepRequestProcessor 处理器后,接着来分析 SyncRequestProcessor,该处理器将请求存入磁盘,其将请求批量的存入磁盘以提高效率,请求在写入磁盘之前是不会被转发到下个处理器的。
SyncRequestProcessor 除了会定期的把 request 持久化到本地磁盘,同时他还要维护本机的 txnlog 和 snapshot,这里的基本逻辑是:
- 每隔 snapCount/2 个 request 会重新生成一个 snapshot 并滚动一次 txnlog,同时为了避免所有的 zookeeper server 在同一个时间生成 snapshot 和滚动日志,这里会再加上一个随机数,snapCount 的默认值是 10w 个 request
(1) 重要属性
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
private final ZooKeeperServer zks;
// queuedRequests 接收外界传递的请求队列
private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
private final RequestProcessor nextProcessor;
// 快照处理线程
private Thread snapInProcess = null;
volatile private boolean running;
// 等待被刷新到磁盘的请求队列
private final LinkedList<Request> toFlush = new LinkedList<Request>();
private final Random r = new Random(System.nanoTime());
// 快照个数
private static int snapCount = ZooKeeperServer.getSnapCount();
// 关闭线程
private final Request requestOfDeath = Request.requestOfDeath;
}
(2) run(核心方法)
public void run() {
try {
// 1. 初始化,日志数量为 0
int logCount = 0;
// 确保所有的服务器在同一时间不是使用的同一个快照
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
// 2. 没有需要刷新到磁盘的请求,则 take 取出数据,会阻塞
if (toFlush.isEmpty()) {
si = queuedRequests.take();
// 3. 有则 poll 取出数据,不会阻塞
} else {
si = queuedRequests.poll();
// 没有请求则先将已有的请求刷新到磁盘
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 4. 将请求添加至日志文件,只有事务性请求才会返回 true
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// 4.1 生成滚动日志 roll the log
zks.getZKDatabase().rollLog();
// 4.2 生成快照日志,如果 snapInProcess 线程仍在进行快照则忽略本次快照
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
// 5. 查看此时 toFlush 是否为空,如果为空,说明近段时间读多写少,直接交给下一个处理器处理
} else if (toFlush.isEmpty()) {
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
(3) flush(刷新到磁盘)
private void flush(LinkedList<Request> toFlush) throws IOException, RequestProcessorException {
if (toFlush.isEmpty())
return;
// 1. 提交至 ZK 数据库
zks.getZKDatabase().commit();
// 2. 将所有的请求提交到下个处理器处理
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
// 刷新到磁盘
((Flushable)nextProcessor).flush();
}
}
说明:该函数主要用于将toFlush队列中的请求刷新到磁盘中。
三、FinalRequestProcessor
FinalRequestProcessor 负责把已经 commit 的写操作应用到本机,对于读操作则从本机中读取数据并返回给 client,这个 processor 是责任链中的最后一个
FinalRequestProcessor 是一个同步处理的 processor,主要的处理逻辑就在方法 processRequest 中:
- 如果 request.hdr != null,则表明 request 是写操作,则调用 zks.processTxn(hdr, txn) 来把 request 关联的写操作执行到内存状态中
- 如果是写操作,则调用 zks.getZKDatabase().addCommittedProposal(request);
把 request 加入到 ZKDatabase.committedLog 队列中,这个队列主要是为了快速和 follower 同步而保留的 - 为各类操作准备响应数据,对于写操作则根据 processTxn 的结果来回复,如果是读操作,则读取内存中的状态
- 发送响应数据给 client
processRequest 的处理逻辑非常长,我们一点点分析。
(1) 处理事务请求
public void processRequest(Request request) {
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
// 1. 请求委托 ZookeeperServer 处理,zks 会针对事务和非事务请求会分别处理
rc = zks.processTxn(request);
// 2. request.hdr!=null 则是事务请求,即写操作,outstandingChanges 保存有所有的事务请求记录
// PrepRequestProcessor 会将事务请求添加到集合中,FinalRequestProcessor 则事务请求已经处理完毕需要移除
if (request.getHdr() != null) {
// 事务请求头
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
// zk 有严格的执行顺序,如果小于 zxid 则认为已经处理完毕
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
}
// 3. 如果是事务请求,则把 request 加入到 ZKDatabase.committedLog 队列中
if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
}
processRequest 将请求委托给了 zk 处理,我们看一下 ZookeeperServer 是如何处理请求的。
public ProcessTxnResult processTxn(Request request) {
return processTxn(request, request.getHdr(), request.getTxn());
}
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
// 写操作(事务请求)
rc = getZKDatabase().processTxn(hdr, txn);
} else {
// 读操作(非事务请求)
rc = new ProcessTxnResult();
}
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
} else if (request != null && request.isLocalSession()) {
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
} else {
LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
(2) 请求响应
// 1. 对于写操作(事务请求)根据 processTxn() 的结果来获取响应数据
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
}
// 2. 对于读操作(非事务请求)从内存数据库中获取响应数据
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// 直接从内存数据库中获取响应数据
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
参考:
- 《Zookeeper请求处理》:https://www.cnblogs.com/leesf456/p/6140503.html
- 《【Zookeeper】源码分析之请求处理链(二)之PrepRequestProcessor》:https://www.cnblogs.com/leesf456/p/6412843.html
- 《【Zookeeper】源码分析之请求处理链(三)之SyncRequestProcessor》:https://www.cnblogs.com/leesf456/p/6438411.html
- 《【Zookeeper】源码分析之请求处理链(四)之FinalRequestProcessor》:https://www.cnblogs.com/leesf456/p/6472496.html
- 从 Paxos 到 Zookeeper : 分布式一致性原理与实践
每天用心记录一点点。内容也许不重要,但习惯很重要!