zookeeper源码分析
下载zookeeper源码,这次git上下载的版本是3.4.13。因为是使用ant构建的,下载到本地后构建比较麻烦。所以取巧的使用了maven关联zookeeper对应版本并且关联maven上对应版本的源码的形式查看源码。这种方式是依赖的源码文件只读,不能在源码上面进行编辑。
相关流程图:
客户端启动,我们查看zkCli.sh 文件中内容发现入口ZooKeeperMain
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh
fi
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS
org.apache.zookeeper.ZooKeeperMain "$@"
查看main方法
public static void main(String args[])
throws KeeperException, IOException, InterruptedException
{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();@6
}
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);//命令行输入参数解析
System.out.println("Connecting to " + cl.getOption("server"));
connectToZK(cl.getOption("server"));//连接到指定服务器
//zk = new ZooKeeper(cl.getOption("server"),
//Integer.parseInt(cl.getOption("timeout")), new MyWatcher());
}
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
new MyWatcher(), readOnly);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
...
//服务地址信息加入一个list并且打乱顺序
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//创建一个ClientCnxnSocket对象(socketNIO)并且创建了两个线程sendThread = new SendThread(clientCnxnSocket)、eventThread = new EventThread()
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();//启动上面的两个线程
}
SendThread.run(){...
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {...
startConnect(serverAddress);//@1
clientCnxnSocket.updateLastSendAndHeard();//记录一些时间
...}
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);@3
}
@1 startConnect{...
logStartConnect(addr);
clientCnxnSocket.connect(addr);//调用createSock()创建socketChannnel @2
...}
@2 SocketChannel createSock() throws IOException {
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false);
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}
@3 doTransport{...
synchronized (this) {
selected = selector.selectedKeys();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();//一些时间记录
sendThread.primeConnection();@4
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);@5
}
}
}
...}
@4 primeConnection{...//将request包装成Packet放入outgoingQueue队列中
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);
...
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
...}
@5 doIO{...//将outgoingQueue中的packet传输到服务端 outgoingQueue待发送队列 pendingQueue已发送等待结果队列
if (sockKey.isReadable()) {int rc = sock.read(incomingBuffer);...
//获取连接结果 连接成功回调下面这个方法
readConnectResult(){...
sendThread.onConnected{...
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));...}
enableRead()
if (sockKey.isWritable()) {...
Packet p = findSendablePacket{...
ListIterator<Packet> iter = outgoingQueue.listIterator();
while (iter.hasNext()) {
Packet p = iter.next();
if (p.requestHeader == null) {
// We've found the priming-packet. Move it to the beginning of the queue.
iter.remove();
outgoingQueue.add(0, p);//将packet放入队列
...
p.createBB();
sock.write(p.bb);//将请求写出去
}
...}
@6 run{...
try {
Class<?> consoleC = Class.forName("jline.ConsoleReader");
...
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
...}
public void executeLine(String line)//执行命令并加入history
throws InterruptedException, IOException, KeeperException {
if (!line.equals("")) {
cl.parseCommand(line);
addToHistory(commandCount,line);
processCmd(cl);
commandCount++;
}
}
protected boolean processCmd(MyCommandOptions co)
throws KeeperException, IOException, InterruptedException
{
try {
return processZKCmd(co){...
if (cmd.equals("quit")) {
if (cmd.equals("create") && args.length >= 3) {
String newPath = zk.create(path, args[first+2].getBytes(), acl,
flags){...
ReplyHeader r = cnxn.submitRequest(h, request, response, null);//提交request请求{...
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);//封装request到outgoingQueue中然后同步等待结果
synchronized (packet) {
while (!packet.finished) {
packet.wait();
...
}
服务端分析同理查看zkServer.sh启动入口:QuorumPeerMain
if [ "x$JMXPORT" = "x" ]
then
# for some reason these two options are necessary on jdk6 on Ubuntu
# accord to the docs they are not necessary, but otw jconsole cannot
# do a local attach
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
else
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);...
initializeAndRun{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);//加载文件配置
}...
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);@7
...}
runFromConfig{...
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
...}
@7 main{
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
initializeAndRun{...
runFromConfig(config){...
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
cnxnFactory.startup(zkServer);
...}
startup{
start();//启动NIOServerCnxnFactory.run() @8
setZooKeeperServer(zks);
zks.startdata();
zks.startup();
}
startdata() {//从事务日志中加载数据到内存
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
if (!zkDb.isInitialized()) {
loadData();
}
}
loadData{...
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
setZxid(zkDb.loadDataBase())->snapLog.restore()->snapLog.deserialize(dt, sessions)//从快照中反序列化数据到dataTree;fastForwardFromEdits(){...
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);...//读取快照最大事务id+1...
try {
processTransaction{...//从快照(不是每次事务都会打快照)和事务日志中重新执行事务导出数据到内存(服务突然宕机等就可以恢复)
case OpCode.createSession:
rc = dt.processTxn(hdr, txn){
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(...
}
break;
case OpCode.closeSession:
}
...}
}
...}
FinalRequestProcessor#processRequest{...//committedLog就是用来保存议案的列表 每次持久化会把保存到committedLog
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request){...
if (committedLog.size() > commitLogCount) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.size() == 0) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
byte[] data = SerializeUtils.serializeRequest(request);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
committedLog.add(p);
maxCommittedLog = p.packet.getZxid();
...}
...}