多个NIOServerCnxn公用同一个zkServer对象
处理CRUD请求逻辑:
largeRequestThreadshold (请求大小阈值,默认为-1)
zk处理请求时,并不是并行处理。requestThrottler会将请求放到队列submittedRequests(LinkedBlockingQueue)中
requestThrottler为请求限流器,其实也是一个线程
ZK中如何优雅的停止?
请求限流器这类的线程,如果要停止,其实就是使run方法跑完。因为run方法中有while循环逻辑,所以只要使while循环停止,就可以优雅的停止线程。
ZK限流的逻辑:
if (maxRequests > 0) {
while (!killed) {
if (dropStaleRequests && request.isStale()) {
// Note: this will close the connection
// closeSession
dropRequest(request);
ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
request = null;
break;
}
// 真正被处理的请求
if (zks.getInProcess() < maxRequests) {
break;
}
// 超过了则睡眠一会
throttleSleep(stallTime);
}
}
请求处理器链:
PrepRequestsProcessor
SyncRequestProcessor
FinalRequestsProcessor
当两个create请求被处理时,如何既要保证安全性,又要保证一定的高效能性?(安全:加锁,取ChangeRecord容器的节点信息;高效:执行请求操作是异步的,避免同步串行阻塞)
不安全主要有:顺序节点需要序号递增,要考虑线程安全
在前一个create请求进行创建之前,会记录一条ChangeRecord记录,这里记录的是最新的父节点的信息(比如cVersion属性),下一个请求进来时,会直接从ChangeRecord中去取父节点的最新信息。
此时因为上一个请求可能还没来得及修改DataTree,当然也可能没来得及做持久化操作。所以下一个请求只能从ChangeRecord中去取。
当然,在对ChangeRecord进行操作时,要对操作加上同步锁,避免不安全(因为线程执行都是异步的)。
ZK是否绝对可靠?
理论上来说,是可以做到的。因为ZK是采用日志加快照的方式备份数据到磁盘中。恢复的时候会根据最近一次快照加日志文件去恢复原始Datatree。
但是在写日志文件时,因为采用了多次缓存到缓存流,然后一次性写入日志文件。如果在一组日志还未持久化,ZK集群挂了。此时这些还未持久化的日志就无法存盘。导致这部分日志丢失。
ZK中几个RequestProcessor作用解析:
PrepRequestProcessor
1、以线程加队列的方式来处理Request,保证了顺序执行
2、设置了Request的Hdx和Txn
3、根据不同的命令操作类型生成ChangeRecord表示对某一个节点的一次修改。需要注意的是,一个操作可能对应不止一条的ChangeRecord,比如一个create请求就包括了创建当前节点,以及修改父节点的属性。
把ChangeRecord放到OutstandingChanges队列中,OutstandingChanges意思是排队的修改记录。当我们需要create一个节点时,我们需要获取父节点的信息。如果OutstandingChanges中没有父节点信息,则要通过DataBase中拿,影响效率。
4、验证ACL
5、把Request交给nextProcessor处理
SyncRequestProcessor
1、以线程加队列的方式来处理Request,保证了顺序执行
2、把Request中的Hdx和Txn进行持久化【请求持久化:日志:DataTree持久化:快照】
3、单独创建一个线程打快照
4、把Request交给nextProcessor处理
FinalRequestsProcessor
1、根据Request更新ZKDatabase
2、触发Watch
3、发送Response给客户端
else if (toFlush.isEmpty()) {
// toFlush是一个队列,表示需要进行持久化的Request,当Request被持久化了之后,就会把Request从toFlush中移除,直接调用nextProcessor
// 所以如果一个Request的hdr为null,表示不用进行持久化,
// 并且当toFlush队列中有值时不能先执行这个Request请求,得等toFlush中请求都执行完了之后才能执行
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
// 把当前请求添加到待flush队列中
toFlush.add(si); // 写 读
解释:如果队列中还有写请求,则继续讲请求add到队列中待处理。