一、线程模型(网络连接库的整体实现框架)
1.1 文件事件处理器
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型。
1.2 消息处理流程
文件事件处理器使用I/O多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
-
- 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。
- 尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生事件的套接字都推到一个队列里面,然后通过这个队列,以有序(sequentially)、同步(synchronously)、每次一个套接字的方式向文件事件分派器传送套接字:当上一个套接字产生的事件被处理完毕之后(该套接字为事件所关联的事件处理器执行完毕), I/O多路复用程序才会继续向文件事件分派器传送下一个套接字。
1.3 I/O 多路复用程序的实现
Redis的I/O多路复用程序的所有功能是通过包装select、epoll、evport和kqueue这些I/O多路复用函数库来实现的,每个I/O多路复用函数库在Redis源码中都对应一个单独的文件,比如ae_select.c、ae_epoll.c、ae_kqueue.c等。
因为Redis为每个I/O多路复用函数库都实现了相同的API,所以I/O多路复用程序的底层实现是可以互换的,如下图所示。
Redis在I/O多路复用程序的实现源码ae.c中用#include宏定义了相应的规则,程序会在编译时自动选择系统中性能最好的I/O多路复用函数库来作为Redis的I/O多路复用程序的底层实现:
1 /**
2 * Include the best multiplexing layer supported by this system.
3 * The following should be ordered by performances, descending.
4 * 包括该系统支持的最佳复用层。性能依次下降
5 */
6 #ifdef HAVE_EVPORT
7 #include "ae_evport.c"
8 #else
9 #ifdef HAVE_EPOLL
10 #include "ae_epoll.c"
11 #else
12 #ifdef HAVE_KQUEUE
13 #include "ae_kqueue.c"
14 #else
15 #include "ae_select.c"
16 #endif
17 #endif
18 #endif
1.4 文件事件的类型
I/O 多路复用程序可以监听多个套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,这两类事件和套接字操作之间的对应关系如下:
-
- 当套接字变得可读时(客户端对套接字执行write操作,或者执行close操作),或者有新的可应答(acceptable)套接字出现时(客户端对服务器的监听套接字执行connect操作),套接字产生AE_READABLE 事件。
- 当套接字变得可写时(客户端对套接字执行read操作),套接字产生AE_WRITABLE事件。I/O多路复用程序允许服务器同时监听套接字的AE_READABLE事件和AE_WRITABLE事件,如果一个套接字同时产生了这两种事件,那么文件事件分派器会优先处理AE_READABLE事件,等到AE_READABLE事件处理完之后,才处理AE_WRITABLE 事件。这也就是说,如果一个套接字又可读又可写的话,那么服务器将先读套接字,后写套接字。
- AE_BARRIER 4 反转读写事件顺序
- AE_NONE 没有事件
事件类型
1 #define AE_NONE 0 /* No events registered. 没有事件*/
2 #define AE_READABLE 1 /* Fire when descriptor is readable. 当描述符可读时触发AE_READABLE事件。*/
3 #define AE_WRITABLE 2 /* Fire when descriptor is writable. 当描述符可写时触发。*/
4 /**
5 * With WRITABLE, never fire the event if the READABLE event already fired in the same event
6 * loop iteration. Useful when you want to persist things to disk before sending replies, and want
7 * to do that in a group fashion.
8 * 如果 READABLE 事件已在同一事件循环迭代中触发,则此时绝不会触发WRITABLE事件,
9 * 当您想在发送回复之前将内容持久保存到磁盘时很有用,并且希望以组方式进行。
10 *
11 * 但是,网络 IO 事件注册的时候,除了正常的读写事件外,还可以注册一个 AE_BARRIER 事件,
12 * 这个事件就是会影响到先读后写的处理顺序。
13 *
14 * 如果某个 fd 的 mask 包含了 AE_BARRIER,那它的处理顺序会是 先写后读。
15 *
16 * 针对这个场景,redis 举的例子是,如果在 beforesleep 回调中进行了 fsync 动作,
17 * 然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,用来翻转处理事件顺序了。
18 */
19 #define AE_BARRIER 4
1.5 文件事件的处理器
Redis为文件事件编写了多个处理器,这些事件处理器分别用于实现不同的网络通讯需求,常用的处理器如下:
-
- 为了对连接服务器的各个客户端进行应答, 服务器要为监听套接字关联连接应答处理器。
- 为了接收客户端传来的命令请求, 服务器要为客户端套接字关联命令请求处理器。
- 为了向客户端返回命令的执行结果, 服务器要为客户端套接字关联命令回复处理器。
1.5.1 连接应答处理器
networking.c中acceptTcpHandler函数是Redis的连接应答处理器,这个处理器用于对连接服务器监听套接字的客户端进行应答,具体实现为sys/socket.h/accept函数的包装。
当Redis服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来,当有客户端用sys/socket.h/connect函数连接服务器监听套接字的时候, 套接字就会产生AE_READABLE 事件, 引发连接应答处理器执行, 并执行相应的套接字应答操作,如图所示。
1.5.2 命令请求处理器
networking.c中readQueryFromClient函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容, 具体实现为unistd.h/read函数的包装。
当一个客户端通过连接应答处理器成功连接到服务器之后, 服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来,当客户端向服务器发送命令请求的时候,套接字就会产生 AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作,如图所示。
在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。
1.5.3 命令回复处理器
networking.c中sendReplyToClient函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write函数的包装。
当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来,当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,并执行相应的套接字写入操作, 如图所示。
当命令回复发送完毕之后, 服务器就会解除命令回复处理器与客户端套接字的 AE_WRITABLE 事件之间的关联。
1.6 一次完整的客户端与服务器连接事件示例
假设Redis服务器正在运作,那么这个服务器的监听套接字的AE_READABLE事件应该正处于监听状态之下,而该事件所对应的处理器为连接应答处理器。
如果这时有一个Redis客户端向Redis服务器发起连接,那么监听套接字将产生AE_READABLE事件, 触发连接应答处理器执行:处理器会对客户端的连接请求进行应答, 然后创建客户端套接字,以及客户端状态,并将客户端套接字的 AE_READABLE 事件与命令请求处理器进行关联,使得客户端可以向主服务器发送命令请求。
之后,客户端向Redis服务器发送一个命令请求,那么客户端套接字将产生 AE_READABLE事件,引发命令请求处理器执行,处理器读取客户端的命令内容, 然后传给相关程序去执行。
执行命令将产生相应的命令回复,为了将这些命令回复传送回客户端,服务器会将客户端套接字的AE_WRITABLE事件与命令回复处理器进行关联:当客户端尝试读取命令回复的时候,客户端套接字将产生AE_WRITABLE事件, 触发命令回复处理器执行, 当命令回复处理器将命令回复全部写入到套接字之后, 服务器就会解除客户端套接字的AE_WRITABLE事件与命令回复处理器之间的关联。
二、实现
2.1 Redis网络连接库简介
Redis网络连接库对应的文件是networking.c。这个文件主要负责
- 客户端的创建与释放
- 命令接收、命令回复、连接应答处理
- Redis通信协议分析
- CLIENT 命令的实现
Redis 命令执行过程:
我们接下来就这几块内容分别列出源码,进行剖析。
2.2 客户端的创建与释放
redis 网络链接库的源码详细注释,链接:https://github.com/menwenjun/redis_source_annotation/blob/master/networking.c
2.2.1 客户端的创建
Redis 服务器是一个同时与多个客户端建立连接的程序。当客户端连接上服务器时,服务器会建立一个server.h/client
结构来保存客户端的状态信息。所以在客户端创建时,就会初始化这样一个结构,客户端的创建源码如下:
1 client *createClient(int fd) {
2 //分配空间
3 client *c = zmalloc(sizeof(client));
4
5 /**
6 * passing -1 as fd it is possible to create a non connected client.
7 * This is useful since all the commands needs to be executed
8 * in the context of a client. When commands are executed in other
9 * contexts (for instance a Lua script) we need a non connected client.
10 *
11 * 如果fd为-1,表示创建的是一个无网络连接的伪客户端,用于执行lua脚本的时候。
12 * 如果fd不等于-1,表示创建一个有网络连接的客户端
13 */
14 if (fd != -1) {
15 // 设置fd为非阻塞模式
16 anetNonBlock(NULL,fd);
17 // 禁止使用 Nagle 算法,client向内核递交的每个数据包都会立即发送给server出去,TCP_NODELAY
18 anetEnableTcpNoDelay(NULL,fd);
19 // 如果开启了tcpkeepalive,则设置 SO_KEEPALIVE
20 if (server.tcpkeepalive)
21 anetKeepAlive(NULL,fd,server.tcpkeepalive);// 设置tcp连接的keep alive选项
22 /**
23 * 使能AE_READABLE事件,readQueryFromClient是该事件的回调函数
24 *
25 * 创建一个文件事件状态el,且监听读事件,开始接受命令的输入
26 */
27 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
28 readQueryFromClient, c) == AE_ERR)
29 {
30 close(fd);
31 zfree(c);
32 return NULL;
33 }
34 }
35
36 // 默认选0号数据库
37 selectDb(c,0);
38 uint64_t client_id;
39 // 设置client的ID
40 atomicGetIncr(server.next_client_id,client_id,1);
41 c->id = client_id;
42 // client的套接字
43 c->fd = fd;
44 // client的名字
45 c->name = NULL;
46 // 回复固定(静态)缓冲区的偏移量
47 c->bufpos = 0;
48 c->qb_pos = 0;
49 // 输入缓存区
50 c->querybuf = sdsempty();
51 c->pending_querybuf = sdsempty();
52 // 输入缓存区的峰值
53 c->querybuf_peak = 0;
54 // 请求协议类型,内联或者多条命令,初始化为0
55 c->reqtype = 0;
56 // 参数个数
57 c->argc = 0;
58 // 参数列表
59 c->argv = NULL;
60 // 当前执行的命令和最近一次执行的命令
61 c->cmd = c->lastcmd = NULL;
62 // 查询缓冲区剩余未读取命令的数量
63 c->multibulklen = 0;
64 // 读入参数的长度
65 c->bulklen = -1;
66 // 已发的字节数
67 c->sentlen = 0;
68 // client的状态
69 c->flags = 0;
70 // 设置创建client的时间和最后一次互动的时间
71 c->ctime = c->lastinteraction = server.unixtime;
72 // 认证状态
73 c->authenticated = 0;
74 // replication复制的状态,初始为无
75 c->replstate = REPL_STATE_NONE;
76 // 设置从节点的写处理器为ack,是否在slave向master发送ack
77 c->repl_put_online_on_ack = 0;
78 // replication复制的偏移量
79 c->reploff = 0;
80 c->read_reploff = 0;
81 // 通过ack命令接收到的偏移量
82 c->repl_ack_off = 0;
83 // 通过ack命令接收到的偏移量所用的时间
84 c->repl_ack_time = 0;
85 // 从节点的端口号
86 c->slave_listening_port = 0;
87 // 从节点IP地址
88 c->slave_ip[0] = '