zoukankan      html  css  js  c++  java
  • Redis C客户端Hiredis代码分析

    初始化

    redisContext - Redis连接的上下文

    /* Context for a connection to Redis */
    typedef struct redisContext {
        int err; /* Error flags, 0 when there is no error */
        char errstr[128]; /* String representation of error when applicable */
        int fd;
        int flags;
        char *obuf; /* Write buffer */
        redisReader *reader; /* Protocol reader */
    
        enum redisConnectionType connection_type;
        struct timeval *timeout;
    
        struct {
            char *host;
            char *source_addr;
            int port;
        } tcp;
    
        struct {
            char *path;
        } unix_sock;
    
    } redisContext;
    

    每一次连接成功, 都会创建一个redisContext数据, 用于后续的操作

    redisReader - 结果读取

    typedef struct redisReader {
        int err; /* Error flags, 0 when there is no error */
        char errstr[128]; /* String representation of error when applicable */
    
        char *buf; /* Read buffer */
        size_t pos; /* Buffer cursor */
        size_t len; /* Buffer length */
        size_t maxbuf; /* Max length of unused buffer */
    
        redisReadTask rstack[9];
        int ridx; /* Index of current read task */
        void *reply; /* Temporary reply pointer */
    
        redisReplyObjectFunctions *fn;
        void *privdata;
    } redisReader;
    

    redisReader在redisContext中使用redisReplyObjectFunctions参数进行初始化, 用于读取执行结果, 其中 redisReplyObjectFunctions 是一个函数指针列表, 用于读取不同类型的返回数据, 

    typedef struct redisReplyObjectFunctions {
        void *(*createString)(const redisReadTask*, char*, size_t);
        void *(*createArray)(const redisReadTask*, int);
        void *(*createInteger)(const redisReadTask*, long long);
        void *(*createNil)(const redisReadTask*);
        void (*freeObject)(void*);
    } redisReplyObjectFunctions;
    

    一般使用的是默认的defaultFunctions进行初始化

    /* Default set of functions to build the reply. Keep in mind that such a
     * function returning NULL is interpreted as OOM. */
    static redisReplyObjectFunctions defaultFunctions = {
        createStringObject,
        createArrayObject,
        createIntegerObject,
        createNilObject,
        freeReplyObject
    };
    

    redisReader中的另一个结构体是 redisReadTask数组, 需要配合ridx一起使用, 用于给redisReplyObjectFunctions中的函数提供输入参数

    typedef struct redisReadTask {
        int type;
        int elements; /* number of elements in multibulk container */
        int idx; /* index in parent (array) object */
        void *obj; /* holds user-generated value for a read task */
        struct redisReadTask *parent; /* parent task */
        void *privdata; /* user-settable arbitrary field */
    } redisReadTask;
    

    在defaultFunctions下面的函数是 createStringObject, createArrayObject, createIntegerObject, createNilObject, freeReplyObject, 这些函数的输入都是rediskReadTask, 但是根据类型不同会有不同的附带参数, 例如array就会有elements元素数量, 而string就会有char *str和size_t len, 都会在处理中产生redisReply对象, 并将这个对象挂载到上一级redisReply对象的element变量上(如果有上一级的话).

    结果解析

    执行命令时, 实际调用的处理方法是redisvCommand, 

    void *redisvCommand(redisContext *c, const char *format, va_list ap) {
        if (redisvAppendCommand(c,format,ap) != REDIS_OK)
            return NULL;
        return __redisBlockForReply(c);
    }
    

    在这里会调用__redisBlockForReply来处理存在redisContext *c中的结果

    /* Helper function for the redisCommand* family of functions.
     *
     * Write a formatted command to the output buffer. If the given context is
     * blocking, immediately read the reply into the "reply" pointer. When the
     * context is non-blocking, the "reply" pointer will not be used and the
     * command is simply appended to the write buffer.
     *
     * Returns the reply when a reply was successfully retrieved. Returns NULL
     * otherwise. When NULL is returned in a blocking context, the error field
     * in the context will be set.
     */
    static void *__redisBlockForReply(redisContext *c) {
        void *reply;
    
        if (c->flags & REDIS_BLOCK) {
            if (redisGetReply(c,&reply) != REDIS_OK)
                return NULL;
            return reply;
        }
        return NULL;
    }
    

    这里会将错误情况排除, 实际的处理方法是 redisGetReply, 这里的处理方式是阻塞的, 如果aux已经读到error就返回, 否则一直在等直到有值

    int redisGetReply(redisContext *c, void **reply) {
        int wdone = 0;
        void *aux = NULL;
    
        /* Try to read pending replies */
        if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
            return REDIS_ERR;
    
        /* For the blocking context, flush output buffer and read reply */
        if (aux == NULL && c->flags & REDIS_BLOCK) {
            /* Write until done */
            do {
                if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                    return REDIS_ERR;
            } while (!wdone);
    
            /* Read until there is a reply */
            do {
                if (redisBufferRead(c) == REDIS_ERR)
                    return REDIS_ERR;
                if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                    return REDIS_ERR;
            } while (aux == NULL);
        }
    
        /* Set reply object */
        if (reply != NULL) *reply = aux;
        return REDIS_OK;
    }
    

    在阻塞读取的过程中, 会先通过redisBufferRead尝试读取, 成功后再用redisGetReplyFromReader尝试解析

    /* Use this function to handle a read event on the descriptor. It will try
     * and read some bytes from the socket and feed them to the reply parser.
     *
     * After this function is called, you may use redisContextReadReply to
     * see if there is a reply available. */
    int redisBufferRead(redisContext *c) {
        char buf[1024*16];
        int nread;
    
        /* Return early when the context has seen an error. */
        if (c->err)
            return REDIS_ERR;
    
        nread = read(c->fd,buf,sizeof(buf));
        if (nread == -1) {
            if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
                /* Try again later */
            } else {
                __redisSetError(c,REDIS_ERR_IO,NULL);
                return REDIS_ERR;
            }
        } else if (nread == 0) {
            __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
            return REDIS_ERR;
        } else {
            if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
                __redisSetError(c,c->reader->err,c->reader->errstr);
                return REDIS_ERR;
            }
        }
        return REDIS_OK;
    }
    

    在redisGetReplyFromReader中调用redisReaderGetReply解析, 再往内部, 就是循环调用processItem来处理结果了

    /* Internal helper function to try and get a reply from the reader,
     * or set an error in the context otherwise. */
    int redisGetReplyFromReader(redisContext *c, void **reply) {
        if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
            __redisSetError(c,c->reader->err,c->reader->errstr);
            return REDIS_ERR;
        }
        return REDIS_OK;
    }
    
    int redisReaderGetReply(redisReader *r, void **reply) {
        /* Default target pointer to NULL. */
        if (reply != NULL)
            *reply = NULL;
    
        /* Return early when this reader is in an erroneous state. */
        if (r->err)
            return REDIS_ERR;
    
        /* When the buffer is empty, there will never be a reply. */
        if (r->len == 0)
            return REDIS_OK;
    
        /* Set first item to process when the stack is empty. */
        if (r->ridx == -1) {
            r->rstack[0].type = -1;
            r->rstack[0].elements = -1;
            r->rstack[0].idx = -1;
            r->rstack[0].obj = NULL;
            r->rstack[0].parent = NULL;
            r->rstack[0].privdata = r->privdata;
            r->ridx = 0;
        }
    
        /* Process items in reply. */
        while (r->ridx >= 0)
            if (processItem(r) != REDIS_OK)
                break;
    
        /* Return ASAP when an error occurred. */
        if (r->err)
            return REDIS_ERR;
    
        /* Discard part of the buffer when we've consumed at least 1k, to avoid
         * doing unnecessary calls to memmove() in sds.c. */
        if (r->pos >= 1024) {
            sdsrange(r->buf,r->pos,-1);
            r->pos = 0;
            r->len = sdslen(r->buf);
        }
    
        /* Emit a reply when there is one. */
        if (r->ridx == -1) {
            if (reply != NULL)
                *reply = r->reply;
            r->reply = NULL;
        }
        return REDIS_OK;
    }
    

    在processItem中, 会根据第一个字节确定当前redisReadTask的类型, 然后对类型为integer, string和array的数据进行处理, 其他的直接返回error

    static int processItem(redisReader *r) {
        redisReadTask *cur = &(r->rstack[r->ridx]);
        char *p;
    
        /* check if we need to read type */
        if (cur->type < 0) {
            if ((p = readBytes(r,1)) != NULL) {
                switch (p[0]) {
                case '-':
                    cur->type = REDIS_REPLY_ERROR;
                    break;
                case '+':
                    cur->type = REDIS_REPLY_STATUS;
                    break;
                case ':':
                    cur->type = REDIS_REPLY_INTEGER;
                    break;
                case '$':
                    cur->type = REDIS_REPLY_STRING;
                    break;
                case '*':
                    cur->type = REDIS_REPLY_ARRAY;
                    break;
                default:
                    __redisReaderSetErrorProtocolByte(r,*p);
                    return REDIS_ERR;
                }
            } else {
                /* could not consume 1 byte */
                return REDIS_ERR;
            }
        }
    
        /* process typed item */
        switch(cur->type) {
        case REDIS_REPLY_ERROR:
        case REDIS_REPLY_STATUS:
        case REDIS_REPLY_INTEGER:
            return processLineItem(r);
        case REDIS_REPLY_STRING:
            return processBulkItem(r);
        case REDIS_REPLY_ARRAY:
            return processMultiBulkItem(r);
        default:
            assert(NULL);
            return REDIS_ERR; /* Avoid warning. */
        }
    }
    

    对redisReadTask队列和ridx的操作只发生在类型为array的情况下, 是在processMultiBulkItem的处理中进行的, 这里会取出一整个这一层array里的数据, 如果遇到元素的elements大于0的, 说明这个元素也是一个数组, 那么就会ridx加一, 在rstack中增加一个redisReadTask, 这时候就不再继续处理同层数据了, 而是等着外循环去进入下一层处理.

    static int processMultiBulkItem(redisReader *r) {
        redisReadTask *cur = &(r->rstack[r->ridx]);
        void *obj;
        char *p;
        long long elements;
        int root = 0, len;
    
        /* Set error for nested multi bulks with depth > 7 */
        if (r->ridx == 8) {
            __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
                "No support for nested multi bulk replies with depth > 7");
            return REDIS_ERR;
        }
    
        if ((p = readLine(r,&len)) != NULL) {
            if (string2ll(p, len, &elements) == REDIS_ERR) {
                __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
                        "Bad multi-bulk length");
                return REDIS_ERR;
            }
    
            root = (r->ridx == 0);
    
            if (elements < -1 || elements > INT_MAX) {
                __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
                        "Multi-bulk length out of range");
                return REDIS_ERR;
            }
    
            if (elements == -1) {
                if (r->fn && r->fn->createNil)
                    obj = r->fn->createNil(cur);
                else
                    obj = (void*)REDIS_REPLY_NIL;
    
                if (obj == NULL) {
                    __redisReaderSetErrorOOM(r);
                    return REDIS_ERR;
                }
    
                moveToNextTask(r);
            } else {
                if (r->fn && r->fn->createArray)
                    obj = r->fn->createArray(cur,elements);
                else
                    obj = (void*)REDIS_REPLY_ARRAY;
    
                if (obj == NULL) {
                    __redisReaderSetErrorOOM(r);
                    return REDIS_ERR;
                }
    
                /* Modify task stack when there are more than 0 elements. */
                if (elements > 0) {
                    cur->elements = elements;
                    cur->obj = obj;
                    r->ridx++;
                    r->rstack[r->ridx].type = -1;
                    r->rstack[r->ridx].elements = -1;
                    r->rstack[r->ridx].idx = 0;
                    r->rstack[r->ridx].obj = NULL;
                    r->rstack[r->ridx].parent = cur;
                    r->rstack[r->ridx].privdata = r->privdata;
                } else {
                    moveToNextTask(r);
                }
            }
    
            /* Set reply if this is the root object. */
            if (root) r->reply = obj;
            return REDIS_OK;
        }
    
        return REDIS_ERR;
    }
    

    这里要注意的是moveToNextTask这个函数, 这是去移动到当前层的下一个元素, 如果当前层已经空了就直接返回, 如果在当前层的编号已经到最后一个, 就返回上一层. 所以这个hiredis对返回数组的处理是一个深度遍历, 并且规定了层数不能超过8

    static void moveToNextTask(redisReader *r) {
        redisReadTask *cur, *prv;
        while (r->ridx >= 0) {
            /* Return a.s.a.p. when the stack is now empty. */
            if (r->ridx == 0) {
                r->ridx--;
                return;
            }
    
            cur = &(r->rstack[r->ridx]);
            prv = &(r->rstack[r->ridx-1]);
            assert(prv->type == REDIS_REPLY_ARRAY);
            if (cur->idx == prv->elements-1) {
                r->ridx--;
            } else {
                /* Reset the type because the next item can be anything */
                assert(cur->idx < prv->elements);
                cur->type = -1;
                cur->elements = -1;
                cur->idx++;
                return;
            }
        }
    }
    

    .

  • 相关阅读:
    java面向对象高级分层实例_实体类
    But what exactly do we mean by "gets closer to"?
    information entropy as a measure of the uncertainty in a message while essentially inventing the field of information theory
    SVM vs. Softmax
    every row of W is a classifier for one of the classes
    Hinge Loss
    polynomial time
    Conditional random fields
    Frobenius Norm
    L2 范数 L1 范数 出租车范数
  • 原文地址:https://www.cnblogs.com/milton/p/9565169.html
Copyright © 2011-2022 走看看