  • Redis Study Notes: the MONITOR command

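    Summary

    The main Redis features that rely on buffers are:

    • Client buffers
    • Replication backlog buffer
    • AOF buffer

    Client buffers are the input (query) buffer and the output buffer that Redis keeps for each client connected over TCP.

    The client-query-buffer-limit parameter caps how much request data a client can have pending in its query buffer on the Redis side. The default is 1 GB. Avoid very large keys when using Redis.

    The client-output-buffer-limit parameter caps the output buffer size, and it can be set separately for each class of client: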

    # The client output buffer limits can be used to force disconnection of clients
    # that are not reading data from the server fast enough for some reason (a
    # common reason is that a Pub/Sub client can't consume messages as fast as the
    # publisher can produce them).
    #
    # The limit can be set differently for the three different classes of clients:
    #
    # normal -> normal clients including MONITOR clients
    # replica  -> replica clients
    # pubsub -> clients subscribed to at least one pubsub channel or pattern
    #
    # The syntax of every client-output-buffer-limit directive is the following:
    #
    # client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
    #
    # A client is immediately disconnected once the hard limit is reached, or if
    # the soft limit is reached and remains reached for the specified number of
    # seconds (continuously).
    # So for instance if the hard limit is 32 megabytes and the soft limit is
    # 16 megabytes / 10 seconds, the client will get disconnected immediately
    # if the size of the output buffers reach 32 megabytes, but will also get
    # disconnected if the client reaches 16 megabytes and continuously overcomes
    # the limit for 10 seconds.
    #
    # By default normal clients are not limited because they don't receive data
    # without asking (in a push way), but just after a request, so only
    # asynchronous clients may create a scenario where data is requested faster
    # than it can read.
    #
    # Instead there is a default limit for pubsub and replica clients, since
    # subscribers and replicas receive data in a push fashion.
    #
    # Both the hard or the soft limit can be disabled by setting them to zero.
    client-output-buffer-limit normal 0 0 0
    client-output-buffer-limit replica 256mb 64mb 60
    client-output-buffer-limit pubsub 32mb 8mb 60
    
    # Client query buffers accumulate new commands. They are limited to a fixed
    # amount by default in order to avoid that a protocol desynchronization (for
    # instance due to a bug in the client) will lead to unbound memory usage in
    # the query buffer. However you can configure it here if you have very special
    # needs, such as huge multi/exec requests or alike.
    #
    # client-query-buffer-limit 1gb
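
    The hard/soft limit rule described in the comments above can be sketched as a small standalone function. This is a simplified model, not the Redis implementation: the obuf_limit and obuf_state types and the over_output_limit function are hypothetical names introduced for illustration, and the caller is assumed to pass the current time in seconds (non-zero).

```c
#include <stddef.h>
#include <time.h>

/* Hypothetical model of one client-output-buffer-limit entry. */
typedef struct {
    size_t hard_limit_bytes;    /* 0 disables the hard limit */
    size_t soft_limit_bytes;    /* 0 disables the soft limit */
    time_t soft_limit_seconds;  /* how long the soft limit may be exceeded */
} obuf_limit;

/* Hypothetical per-client state: when the soft limit was first exceeded. */
typedef struct {
    time_t soft_limit_reached_time;  /* 0 = currently below the soft limit */
} obuf_state;

/* Returns 1 if the client should be disconnected, 0 otherwise.
 * 'used' is the current output buffer size, 'now' the current time. */
int over_output_limit(const obuf_limit *lim, obuf_state *st,
                      size_t used, time_t now) {
    int hard = lim->hard_limit_bytes && used >= lim->hard_limit_bytes;
    int soft = lim->soft_limit_bytes && used >= lim->soft_limit_bytes;

    if (!soft) {
        /* Dropping below the soft limit resets the timer. */
        st->soft_limit_reached_time = 0;
        return hard;
    }
    if (st->soft_limit_reached_time == 0) {
        /* First time over the soft limit: start the timer. */
        st->soft_limit_reached_time = now;
        return hard;
    }
    /* Still over the soft limit: disconnect once the grace period is exceeded. */
    return hard || (now - st->soft_limit_reached_time) > lim->soft_limit_seconds;
}
```

    With the pubsub defaults (32mb 8mb 60), a client holding 9 MB of pending output is tolerated for 60 seconds and then reported for disconnection, while a client reaching 32 MB is reported at once. A limit of 0 0 0, the default for normal clients, never reports anything.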
    

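    With the default configuration, normal clients are not limited. When MONITOR is used to watch Redis command traffic, if the QPS is high or requests carry a lot of data and the monitoring client cannot read the stream fast enough, the data backs up in that client's output buffer and can consume a large amount of Redis memory.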

    In a busy production environment, avoid the MONITOR command where possible, and keep any MONITOR session strictly short.
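
    To get a feel for how quickly such a backlog can grow, the arithmetic can be sketched as below. The function names and every number in the usage note are illustrative assumptions, not measurements: real MONITOR line sizes and client read rates vary widely.

```c
#include <stdint.h>

/* Bytes per second by which one MONITOR client's output buffer grows when
 * lines are produced faster than the client drains them.
 * Returns 0 when the client keeps up. */
uint64_t backlog_growth_per_sec(uint64_t qps, uint64_t avg_line_bytes,
                                uint64_t drained_bytes_per_sec) {
    uint64_t produced = qps * avg_line_bytes;
    return produced > drained_bytes_per_sec
               ? produced - drained_bytes_per_sec
               : 0;
}

/* Whole seconds until the backlog reaches budget_bytes,
 * or UINT64_MAX if the backlog never grows. */
uint64_t seconds_until(uint64_t budget_bytes, uint64_t growth_per_sec) {
    if (growth_per_sec == 0) return UINT64_MAX;
    return budget_bytes / growth_per_sec;
}
```

    For example, assuming 50,000 QPS, 100 bytes per monitor line, and a client that reads only 1,000,000 bytes per second, the buffer grows by 4,000,000 bytes per second and reaches 1 GiB in about 268 seconds.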

    Use the client list and info clients commands to get the client list and the resource usage of client input and output buffers.
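
    Each line of client list output is a series of space-separated key=value pairs. The buffer-related fields are qbuf and qbuf-free (query buffer), obl (bytes used in the static output buffer), oll (number of entries in the reply list) and omem (memory used by the output buffer). A minimal sketch of pulling one numeric field out of such a line follows; client_list_field is a hypothetical helper, and the sample line in the usage note is made up for illustration.

```c
#include <stdlib.h>
#include <string.h>

/* Returns the numeric value of 'field' (for example "omem") found in one
 * line of client list output, or -1 if the field is not present.
 * Matching is done per token, so "qbuf" does not match "qbuf-free". */
long long client_list_field(const char *line, const char *field) {
    size_t flen = strlen(field);
    const char *p = line;

    while (*p) {
        /* Skip the spaces that separate key=value pairs. */
        while (*p == ' ') p++;
        if (strncmp(p, field, flen) == 0 && p[flen] == '=')
            return strtoll(p + flen + 1, NULL, 10);
        /* Not the field we want: advance to the end of this token. */
        while (*p && *p != ' ') p++;
    }
    return -1;
}
```

    Given a line such as "id=7 addr=127.0.0.1:52341 fd=8 name= age=120 idle=0 flags=O db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=512 omem=10485760 events=rw cmd=monitor", calling client_list_field(line, "omem") yields 10485760. A monitoring script could alert when omem for a client with flags=O (a client in MONITOR mode) keeps rising.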

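    Source code walkthrough

    First, the redisServer structure keeps the list of all replicas and the list of monitoring clients:

    struct redisServer {
        /* ... other fields omitted ... */
        list *slaves, *monitors;    /* List of slaves and MONITORs */
    };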
    

    When a client issues the MONITOR command, that client is added to the monitors list:

    void monitorCommand(client *c) {
        /* ignore MONITOR if already slave or in monitor mode */
        if (c->flags & CLIENT_SLAVE) return;
    
        c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
        listAddNodeTail(server.monitors,c);
        addReply(c,shared.ok);
    }
    

    Before a command is executed, it is sent to all monitors:

    /* Call() is the core of Redis execution of a command.
     */
    void call(client *c, int flags) {
        long long dirty;
        ustime_t start, duration;
        int client_old_flags = c->flags;
        struct redisCommand *real_cmd = c->cmd;
    
        server.fixed_time_expire++;
    
        /* Send the command to clients in MONITOR mode, only if the commands are
         * not generated from reading an AOF. */
        if (listLength(server.monitors) &&
            !server.loading &&
            !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
        {
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
        }
        /* ... remainder of call() omitted ... */
    }
    

    The replicationFeedMonitors function formats the command as a single line and uses addReply to place the result in the output buffer of every monitor.

    void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
        listNode *ln;
        listIter li;
        int j;
        sds cmdrepr = sdsnew("+");
        robj *cmdobj;
        struct timeval tv;
    
        gettimeofday(&tv,NULL);
        cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
        if (c->flags & CLIENT_LUA) {
            cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
        } else if (c->flags & CLIENT_UNIX_SOCKET) {
            cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
        } else {
            cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
        }
    
        for (j = 0; j < argc; j++) {
            if (argv[j]->encoding == OBJ_ENCODING_INT) {
                cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
            } else {
                cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
                            sdslen(argv[j]->ptr));
            }
            if (j != argc-1)
                cmdrepr = sdscatlen(cmdrepr," ",1);
        }
        cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
        cmdobj = createObject(OBJ_STRING,cmdrepr);
    
        listRewind(monitors,&li);
        while((ln = listNext(&li))) {
            client *monitor = ln->value;
            addReply(monitor,cmdobj);
        }
        decrRefCount(cmdobj);
    }
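
    The line layout built above (a "+" prefix, a timestamp, the database id and peer address in brackets, the quoted arguments, then CRLF) can be reproduced with a standalone sketch. format_monitor_line is a hypothetical function written for illustration only. Unlike sdscatrepr, it does not escape quotes, backslashes or non-printable bytes inside arguments, so it is only suitable for plain printable arguments.

```c
#include <stdio.h>
#include <string.h>

/* Writes a MONITOR-style line into 'out' (capacity 'cap').
 * Returns the line length, or -1 if the line does not fit.
 * Simplification: arguments are quoted but NOT escaped. */
int format_monitor_line(char *out, size_t cap, long sec, long usec, int dbid,
                        const char *peer, const char **argv, int argc) {
    /* Prefix: "+<sec>.<usec> [<db> <peer>] " */
    int n = snprintf(out, cap, "+%ld.%06ld [%d %s] ", sec, usec, dbid, peer);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t pos = (size_t)n;

    /* Each argument in double quotes, separated by single spaces. */
    for (int j = 0; j < argc; j++) {
        n = snprintf(out + pos, cap - pos, "\"%s\"%s",
                     argv[j], j != argc - 1 ? " " : "");
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }

    /* Terminating CRLF of the simple-string reply. */
    n = snprintf(out + pos, cap - pos, "\r\n");
    if (n < 0 || (size_t)n >= cap - pos) return -1;
    return (int)(pos + (size_t)n);
}
```

    Calling it with sec=1339518083, usec=107412, dbid=0, peer "127.0.0.1:60866" and the arguments "keys" and "*" produces the line +1339518083.107412 [0 127.0.0.1:60866] "keys" "*" followed by CRLF. Every monitored command adds one such line to each monitor's output buffer, so the line length scales with the size of the command's arguments.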
    

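    Before the monitor line is placed in a client's output buffer, _addReplyToBuffer checks whether the fixed-size static buffer has enough room for it. If it does not, the static buffer is skipped and the data is appended to the reply list by _addReplyStringToList. After the data has been appended to the reply list, asyncCloseClientOnOutputBufferLimitReached checks the output buffer size against the configured limits: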

    /* Add the object 'obj' string representation to the client output buffer. */
    void addReply(client *c, robj *obj) {
        if (prepareClientToWrite(c) != C_OK) return;
    
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
        } else if (obj->encoding == OBJ_ENCODING_INT) {
            /* For integer encoded strings we just convert it into a string
             * using our optimized function, and attach the resulting string
             * to the output buffer. */
            char buf[32];
            size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) != C_OK)
                _addReplyStringToList(c,buf,len);
        } else {
            serverPanic("Wrong obj->encoding in addReply()");
        }
    }
    
    /* -----------------------------------------------------------------------------
     * Low level functions to add more data to output buffers.
     * -------------------------------------------------------------------------- */
    
    int _addReplyToBuffer(client *c, const char *s, size_t len) {
        size_t available = sizeof(c->buf)-c->bufpos;
    
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    
        /* If there already are entries in the reply list, we cannot
         * add anything more to the static buffer. */
        if (listLength(c->reply) > 0) return C_ERR;
    
        /* Check that the buffer has enough space available for this string. */
        if (len > available) return C_ERR;
    
        memcpy(c->buf+c->bufpos,s,len);
        c->bufpos+=len;
        return C_OK;
    }
    
    void _addReplyStringToList(client *c, const char *s, size_t len) {
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
    
        listNode *ln = listLast(c->reply);
        clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
    
        /* Note that 'tail' may be NULL even if we have a tail node, because when
         * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
         * to fill it later, when the size of the bulk length is set. */
    
        /* Append to tail string when possible. */
        if (tail) {
            /* Copy the part we can fit into the tail, and leave the rest for a
             * new node */
            size_t avail = tail->size - tail->used;
            size_t copy = avail >= len? len: avail;
            memcpy(tail->buf + tail->used, s, copy);
            tail->used += copy;
            s += copy;
            len -= copy;
        }
        if (len) {
            /* Create a new node, make sure it is allocated to at
             * least PROTO_REPLY_CHUNK_BYTES */
            size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
            tail = zmalloc(size + sizeof(clientReplyBlock));
            /* take over the allocation's internal fragmentation */
            tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
            tail->used = len;
            memcpy(tail->buf, s, len);
            listAddNodeTail(c->reply, tail);
            c->reply_bytes += tail->size;
        }
        asyncCloseClientOnOutputBufferLimitReached(c);
    }
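
    The two-tier behaviour of the functions above (static buffer first, reply list once the static buffer cannot take the data) can be modelled in a few lines. This is a deliberately reduced sketch: model_client and model_add_reply are hypothetical names, the 16 KB static buffer size is an assumption about the value of PROTO_REPLY_CHUNK_BYTES, and the overflow path only counts bytes instead of copying them into list blocks.

```c
#include <stddef.h>
#include <string.h>

#define MODEL_STATIC_BUF_BYTES (16 * 1024)  /* assumed static buffer size */

/* Hypothetical, reduced model of a client's output buffers. */
typedef struct {
    char   buf[MODEL_STATIC_BUF_BYTES];
    size_t bufpos;       /* bytes used in the static buffer */
    size_t list_nodes;   /* stand-in for listLength(c->reply) */
    size_t reply_bytes;  /* bytes accounted to the overflow list */
} model_client;

/* Returns 1 if the data went into the static buffer,
 * 0 if it overflowed to the (modelled) reply list. */
int model_add_reply(model_client *c, const char *s, size_t len) {
    /* Static buffer is usable only while the list is empty and the data fits. */
    if (c->list_nodes == 0 && len <= sizeof(c->buf) - c->bufpos) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return 1;
    }
    /* Overflow path: the model only does the accounting. The real code
     * appends to the tail block or allocates a new one, then runs the
     * output buffer limit check. */
    c->list_nodes++;
    c->reply_bytes += len;
    return 0;
}
```

    In this model, once a single reply has overflowed to the list, every later reply also goes to the list even if it would fit in the static buffer, which mirrors the listLength(c->reply) > 0 check in _addReplyToBuffer. It also shows why only the list path needs the limit check: the static buffer has a fixed size, while the list can grow without bound.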
    

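    Once the buffer size exceeds the configured threshold, the client is closed asynchronously, which keeps output buffer memory from growing until the instance runs into trouble.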

    /* Asynchronously close a client if soft or hard limit is reached on the
     * output buffer size. The caller can check if the client will be closed
     * checking if the client CLIENT_CLOSE_ASAP flag is set.
     *
     * Note: we need to close the client asynchronously because this function is
     * called from contexts where the client can't be freed safely, i.e. from the
     * lower level functions pushing data inside the client output buffers. */
    void asyncCloseClientOnOutputBufferLimitReached(client *c) {
        if (c->fd == -1) return; /* It is unsafe to free fake clients. */
        serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
        if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
        if (checkClientOutputBufferLimits(c)) {
            sds client = catClientInfoString(sdsempty(),c);
    
            freeClientAsync(c);
            serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
            sdsfree(client);
        }
    }
    


  • Original article: https://www.cnblogs.com/gaogao67/p/15086379.html