zoukankan      html  css  js  c++  java
  • REDIS 主从复制

    REDIS目前给出了一个异步的主从复制版本系统。在redis里 提供了几种方式来完成这个工作。 主从复制主要对应在redis/replication.c这个文件里。源码框架里 分为3部分: Master部分/SLAVE部分/复制核心部分

    其实主从复制我个人觉得比较难的点就是在于每次重启之后 master/slave传递数据的模式方式

    首先对于slave来讲 是主动连接他的master

    复制代码
    int connectWithMaster(void) {
        int fd;
    
        fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
        if (fd == -1) {
            redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
        }
    
        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
            return REDIS_ERR;
        }
    
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_s = fd;
        server.repl_state = REDIS_REPL_CONNECTING;
        return REDIS_OK;
    }
    复制代码

    aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) 就是注册一个可读和可写的事件  注意处理事件函数是syncwithMaster.

    rep_state状态就变成了REDIS_PREPL_CONNECTING.相关的server的replication做出相应的调整。

    现在我们就进入syncwithMaster进去看看:

    复制代码
    if (server.repl_state == REDIS_REPL_CONNECTING) {
         redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
         /* Delete the writable event so that the readable event remains
          * registered and we can wait for the PONG reply. */
         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
         server.repl_state = REDIS_REPL_RECEIVE_PONG;
         /* Send the PING, don't check for errors at all, we have the timeout
          * that will take care about this. */
         syncWrite(fd,"PING
    ",6,100);
         return;
     }
    复制代码

    首先客户端slave 发送一个PING给server master 这个是带有超时的一个回应, 状态就改成了REDIS_REPL_RECEIVE_PONG 按理来说Master收到了会做出相应的动作。对于slave端而言 下一步就是REDIS_REPL_RECEIVE_PONG这个状态了。其实就是准备接受某个值了

    复制代码
    buf[0] = '';
            if (syncReadLine(fd,buf,sizeof(buf),
                server.repl_syncio_timeout*1000) == -1)
            {
                redisLog(REDIS_WARNING,
                    "I/O error reading PING reply from master: %s",
                    strerror(errno));
                goto error;
            }
    复制代码

    这是PONG状态下的做的核心事情:读出来 然后判断是否是有相应的内容。

    复制代码
    if (syncWrite(fd,"SYNC
    ",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
      ……if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
    复制代码

    给MASETER发送一个SYNC 然后就进入可读状态 注册了一个readSyncBulkPayload。等下来看看这个事件函数  后面要做的事情就是设置相应的位了:

    复制代码
    server.repl_state = REDIS_REPL_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;
        server.repl_transfer_fd = dfd;
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
    复制代码

    repl_transfer_size设置-1 表示从master收到的文件大小为-1 。状态变成了REPL_TRANSFER。 现在进入readSyncBulkPayload看看这个函数是怎么接受的:

    server.repl_transfer_size = strtol(buf+1,NULL,10);

    首先确定了对方要发送多大的文件 然后读到buf 在写到rdb相应的文件里面。

    复制代码
    left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
        nread = read(fd,buf,readlen);
    ...............................................................
    ...............................................................
    ...............................................................
    write(server.repl_transfer_fd,buf,nread) != nread) 
    server.repl_transfer_read += nread;
    /* Check if the transfer is now complete */
        if (server.repl_transfer_read == server.repl_transfer_size) {
            if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
                redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
                replicationAbortSyncTransfer();
                return;
            }
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
            signalFlushedDb(-1);
            emptyDb();
            /* Before loading the DB into memory we need to delete the readable
             * handler, otherwise it will get called recursively since
             * rdbLoad() will call the event loop to process events from time to
             * time for non blocking loading. */
            aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
            if (rdbLoad(server.rdb_filename) != REDIS_OK) {
                redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
                replicationAbortSyncTransfer();
                return;
            }
    复制代码

    如果2者是相等的 read和transfer_size相等 首先替换名字  替换成rdb_filename名字  然后清空db emptyDb()销毁可读事件  最后调用rdbLoad在本地重新构建一个key_value数据库副本。【master的】 所有的动作的操作都已经完成了 。这里的发送大小双方应该会有一个限定。我们可以从master部分来找到相应的事件出来:

    对于主服务器来讲 除了客户端发送了一个PING之后期望得到主机的一个回复之外  真正对这个主从复制有用的应该是从服务器这个操作:

    write(fd,“SYNC ”,buf).这个动作一发出: master就会调用SYNCcommand()来完成相应的拷贝动作: 首先进入SYNCcommand()函数进去看看是一个什么情况:

    要完成复制 首先要在一个合适的时机:master进入了一个bgsave操作。要保证rdb文件是一个最新的文件。 对于master而言 先看看rdb_pid!=-1如果条件满足 表明正在做这个操作 master只需要等完成了才做相应的动作 而如果不是sync就会触发一个bgsave操作。然后对于主进程而言: 都会设置状态为:WAIT_BGSAVE_END.这个时候syncCommand就完成了 而复制操作还没有开始 进行往下面看

    而做bgsaveCommand操作时 都会调用一个function:updateSlavesWaitingBgsave 这样就不会出现同步等待现象了。

    if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||

    打开相应的repldbfd,准备复制文件了:

    aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR)

    注册了一个sendBulkToSlave.发给salve 这个部分要注意好:

    需要注意的点: 1) 发送缓冲区大小怎么设置 和slave是不是设置一样 2)发送是一个同步的过程还是异步的过程

    REDIS_IOBUF_LEN: 1024*16 这个变量就是一次读的rdb量  还是进入sendBulkToSlave()看看:

    复制代码
    {
        redisClient *slave = privdata;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(mask);
        char buf[REDIS_IOBUF_LEN];
        ssize_t nwritten, buflen;
    
        if (slave->repldboff == 0) {
            /* Write the bulk write count before to transfer the DB. In theory here
             * we don't know how much room there is in the output buffer of the
             * socket, but in practice SO_SNDLOWAT (the minimum count for output
             * operations) will never be smaller than the few bytes we need. */
            sds bulkcount;
    
            bulkcount = sdscatprintf(sdsempty(),"$%lld
    ",(unsigned long long)
                slave->repldbsize);
            if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
            {
                sdsfree(bulkcount);
                freeClient(slave);
                return;
            }
            sdsfree(bulkcount);
        }
        lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
        buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
        if (buflen <= 0) {
            redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
                (buflen == 0) ? "premature EOF" : strerror(errno));
            freeClient(slave);
            return;
        }
        if ((nwritten = write(fd,buf,buflen)) == -1) {
            redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        slave->repldboff += nwritten;
        if (slave->repldboff == slave->repldbsize) {
            close(slave->repldbfd);
            slave->repldbfd = -1;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            slave->replstate = REDIS_REPL_ONLINE;
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
                sendReplyToClient, slave) == AE_ERR) {
                freeClient(slave);
                return;
            }
            redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
        }
    }
    复制代码

    如果repldoff==0 表明是第一次初始化  也就是会发送一个应该发送的长度数据给对方slave.这是第一次发送。注意这里调用write如果成功之后会继续。lseek(slave->repldbfd,slave->repldboff,SEEK_SET); 每次会定位到相应的位置,这个非常恼火 调用磁盘的一个随机操作,比较耗时 如果文件很大 对性能影响比较大。 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); 然后读到内存中 然后在做write操作。因为没有关闭事件模型,所以EPOLL轮询时都会认为这个事件还需要执行:还是准备好的,所以继续调用这个函数,从本质上来讲 可以算是一个异步的操作。所以不会出现服务的一个中断现象,但是lseek是比较耗时的,在复制完成了 关闭fd的可读状态 并且把replstate状态标记成REPL_ONLINE,这个状态就是命令传播状态。注册了一个一个新的函数sendReplyToClient,当然把之前的函数事件del掉。所以每次Server端给的buf是比slave端小很多.主从复制核心就是这里了。

  • 相关阅读:
    Java String 字符串操作小结
    找到一篇关于 Oracle 全文检索实践 的文章
    Java中Array与ArrayList的主要区别
    Java使用Array类创建多维数组
    [例] 用MappedByteBuffer更新文件内容
    java nio 之MappedByteBuffer
    Java.util.Properties类
    Oracle外连接与条件的组合
    Oracle 树形SQL语句,SYS_CONNECT_BY_PATH 函数
    SQL Connect By 的例子
  • 原文地址:https://www.cnblogs.com/lyl6796910/p/3958798.html
Copyright © 2011-2022 走看看