zoukankan      html  css  js  c++  java
  • Redis源码分析(二十九)--- bio后台I/O服务的实现

             在Redis系统中也存在后台服务的概念,background Service,后台线程在Redis中的表现主要为background I/O Service,有了后台线程的支持,系统在执行的效率上也势必会有不一样的提高。在Redis代码中,描述了此功能的文件为bio.c,同样借此机会学习一下,在C语言中的多线程编程到底是怎么一回事。我们先来看看,在Redis中的background job的工作形式;

    /* Background I/O service for Redis.
     *
     * 后台I/O服务
     * This file implements operations that we need to perform in the background.
     * Currently there is only a single operation, that is a background close(2)
     * system call. This is needed as when the process is the last owner of a
     * reference to a file closing it means unlinking it, and the deletion of the
     * file is slow, blocking the server.
     *
     * In the future we'll either continue implementing new things we need or
     * we'll switch to libeio. However there are probably long term uses for this
     * file as we may want to put here Redis specific background tasks (for instance
     * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
     * implementation).
     *
     * DESIGN
     * ------
     *
     * The design is trivial, we have a structure representing a job to perform
     * and a different thread and job queue for every job type.
     * Every thread wait for new jobs in its queue, and process every job
     * sequentially.
     *
     * Jobs of the same type are guaranteed to be processed from the least
     * recently inserted to the most recently inserted (older jobs processed
     * first).
     *
     * Currently there is no way for the creator of the job to be notified about
     * the completion of the operation, this will only be added when/if needed.
     *
     * 作者定义了一个结构体代表一个工作,每个线程等待从相应的job Type工作队列中获取一个job,每个job的排列的都按照时间
     * 有序排列的
     * ----------------------------------------------------------------------------
    这里总共与2种Background I/O Type:

    /* Background job opcodes */
    /* 定义了2种后台工作的类别 */
    #define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall.文件的关闭 */
    #define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync.AOF文件的同步 */ 
    /* BIO后台操作类型总数为2个 */
    #define REDIS_BIO_NUM_OPS       2
    
    一个是AOF文件的同步操作,AOF就是“Append ONLY File”的缩写,记录每次的数据改变的写操作,用于数据的恢复。还有一个我好像没碰到过,CLOSE FILE,难道是异步关闭文件的意思。

    static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; /* 定义了bio线程组变量 */
    static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; /* 线程相对应的mutex变量,用于同步操作 */
    static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
    static list *bio_jobs[REDIS_BIO_NUM_OPS]; /* 每种job类型都是一个列表 */
    /* The following array is used to hold the number of pending jobs for every
     * OP type. This allows us to export the bioPendingJobsOfType() API that is
     * useful when the main thread wants to perform some operation that may involve
     * objects shared with the background thread. The main thread will just wait
     * that there are no longer jobs of this type to be executed before performing
     * the sensible operation. This data is also useful for reporting. */
    static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];   /* 此类型job等待执行的数量 */
    
    /* This structure represents a background Job. It is only used locally to this
     * file as the API does not expose the internals at all. */
    /* background Job结构体 */
    struct bio_job {
    	//job创建的时间
        time_t time; /* Time at which the job was created. */
        /* Job specific arguments pointers. If we need to pass more than three
         * arguments we can just pass a pointer to a structure or alike. */
        /* job特定参数指针 */
        void *arg1, *arg2, *arg3;
    };
    
    上面声明了一些变量,包括bio_threads线程数组,总数2个,bio_jobs列表数组,存放每种Type的job。下面我们看主要的一些方法:

    /* Exported API */
    void bioInit(void); /* background I/O初始化操作 */
    void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); /* 创建后台job,通过传入的3个参数初始化 */
    unsigned long long bioPendingJobsOfType(int type); /* 返回type类型的job正在等待被执行的个数 */
    void bioWaitPendingJobsLE(int type, unsigned long long num); /* 返回type类型的job正在等待被执行的个数 */
    time_t bioOlderJobOfType(int type); 
    void bioKillThreads(void); /* 杀死后台所有线程 */
    首先看初始化操作;

    /* Initialize the background system, spawning the thread. */
    /* background I/O初始化操作 */
    void bioInit(void) {
        pthread_attr_t attr;
        pthread_t thread;
        size_t stacksize;
        int j;
    
        /* Initialization of state vars and objects */
        for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
            pthread_mutex_init(&bio_mutex[j],NULL);
            pthread_cond_init(&bio_condvar[j],NULL);
            //创建每个job类型的List列表
            bio_jobs[j] = listCreate();
            bio_pending[j] = 0;
        }
    
        /* Set the stack size as by default it may be small in some system */
        //设置线程栈空间
        pthread_attr_init(&attr);
        pthread_attr_getstacksize(&attr,&stacksize);
        if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
        while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
        pthread_attr_setstacksize(&attr, stacksize);
    
        /* Ready to spawn our threads. We use the single argument the thread
         * function accepts in order to pass the job ID the thread is
         * responsible of. */
        for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
            void *arg = (void*)(unsigned long) j;
            //创建2个线程,专门运行相应类型的job
            if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
                redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
                exit(1);
            }
            //赋值到相应的Thread中
            bio_threads[j] = thread;
        }
    }
    
    也就是说,执行完上述的操作之后,在bio_threads线程中就运行着2个线程,从各自的job列表中取出相应的等待执行的jo;

    /* 创建后台job,通过传入的3个参数初始化 */
    void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
        struct bio_job *job = zmalloc(sizeof(*job));
    
        job->time = time(NULL);
        job->arg1 = arg1;
        job->arg2 = arg2;
        job->arg3 = arg3;
        pthread_mutex_lock(&bio_mutex[type]);
        //加入相对应的job type列表
        listAddNodeTail(bio_jobs[type],job);
        //等待的job数量增加1
        bio_pending[type]++;
        pthread_cond_signal(&bio_condvar[type]);
        pthread_mutex_unlock(&bio_mutex[type]);
    }
    简洁的创建background job操作,上面利用了mutex变量实现了线程同步操作,保证线程安全。下面看一下最重要的执行background Job的操作实现(省略了部分代码):

    /* 执行后台的job,参数内包含着哪种type */
    void *bioProcessBackgroundJobs(void *arg) {
       	......
        while(1) {
            listNode *ln;
    
            /* The loop always starts with the lock hold. */
            if (listLength(bio_jobs[type]) == 0) {
                pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
                continue;
            }
            /* Pop the job from the queue. */
            //从工作列表中取出第一个job
            ln = listFirst(bio_jobs[type]);
            job = ln->value;
            /* It is now possible to unlock the background system as we know have
             * a stand alone job structure to process.*/
            pthread_mutex_unlock(&bio_mutex[type]);
    
            /* Process the job accordingly to its type. */
            //执行具体的工作
            if (type == REDIS_BIO_CLOSE_FILE) {
                close((long)job->arg1);
            } else if (type == REDIS_BIO_AOF_FSYNC) {
                aof_fsync((long)job->arg1);
            } else {
                redisPanic("Wrong job type in bioProcessBackgroundJobs().");
            }
            zfree(job);
    
            /* Lock again before reiterating the loop, if there are no longer
             * jobs to process we'll block again in pthread_cond_wait(). */
            pthread_mutex_lock(&bio_mutex[type]);
            listDelNode(bio_jobs[type],ln);
            bio_pending[type]--;
        }
    }
    
    while循环,从队列中取出一个,执行一个操作。当然,如果想马上停止一切后台线程,可以执行下面的方法,调用
    pthread_cancel:

    /* Kill the running bio threads in an unclean way. This function should be
     * used only when it's critical to stop the threads for some reason.
     * Currently Redis does this only on crash (for instance on SIGSEGV) in order
     * to perform a fast memory check without other threads messing with memory. */
    /* 杀死后台所有线程 */
    void bioKillThreads(void) {
        int err, j;
    
        for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        	//调用pthread_cancel方法kill当前的后台线程
            if (pthread_cancel(bio_threads[j]) == 0) {
                if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
                    redisLog(REDIS_WARNING,
                        "Bio thread for job type #%d can be joined: %s",
                            j, strerror(err));
                } else {
                    redisLog(REDIS_WARNING,
                        "Bio thread for job type #%d terminated",j);
                }
            }
        }
    }
  • 相关阅读:
    maven学习
    存储过程的作用
    数据库优化
    Springmvc整合mybatis
    Spring Mvc简介
    Axis2开发实例
    Mybatis之typeAlias配置的3种方法
    Spring AOP教程及实例
    spring AOP底层原理实现——jdk动态代理
    Java实现动态代理的两种方式
  • 原文地址:https://www.cnblogs.com/bianqi/p/12184200.html
Copyright © 2011-2022 走看看