zoukankan      html  css  js  c++  java
  • Redis阻塞队列原理学习

    转载请注明来自 http://www.cnblogs.com/pengyu2003/p/4864918.html 

    1.redis介绍

    Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

    2.阻塞队列

    使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

    3.redis的阻塞队列

    redis提供远程访问模式,本文以redis远程阻塞从队列中取数据为例,介绍redis阻塞实现原理。

    /*-----------------------------------------------------------------------------
     * Blocking POP operations
     *----------------------------------------------------------------------------*/
    
    /* This is how the current blocking POP works, we use BLPOP as example:
     * - If the user calls BLPOP and the key exists and contains a non empty list
     *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
     *   if blocking is not required.
     * - If instead BLPOP is called and the key does not exists or the list is
     *   empty we need to block. In order to do so we remove the notification for
     *   new data to read in the client socket (so that we'll not serve new
     *   requests if the blocking request is not served). Also we put the client
     *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
     *   blocking for this keys.
     * - If a PUSH operation against a key with blocked clients waiting is
     *   performed, we mark this key as "ready", and after the current command,
     *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
     *   for this list, from the one that blocked first, to the last, accordingly
     *   to the number of elements we have in the ready list.
     */
    

      在调用阻塞队列取操作时,队列中无数据时才会真正的触发代码的阻塞分支。在客户端,redis对新来的读请求删除了标记(通知),这样知道阻塞的请求在获得服务前,新来的读请求都不能够被正常的服务。

    blpop

    void blpopCommand(redisClient *c) {
        blockingPopGenericCommand(c,REDIS_HEAD);
    }
    

      队列的左右出队对应队列(list)的表头和表尾。

    blockingPopGenericCommand

    /* Blocking RPOP/LPOP */
    void blockingPopGenericCommand(redisClient *c, int where) {
        robj *o;
        mstime_t timeout;
        int j;
    
        if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
            != REDIS_OK) return;
    
        for (j = 1; j < c->argc-1; j++) {
            o = lookupKeyWrite(c->db,c->argv[j]);
            if (o != NULL) {
                if (o->type != REDIS_LIST) {
                    addReply(c,shared.wrongtypeerr);
                    return;
                } else {
                    if (listTypeLength(o) != 0) {
                        /* Non empty list, this is like a non normal [LR]POP. */
                        char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
                        robj *value = listTypePop(o,where);
                        redisAssert(value != NULL);
    
                        addReplyMultiBulkLen(c,2);
                        addReplyBulk(c,c->argv[j]);
                        addReplyBulk(c,value);
                        decrRefCount(value);
                        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
                                            c->argv[j],c->db->id);
                        if (listTypeLength(o) == 0) {
                            dbDelete(c->db,c->argv[j]);
                            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                                                c->argv[j],c->db->id);
                        }
                        signalModifiedKey(c->db,c->argv[j]);
                        server.dirty++;
    
                        /* Replicate it as an [LR]POP instead of B[LR]POP. */
                        rewriteClientCommandVector(c,2,
                            (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
                            c->argv[j]);
                        return;
                    }
                }
            }
        }
    
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & REDIS_MULTI) {
            addReply(c,shared.nullmultibulk);
            return;
        }
    
        /* If the list is empty or the key does not exists we must block */
        //这里才是阻塞的关键    
        blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
    }
    

      

    阻塞

    /* Set a client in blocking mode for the specified key, with the specified
     * timeout */
    void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
        dictEntry *de;
        list *l;
        int j;
    
        c->bpop.timeout = timeout;
        c->bpop.target = target;
    
        if (target != NULL) incrRefCount(target);
    
        for (j = 0; j < numkeys; j++) {
            /* If the key already exists in the dict ignore it. */
            if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
            incrRefCount(keys[j]);
    
            /* And in the other "side", to map keys -> clients */
            de = dictFind(c->db->blocking_keys,keys[j]);
            if (de == NULL) {
                int retval;
    
                /* For every key we take a list of clients blocked for it */
                l = listCreate();
                retval = dictAdd(c->db->blocking_keys,keys[j],l);
                incrRefCount(keys[j]);
                redisAssertWithInfo(c,keys[j],retval == DICT_OK);
            } else {
                l = dictGetVal(de);
            }
            listAddNodeTail(l,c);
        }
        blockClient(c,REDIS_BLOCKED_LIST);
    }
    

      

  • 相关阅读:
    vue.js环境的搭建
    图片上传简单demo及springboot上传图片
    mybatise 模糊查询
    thymeleaf th:onclick 传参
    thymeleaf的特殊属性赋值
    无限分类的设计及前后台代码
    mysql 多个属性排序查询
    java添加对象成功后想知道当前添加对象的id
    SpringBoot2.x集成MQTT实现消息推送
    linux下安装MQTT服务器
  • 原文地址:https://www.cnblogs.com/pengyu2003/p/4864918.html
Copyright © 2011-2022 走看看