zoukankan      html  css  js  c++  java
  • redis源码学习之工作流程初探

    背景

    redis是当下比较流行的KV数据库之一,是抵御高并发的一把利器,本着知其然还要知其所以然的目的,我决定花一点时间来研究其源码,希望最后能向自己解释清楚“redis为什么这么快”这个疑惑,第一篇主要介绍环境搭建和redis工作流程初探,后期会陆续献上其他有意思的章节。

    环境准备

    我自己的电脑是win10系统,所以我会准备一套适合windows系统的环境来供自己学习,这样方便调试分析。

    下载redis源码

    redis本身是不支持windows系统的,但是微软的工程师针对windows平台做了支持,源码放在了github上,有需要的可以自己去下载,我这里下载的是v2.8.9这个tag的源码,下载地址https://github.com/microsoftarchive/redis。这里扯个题外话,学习一个开源软件的时候不要一上来就下载最新版本的源码看,经过的迭代太多代码量就上来了,对于新手来说容易晕,先下一个早期的稳定版本了解其体系结构和工作流程,等待熟悉了以后再循序渐进。

    下载Visual Studio

    其他软件我没尝试过,这个是官方推荐的ide,一定一定一定下载 Visual Studio 2013 update5这个版本的,否则编译的时候各种报错,下载地址
    http://download.microsoft.com/download/9/3/E/93EA27FF-DB02-4822-8771-DCA0238957E9/vs2013.5_ult_chs.iso。这里再扯个题外话,我刚开始下载的是最新版本的Visual Studio,结果编译的时候各种报错,然后就去网络上一顿查一顿试,折腾半天还是没好,最后下载了 Visual Studio 2013 update5这个版本,结果一把就成功,有些牛角尖一定得钻,但是有些牛角尖不值得钻。

    Visual Studio打开redis源码

    按照下图方式打开下载的redis源码


    c程序的入口是main方法,redis main方法的位置在redis.c文件中,下面我们通过main方法来逐步了解redis的工作流程。

    启动过程分析

    跟着main方法顺序看下去,大概有以下几个关键步骤(略过了sentinel相关逻辑):
    1.设置随机数种子、获取当前时间等;
    2.初始化服务配置信息,设置默认值(initServerConfig);
    3.解析配置文件(loadServerConfig);
    4.初始化server对象(initServer);
     4.1创建eventLoop对象;
     4.2创建serverSocket,监听端口;
     4.3添加定时事件到eventLoop对象中;
     4.4将serverSocket文件描述符添加到监视集中,这里借助IO多路复用框架的能力(windows平台使用IOCP,其他平台使用select、epoll、evport等);
    5.从磁盘加载数据到内存中(loadDataFromDisk);
    6.执行事件循环逻辑(aeMain),这是redis真正挥洒汗水的地方,下一节会单独讲述这块内容。

    调用关系图

    事件循环分析

    我们都知道redis是单线程执行客户端命令的,那究竟是怎样一种设计才能支持高并发的读写呢。

    工作模型

    1.server启动,创建serverSocket监听端口,将serverSocket对应的FD(文件描述符)简称为FD-Server添加到IO多路复用框架的监视集当中,注册AE_READABLE事件(可读),关联的事件处理器是acceptTcpHandler;
    2.client连接server;
    3.事件循环开始轮询IO多路复用框架接口aeApiPoll,会得到就绪的FD,执行对应的事件处理器;
    4.由第3步事件循环触发FD-Server AE_READABLE事件对应的事件处理器acceptTcpHandler;
     4.1调用accept获得clientSocket对应的FD简称为FD-Client;
     4.2将FD-Client添加到IO多路复用框架的监视集当中,注册AE_READABLE事件(可读),关联的事件处理器是readQueryFromClient;
    5.client发送redis命令;
    6.由第3步事件循环触发FD-Clien AE_READABLE事件对应的事件处理器readQueryFromClient;
     6.1解析客户端发来的redis命令,找到命令对应的redisCommandProc(命令对应的处理函数);
     6.2执行redisCommandProc;
     6.3prepareClientToWrite准备回写响应信息,为FD-Client注册AE_WRITEABLE事件(可写),关联的事件处理器是sendReplyToClient;
    7.执行redis中的定时任务;
    8.由第3步事件循环触发FD-Clien AE_WRITEABLE事件对应的事件处理器sendReplyToClient,发送响应内容给client;

    代码分析

    server启动,创建serverSocket并注册AE_READABLE事件,设置事件处理器为acceptTcpHandler

    void initServer() {
          //省略部分代码
    
          //初始化eventLoop对象,eventLoop对象里面存储了所有的事件
          server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
    
          //创建serverSocket,监听端口
          if (server.port != 0 &&
            listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
            exit(1);
          
          //添加定时任务到eventLoop中 
          if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
          }
    
    
    
          //将serverSocket对应的文件描述符添加到监视集中,关联的事件处理器是acceptTcpHandler
          for (j = 0; j < server.ipfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                acceptTcpHandler,NULL) == AE_ERR)
               
           }
    }
    

    acceptTcpHandler当有连接过来的时候被触发,调用accept得到client socket对应的FD,并将FD添加到监视集中,关联的事件处理器是readQueryFromClient

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd;
        //调用accept获得clientSocket对应的FD
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        
        //将clientSocket对应的FD添加到监视集中 
        acceptCommonHandler(cfd,0);
    }
    
    static void acceptCommonHandler(int fd, int flags) {
        redisClient *c;
         
        //调用createClient添加
        if ((c = createClient(fd)) == NULL) {
        }  
    }
    
    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
    
        if (fd != -1) {
            anetNonBlock(NULL,fd);
            anetEnableTcpNoDelay(NULL,fd);
            if (server.tcpkeepalive)
                anetKeepAlive(NULL,fd,server.tcpkeepalive);
    
    	//将fd添加到监视集中,关联的事件处理器是readQueryFromClient
            if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                readQueryFromClient, c) == AE_ERR)
            {
            }
        }
    }
    

    aeMain就是跑一个循环,一直去调用aeProcessEvents

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }
    

    aeProcessEvents会调用aeApiPoll方法来获得就绪的文件描述符,然后执行文件描述符关联的的事件处理器

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
    #ifdef _WIN32	
    	if (ServiceStopIssued() == TRUE)
    		aeStop(eventLoop);
    #endif
    
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* Note that we want call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            aeTimeEvent *shortest = NULL;
            struct timeval tv, *tvp;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
    
                /* Calculate the time missing for the nearest
                 * timer to fire. */
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                tvp->tv_sec = shortest->when_sec - now_sec;
                if (shortest->when_ms < now_ms) {
                    tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                    tvp->tv_sec --;
                } else {
                    tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
                }
                if (tvp->tv_sec < 0) tvp->tv_sec = 0;
                if (tvp->tv_usec < 0) tvp->tv_usec = 0;
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe;
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;
    
                fe = &eventLoop->events[eventLoop->fired[j].fd];
    
    	    /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        }
        /* Check time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);//处理延迟任务
    
        return processed; /* return the number of processed file/time events */
    }
    

    动画演示

    做了一个动画帮助理解工作过程(redis启动之后使用命令行telnet到6379端口,然后执行keys *命令,最终拿到结果)

    网络模块

    IO多路复用

    这部分内容网络上精彩的内容太多,这里把我认为比较经典的一些内容贴出来供大家品读(建议从上往下顺序阅读)
    The C10K problem
    socket阻塞非阻塞等头疼问题解释
    LINUX – IO MULTIPLEXING – SELECT VS POLL VS EPOLL
    poll vs select vs event-based
    redis事件驱动

     
     

       来我的公众号与我交流
  • 相关阅读:
    Markdown基础语法
    Java是什么
    myBatis框架_关于怎么获得多表查询的总记录数
    关于Could not load driverClass ${jdbc.driverClassName}问题解决方案
    java中String与StringBuffer拼接的区别
    部分标签
    基础标签 网页分类
    入坑小开头
    完整版的OpenLDAP搭建全过程
    测试Linux下tcp最大连接数限制
  • 原文地址:https://www.cnblogs.com/chopper-poet/p/13207546.html
Copyright © 2011-2022 走看看