命令行使用管道(命令以换行符分隔):
(printf "PING PING PING "; sleep 1) | nc localhost 6379
redis server 接收客户端的输入,调用栈如下:
ae.c/aeProcessEvents networking.c/processInputBuffer
redis 中客户端的结构体:
typedef struct client { // 输入缓冲区保存客户端发送的命令 sds querybuf; // 字符串数组,要执行的命令,例如 PING robj **argv; // 记录 argv 长度 int argc; } client;
分析管道命令的执行过程:按换行符 split 命令,分三次执行 PING 命令。
void processInputBuffer(client *c) { server.current_client = c; /* Keep processing while there is something in the input buffer */ // querybuf 的初始值是 "PING PING PING " // 每经过一次 processInlineBuffer(c),减少一个 PING // "PING PING PING " -> "PING PING " -> "PING " -> "" while(sdslen(c->querybuf)) { /* Return if clients are paused. */ if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). * * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; /* Determine request type when unknown. */ // 普通命令以 * 开头,请求类型为 PROTO_REQ_MULTIBULK,管道命令类型为 PROTO_REQ_INLINE if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = PROTO_REQ_MULTIBULK; } else { c->reqtype = PROTO_REQ_INLINE; } } if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != C_OK) break; } else { serverPanic("Unknown request type"); } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ if (processCommand(c) == C_OK) resetClient(c); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) break; } } server.current_client = NULL; }
执行具体命令:
sever.c/processCommand
sever.c/call