zoukankan      html  css  js  c++  java
  • Redis源码分析(二十七)--- rio系统I/O的封装

            I/O操作对于每个系统来说都是必不可少的一部分。而且I/O操作的好坏,在一定程度上也会影响着系统的效率问题。今天我学习了一下在Redis中的I/O是怎么处理的,同样的,Redis在他自己的系统中,也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽:

    struct _rio {
        /* Backend functions.
         * Since this functions do not tolerate short writes or reads the return
         * value is simplified to: zero on error, non zero on complete success. */
        /* 数据流的读方法 */
        size_t (*read)(struct _rio *, void *buf, size_t len);
        /* 数据流的写方法 */
        size_t (*write)(struct _rio *, const void *buf, size_t len);
        /* 获取当前的读写偏移量 */
        off_t (*tell)(struct _rio *);
        /* The update_cksum method if not NULL is used to compute the checksum of
         * all the data that was read or written so far. The method should be
         * designed so that can be called with the current checksum, and the buf
         * and len fields pointing to the new block of data to add to the checksum
         * computation. */
        /* 当读入新的数据块的时候,会更新当前的校验和 */
        void (*update_cksum)(struct _rio *, const void *buf, size_t len);
    
        /* The current checksum */
        /* 当前的校验和 */
        uint64_t cksum;
    
        /* number of bytes read or written */
        /* 当前读取的或写入的字节大小 */
        size_t processed_bytes;
    
        /* maximum single read or write chunk size */
        /* 最大的单次读写的大小 */
        size_t max_processing_chunk;
    
        /* Backend-specific vars. */
        /* rio中I/O变量 */
        union {
        	//buffer结构体
            struct {
            	//buffer具体内容
                sds ptr;
                //偏移量
                off_t pos;
            } buffer;
            //文件结构体
            struct {
                FILE *fp;
                off_t buffered; /* Bytes written since last fsync. */
                //同步的最小大小
                off_t autosync; /* fsync after 'autosync' bytes written. */
            } file;
        } io;
    };
    里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法,还有2个结构体变量,一个buffer结构体,一个file结构体,作者针对不同的I/O情况,做了不同的处理,当执行临时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时,则执行与rio.file之间的操作。下面看看rio统一定义的读写方法:

    /* The following functions are our interface with the stream. They'll call the
     * actual implementation of read / write / tell, and will update the checksum
     * if needed. */
    /* rio的写方法 */
    static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
        while (len) {
        	//判断当前操作字节长度是否超过最大长度
            size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
            //写入新的数据时,更新校验和
            if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
            //执行写方法
            if (r->write(r,buf,bytes_to_write) == 0)
                return 0;
            buf = (char*)buf + bytes_to_write;
            len -= bytes_to_write;
            //操作字节数增加
            r->processed_bytes += bytes_to_write;
        }
        return 1;
    }
    
    /* rio的读方法 */
    static inline size_t rioRead(rio *r, void *buf, size_t len) {
        while (len) {
        	//判断当前操作字节长度是否超过最大长度
            size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
            //读数据方法
            if (r->read(r,buf,bytes_to_read) == 0)
                return 0;
            //读数据时,更新校验和
            if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
            buf = (char*)buf + bytes_to_read;
            len -= bytes_to_read;
            r->processed_bytes += bytes_to_read;
        }
        return 1;
    }
    
    这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:

    /* 根据上面描述的方法,定义了BufferRio */
    static const rio rioBufferIO = {
        rioBufferRead,
        rioBufferWrite,
        rioBufferTell,
        NULL,           /* update_checksum */
        0,              /* current checksum */
        0,              /* bytes read or written */
        0,              /* read/write chunk size */
        { { NULL, 0 } } /* union for io-specific vars */
    };
    
    /* 根据上面描述的方法,定义了FileRio */
    static const rio rioFileIO = {
        rioFileRead,
        rioFileWrite,
        rioFileTell,
        NULL,           /* update_checksum */
        0,              /* current checksum */
        0,              /* bytes read or written */
        0,              /* read/write chunk size */
        { { NULL, 0 } } /* union for io-specific vars */
    };
    
    里面分别定义了相对应的读写方法,比如buffer的Read方法和File的Read方法:

    /* Returns 1 or 0 for success/failure. */
    /* 读取rio中的buffer内容到传入的参数 */
    static size_t rioBufferRead(rio *r, void *buf, size_t len) {
        if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
            return 0; /* not enough buffer to return len bytes. */
        memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
        r->io.buffer.pos += len;
        return 1;
    }
    
    /* Returns 1 or 0 for success/failure. */
    /* 读取rio中的fp文件内容 */
    static size_t rioFileRead(rio *r, void *buf, size_t len) {
        return fread(buf,len,1,r->io.file.fp);
    }
    
    作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法:

    /* rio写入不同类型数据方法,最终调用的是riowrite方法 */
    size_t rioWriteBulkCount(rio *r, char prefix, int count);
    size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
    size_t rioWriteBulkLongLong(rio *r, long long l);
    size_t rioWriteBulkDouble(rio *r, double d);
    举其中的一个方法实现:

    /* Write multi bulk count in the format: "*<count>
    ". */
    /* rio写入不同类型数据方法,调用的是riowrite方法 */
    size_t rioWriteBulkCount(rio *r, char prefix, int count) {
        char cbuf[128];
        int clen;
    
        cbuf[0] = prefix;
        clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
        cbuf[clen++] = '
    ';
        cbuf[clen++] = '
    ';
        if (rioWrite(r,cbuf,clen) == 0) return 0;
        return clen;
    }
    调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。

    /* Returns 1 or 0 for success/failure. */
    /* 将buf写入rio中的file文件中 */
    static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
        size_t retval;
    
        retval = fwrite(buf,len,1,r->io.file.fp);
        r->io.file.buffered += len;
    
        if (r->io.file.autosync &&
            r->io.file.buffered >= r->io.file.autosync)
        {
        	//判读是否需要同步
            fflush(r->io.file.fp);
            aof_fsync(fileno(r->io.file.fp));
            r->io.file.buffered = 0;
        }
        return retval;
    }
  • 相关阅读:
    Java I/O(二 使用)
    Java 基本I/O的学习总结(一 是什么)
    设计模式(一)
    浏览器输入一个网址(发生的过程)
    final关键字的4种用法
    JavaScript(4)——闭包与this对象以及window对象
    JavaScript(3)—— 正则表达式
    JavaScript(2)——对象属性、原型与原型链
    JavaScript(1)——变量、函数声明及作用域
    构建分布式配置中心阿波罗(Apollo)
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184208.html
Copyright © 2011-2022 走看看