zoukankan      html  css  js  c++  java
  • Redis 内存管理与事件处理

    1 Redis内存管理

    Redis内存管理相关文件为zmalloc.c/zmalloc.h,其只是对C中内存管理函数做了简单的封装,屏蔽了底层平台的差异,并增加了内存使用情况统计的功能。
    void *zmalloc(size_t size) {
        // 多申请的一部分内存用于存储当前分配了多少自己的内存
        void *ptr = malloc(size+PREFIX_SIZE);
     
        if (!ptr) zmalloc_oom_handler(size);
    #ifdef HAVE_MALLOC_SIZE
        update_zmalloc_stat_alloc(zmalloc_size(ptr));
        return ptr;
    #else
        *((size_t*)ptr) = size;
        // 内存分配统计
        update_zmalloc_stat_alloc(size+PREFIX_SIZE);
        return (char*)ptr+PREFIX_SIZE;
    #endif
    }

     内存布局图示:

    2 事件处理

    Redis的事件类型分为时间事件和文件事件,文件事件也就是网络连接事件。时间事件的处理是在epoll_wait返回处理文件事件后处理的,每次epoll_wait的超时时间都是Redis最近的一个定时器时间。
     
    Redis在进行事件处理前,首先会进行初始化,初始化的主要逻辑在main/initServer函数中。初始化流程主要做的工作如下:
    • 设置信号回调函数。
    • 创建事件循环机制,即调用epoll_create。
    • 创建服务监听端口,创建定时事件,并将这些事件添加到事件机制中。
    void initServer(void) {
        int j;
     
        // 设置信号对应的处理函数
        signal(SIGHUP, SIG_IGN);
        signal(SIGPIPE, SIG_IGN);
        setupSignalHandlers();
        ...
     
        createSharedObjects();
        adjustOpenFilesLimit();
        // 创建事件循环机制,及调用epoll_create创建epollfd用于事件监听
        server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        server.db = zmalloc(sizeof(redisDb)*server.dbnum);
     
        /* Open the TCP listening socket for the user commands. */
        // 创建监听服务端口,socket/bind/listen
        if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
            exit(1);
        ...
     
        /* Create the Redis databases, and initialize other internal state. */
        for (j = 0; j < server.dbnum; j++) {
            server.db[j].dict = dictCreate(&dbDictType,NULL);
            server.db[j].expires = dictCreate(&keyptrDictType,NULL);
            server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].ready_keys = dictCreate(&setDictType,NULL);
            server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
            server.db[j].eviction_pool = evictionPoolAlloc();
            server.db[j].id = j;
            server.db[j].avg_ttl = 0;
        }
        ...
     
        /* Create the serverCron() time event, that's our main way to process
         * background operations. 创建定时事件 */
        if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            serverPanic("Can't create the serverCron time event.");
            exit(1);
        }
     
        /* Create an event handler for accepting new connections in TCP and Unix
         * domain sockets. */
        for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
                {
                    serverPanic(
                        "Unrecoverable error creating server.ipfd file event.");
                }
        }
        // 将事件加入到事件机制中,调用链为 aeCreateFileEvent/aeApiAddEvent/epoll_ctl
        if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
            acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
     
        /* Open the AOF file if needed. */
        if (server.aof_state == AOF_ON) {
            server.aof_fd = open(server.aof_filename,
                                   O_WRONLY|O_APPEND|O_CREAT,0644);
            if (server.aof_fd == -1) {
                serverLog(LL_WARNING, "Can't open the append-only file: %s",
                    strerror(errno));
                exit(1);
            }
        }
     
        ...
    }
    事件处理流程
    事件处理函数链:aeMain / aeProcessEvents / aeApiPoll / epoll_wait。
     
    常见的事件机制处理流程是:调用epoll_wait等待事件来临,然后遍历每一个epoll_event,提取epoll_event中的events和data域,data域常用来存储fd或者指针,不过一般的做法是提取出events和data.fd,然后根据fd找到对应的回调函数,fd与对应回调函数之间的映射关系可以存储在特定的数据结构中,比如数组或者哈希表,然后调用事件回调函数来处理。
     
    Redis中用了一个数组来保存fd与回调函数的映射关系,使用数组的优点就是简单高效,但是数组一般使用在建立的连接不太多情况,而Redis正好符合这个情况,一般Redis的文件事件大都是客户端建立的连接,而客户端的连接个数是一定的,该数量通过配置项maxclients来指定。
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, numevents = 0;
     
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
     
            numevents = retval;
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                struct epoll_event *e = state->events+j;
     
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }
     
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从eventLoop->events数组中查找对应的回调函数
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
     
            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
        ...
    }
    文件事件的监听
    Redis监听端口的事件回调函数链是:acceptTcpHandler / acceptCommonHandler / createClient / aeCreateFileEvent / aeApiAddEvent / epoll_ctl。 
    在Reids监听事件处理流程中,会将客户端的连接fd添加到事件机制中,并设置其回调函数为readQueryFromClient,该函数负责处理客户端的命令请求。
     
    命令处理流程
    命令处理流程链是:readQueryFromClient / processInputBuffer / processCommand / call / 对应命令的回调函数(c->cmd->proc),比如get key命令的处理回调函数为getCommand。getCommand的执行流程是先到client对应的数据库字典中根据key来查找数据,然后根据响应消息格式将查询结果填充到响应消息中。
     

    3 如何添加自定义命令

    如何在Redis中添加自定的命令呢?其中只需要改动以下几个地方就行了,比如自定义命令random xxx,然后返回redis: xxx,因为hello xxx和get key类似,所以就依葫芦画瓢。random命令用来返回一个小于xxx的随机值。
     
    首先在redisCommandTable数组中添加自定义的命令,redisCommandTable数组定义在server.c中。然后在getCommand定义处后面添加randomCommand的定义,getCommand定义在t_string.c中。最后在server.h中添加helloCommand的声明。整个修改patch文件如下,代码基于redis-2.8.9版本。
    From 5304020683078273c1bc6cc9666dab95efa18607 Mon Sep 17 00:00:00 2001
    From: luoxn28 <luoxn28@163.com>
    Date: Fri, 30 Jun 2017 04:43:47 -0700
    Subject: [PATCH] add own command: random num
     
    ---
     src/server.c   |  3 ++-
     src/server.h   |  1 +
     src/t_string.c | 44 ++++++++++++++++++++++++++++++++++++++++++++
     3 files changed, 47 insertions(+), 1 deletion(-)
     
    diff --git a/src/server.c b/src/server.c
    index 609f396..e040104 100644
    --- a/src/server.c
    +++ b/src/server.c
    @@ -296,7 +296,8 @@ struct redisCommand redisCommandTable[] = {
         {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
         {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
         {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    -    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
    +    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
    +    {"random",randomCommand,2,"rF",0,NULL,1,1,1,0,0}
     };
      
     struct evictionPoolEntry *evictionPoolAlloc(void);
    diff --git a/src/server.h b/src/server.h
    index 3fa7c3a..427ac92 100644
    --- a/src/server.h
    +++ b/src/server.h
    @@ -1485,6 +1485,7 @@ void setnxCommand(client *c);
     void setexCommand(client *c);
     void psetexCommand(client *c);
     void getCommand(client *c);
    +void randomCommand(client *c);
     void delCommand(client *c);
     void existsCommand(client *c);
     void setbitCommand(client *c);
    diff --git a/src/t_string.c b/src/t_string.c
    index 8c737c4..df4022d 100644
    --- a/src/t_string.c
    +++ b/src/t_string.c
    @@ -173,6 +173,50 @@ void getCommand(client *c) {
         getGenericCommand(c);
     }
      
    +static bool checkRandomNum(char *num)
    +{
    +    char *c = num;
    +
    +    while (*c != '') {
    +        if (!(('0' <= *c) && (*c <= '9'))) {
    +            return false;
    +        }
    +        c++;
    +    }
    +
    +    return true;
    +}
    +
    +/**
    + * command: random n
    + * return a random num < n, if n <= 0, return 0
    + * @author: luoxiangnan
    + */
    +void randomCommand(client *c)
    +{
    +    char buff[64] = {0};
    +    int num = 0;
    +    robj *o = NULL;
    +
    +    if (!checkRandomNum(c->argv[1]->ptr)) {
    +        o = createObject(OBJ_STRING, sdsnewlen("sorry, it's not a num :(",
    +                           strlen("sorry, it's not a num :(")));
    +        addReplyBulk(c, o);
    +        return;
    +    }
    +
    +    sscanf(c->argv[1]->ptr, "%d", &num);
    +    if (num > 0) {
    +        num = random() % num;
    +    } else {
    +        num = 0;
    +    }
    +
    +    sprintf(buff, "%s %d", "redis: ", num);
    +    o = createObject(OBJ_STRING, sdsnewlen(buff, strlen(buff)));
    +    addReplyBulk(c, o);
    +}
    +
     void getsetCommand(client *c) {
         if (getGenericCommand(c) == C_ERR) return;
         c->argv[2] = tryObjectEncoding(c->argv[2]);
    -- 
    1.8.3.1
    结果如下所示:
  • 相关阅读:
    Spring事务配置的五种方式(转)
    struts.properties配置详解(转)
    Spring3.3 整合 Hibernate3、MyBatis3.2 配置多数据源/动态切换数据源 方法(转)
    php的ob函数实现页面静态化
    冒泡排序法原理讲解及PHP代码示例
    Linux Centos下编译安装Redis
    PHP判断是手机端还是PC端
    windows 下 Symfony的下载与安装
    JS在线生成二维码
    关于微信分享到朋友圈(Thinkphp-tp3.2框架下实现)
  • 原文地址:https://www.cnblogs.com/luoxn28/p/7101725.html
Copyright © 2011-2022 走看看