  • Redis master/standby failover with StackExchange.Redis: a demo

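    Most write-ups on Redis high availability rely on redis-sentinel or Redis Cluster. Is there a simpler way when, as in my case, there are only two Redis instances? My experiments show that a plain master/standby setup works. Our actual requirement is modest: two Redis instances run on different machines, the application must keep reading and writing Redis after one instance goes down, and the master/replica configuration may be changed by hand. A simple requirement gets a simple implementation. First download the source from https://github.com/StackExchange/StackExchange.Redis, then start the master and replica instances under StackExchange.Redis-master\Redis Configs. My final demo code is as follows: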

    using System;
    using System.Threading;
    using StackExchange.Redis;

    class Program
    {
        static IDatabase database;
        static ConnectionMultiplexer conn;

        static void Main(string[] args)
        {
            ConfigurationOptions option = new ConfigurationOptions()
            {
                EndPoints =
                {
                    { "127.0.0.1", 6379 },
                    { "127.0.0.1", 6380 }
                },
                AllowAdmin = true, // needed for admin commands such as MakeMaster / SlaveOf
            };

            conn = ConnectionMultiplexer.Connect(option);
            database = conn.GetDatabase();

            Random rand = new Random();

            // Write and read back a random value every 100 ms
            while (true)
            {
                string val = "gavin_" + rand.Next(1, 999999).ToString();
                TestWriteRead(val);
                Thread.Sleep(100);
            }
        }

        static void TestWriteRead(string value)
        {
            string key = "gavinteststring";
            try
            {
                database.StringSet(key, value);
                Console.WriteLine($"Write {key}={value} succeeded");
            }
            catch (Exception ex)
            {
                // The write failed (for example because the master is down):
                // promote the endpoint that is still connected, then retry once.
                var points = conn.GetEndPoints();
                foreach (var item in points)
                {
                    var server = conn.GetServer(item);
                    if (server.IsConnected)
                    {
                        server.MakeMaster(ReplicationChangeOptions.All);
                    }
                    else
                    {
                        // points[1] is hard-coded: this demo assumes the second
                        // endpoint is the instance that survived.
                        server.SlaveOf(points[1], CommandFlags.FireAndForget);
                    }
                }
                database.StringSet(key, value);
                Console.WriteLine($"Write {key}={value} succeeded");
                //Console.WriteLine($"Write {key}={value} failed: " + ex.ToString());
                //Console.ReadKey();
            }

            string temp = string.Empty;
            try
            {
                temp = database.StringGet(key);
                Console.WriteLine($"Read {key}={temp} succeeded");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Read {key} failed: " + ex.ToString());
            }
        }
    }

    Ignore the code in my catch block for now. When I shut down the Redis master, the program failed with:

    No connection is available to service this operation: SET gavinteststring; An existing connection was forcibly closed by the remote host.; IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=0,Free=1023,Min=4,Max=1023), Local-CPU: 100%

    Looking at the source, I found this in the ExecuteSyncImpl method of ConnectionMultiplexer:

    if (!TryPushMessageToBridge(message, processor, source, ref server))
    {
        throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, message.Command, message, server, GetServerSnapshot());
    }

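    In other words, StackExchange.Redis could not find a Redis server instance to send the command to. Tracing further, the code that actually picks a server is in the AnyConnected method of ConnectionMultiplexer: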

     internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
            {
                var tmp = serverSnapshot;
                int len = tmp.Length;
                ServerEndPoint fallback = null;
                for (int i = 0; i < len; i++)
                {
                    var server = tmp[(int)(((uint)i + startOffset) % len)];
                    if (server != null && server.ServerType == serverType && server.IsSelectable(command))
                    {
                        if (server.IsSlave)
                        {
                            switch (flags)
                            {
                                case CommandFlags.DemandSlave:
                                case CommandFlags.PreferSlave:
                                    return server;
                                case CommandFlags.PreferMaster:
                                    fallback = server;
                                    break;
                            }
                        } else
                        {
                            switch (flags)
                            {
                                case CommandFlags.DemandMaster:
                                case CommandFlags.PreferMaster:
                                    return server;
                                case CommandFlags.PreferSlave:
                                    fallback = server;
                                    break;
                            }
                        }
                    }
                }
                return fallback;
            }

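    Because the master is down, the only reachable server is the slave, but for a write such as SET the effective flag is CommandFlags.DemandMaster, so no server is found. My first attempt was to promote the current slave to master with server.MakeMaster(ReplicationChangeOptions.All); I expected that to be enough, but it still failed. Then I wondered whether it would work if I also turned the old master into a slave: server.SlaveOf(points[1], CommandFlags.FireAndForget); (While testing I also tried a quit method; it was available in the debug build, but the release build reported that no such method exists.) With both calls in place the demo kept running after the master was shut down (the screenshot of the output from the original post is not preserved).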
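    To make the failure mode concrete, here is a minimal, library-free sketch of the same selection rule. The `Pref`, `Node`, and `Selector` names are hypothetical stand-ins, not StackExchange.Redis types, and `IsConnected` stands in for the library's `IsSelectable` check. It only illustrates why a demand-master request finds nothing when the sole reachable node is a slave.

```csharp
using System;

// Hypothetical stand-ins for the library's flags and server type.
enum Pref { PreferMaster, DemandMaster, PreferSlave, DemandSlave }

sealed class Node
{
    public string Name;
    public bool IsSlave;
    public bool IsConnected;
}

static class Selector
{
    // Same shape as AnyConnected: return the first exact match,
    // otherwise whatever fallback the preference allows (possibly null).
    public static Node Pick(Node[] nodes, Pref pref)
    {
        Node fallback = null;
        foreach (var node in nodes)
        {
            if (!node.IsConnected) continue;
            if (node.IsSlave)
            {
                if (pref == Pref.DemandSlave || pref == Pref.PreferSlave) return node;
                if (pref == Pref.PreferMaster) fallback = node;
            }
            else
            {
                if (pref == Pref.DemandMaster || pref == Pref.PreferMaster) return node;
                if (pref == Pref.PreferSlave) fallback = node;
            }
        }
        return fallback;
    }
}

static class Demo
{
    // The situation from the post: master down, slave still reachable.
    public static Node[] MasterDownNodes() => new[]
    {
        new Node { Name = "127.0.0.1:6379", IsSlave = false, IsConnected = false },
        new Node { Name = "127.0.0.1:6380", IsSlave = true,  IsConnected = true  },
    };

    static void Main()
    {
        var nodes = MasterDownNodes();
        Console.WriteLine(Selector.Pick(nodes, Pref.DemandMaster)?.Name ?? "(none)"); // (none)
        Console.WriteLine(Selector.Pick(nodes, Pref.PreferMaster)?.Name ?? "(none)"); // 127.0.0.1:6380
    }
}
```

    In this sketch a prefer-master request can still fall back to the slave, while a demand-master request returns null, which corresponds to the NoConnectionAvailable error above.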

    I later wrapped the failover code from the catch block into a method:

    // Requires: using System; using System.Collections.Generic; using System.Linq;
    //           using System.Net; using StackExchange.Redis;
    void ChangeMaster(IDatabase database)
    {
        var mex = database.Multiplexer;
        var endpoints = mex.GetEndPoints();
        if (endpoints.Count() < 2)
        {
            return;
        }
        // Only switch master/standby when more than one endpoint is configured
        List<EndPoint> connectedPoints = new List<EndPoint>();
        List<EndPoint> disconnectedPoints = new List<EndPoint>();
        foreach (var item in endpoints)
        {
            // Work out which servers are currently reachable
            var server = mex.GetServer(item);
            if (server.IsConnected)
            {
                connectedPoints.Add(item);
            }
            else
            {
                disconnectedPoints.Add(item);
            }
        }
        var connectedPoint = connectedPoints.FirstOrDefault();
        if (connectedPoint == null)
        {
            throw new Exception("No Redis server is available");
        }
        // Promote the first reachable server to master
        mex.GetServer(connectedPoint).MakeMaster(ReplicationChangeOptions.All);
        // Every other server, reachable or not, is told to replicate from it
        for (int i = 1; i < connectedPoints.Count; i++)
        {
            mex.GetServer(connectedPoints[i]).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
        }
        foreach (var item in disconnectedPoints)
        {
            mex.GetServer(item).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
        }
    }
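    One way such a method might be called is through a small "try, fail over once, retry" wrapper. The sketch below is library-free so that it runs on its own; `Failover.RunWithFailover` is a hypothetical helper, not part of StackExchange.Redis, and it catches `Exception` only because the demo above does. Real code would probably catch a narrower type such as the library's `RedisConnectionException`.

```csharp
using System;

static class Failover
{
    // Runs the operation; if it throws, runs the failover action once and retries.
    // A failure on the retry is not caught and propagates to the caller.
    public static void RunWithFailover(Action operation, Action failover)
    {
        try
        {
            operation();
        }
        catch (Exception)
        {
            failover();
            operation();
        }
    }

    static void Main()
    {
        // Simulated master state, standing in for a real Redis connection.
        bool masterUp = false;

        RunWithFailover(
            () =>
            {
                if (!masterUp) throw new InvalidOperationException("no master available");
                Console.WriteLine("write succeeded");
            },
            () => masterUp = true); // stands in for ChangeMaster(database)
    }
}
```

    With the real client, the call would look roughly like Failover.RunWithFailover(() => database.StringSet(key, value), () => ChangeMaster(database)).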

     -----------------------------------------2017-4-14--------------------------------------------------------

    As we know, every read or write sent to Redis is wrapped in a Message:

    protected Message(int db, CommandFlags flags, RedisCommand command)
            {
                bool dbNeeded = RequiresDatabase(command);
                if (db < 0)
                {
                    if (dbNeeded)
                    {
                        throw ExceptionFactory.DatabaseRequired(false, command);
                    }
                }
                else
                {
                    if (!dbNeeded)
                    {
                        throw ExceptionFactory.DatabaseNotRequired(false, command);
                    }
                }
    
                bool masterOnly = IsMasterOnly(command);
                Db = db;
                this.command = command;
                this.flags = flags & UserSelectableFlags;
                if (masterOnly) SetMasterOnly();
    
                createdDateTime = DateTime.UtcNow;
                createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
            }
     internal void SetMasterOnly()
            {
                switch (GetMasterSlaveFlags(flags))
                {
                    case CommandFlags.DemandSlave:
                        throw ExceptionFactory.MasterOnly(false, command, null, null);
                    case CommandFlags.DemandMaster:
                        // already fine as-is
                        break;
                    case CommandFlags.PreferMaster:
                    case CommandFlags.PreferSlave:
                    default: // we will run this on the master, then
                        flags = SetMasterSlaveFlags(flags, CommandFlags.DemandMaster);
                        break;
                }
            }
            internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
            {
                // take away the two flags we don't want, and add back the ones we care about
                return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave))
                                | masterSlave;
            }

    The constructor checks whether the command must run on the master; if so, it calls SetMasterOnly to set the flags. So which commands require the master?

     public static bool IsMasterOnly(RedisCommand command)
            {
                switch (command)
                {
                    case RedisCommand.APPEND:
                    case RedisCommand.BITOP:
                    case RedisCommand.BLPOP:
                    case RedisCommand.BRPOP:
                    case RedisCommand.BRPOPLPUSH:
                    case RedisCommand.DECR:
                    case RedisCommand.DECRBY:
                    case RedisCommand.DEL:
                    case RedisCommand.EXPIRE:
                    case RedisCommand.EXPIREAT:
                    case RedisCommand.FLUSHALL:
                    case RedisCommand.FLUSHDB:
                    case RedisCommand.GETSET:
                    case RedisCommand.HDEL:
                    case RedisCommand.HINCRBY:
                    case RedisCommand.HINCRBYFLOAT:
                    case RedisCommand.HMSET:
                    case RedisCommand.HSET:
                    case RedisCommand.HSETNX:
                    case RedisCommand.INCR:
                    case RedisCommand.INCRBY:
                    case RedisCommand.INCRBYFLOAT:
                    case RedisCommand.LINSERT:
                    case RedisCommand.LPOP:
                    case RedisCommand.LPUSH:
                    case RedisCommand.LPUSHX:
                    case RedisCommand.LREM:
                    case RedisCommand.LSET:
                    case RedisCommand.LTRIM:
                    case RedisCommand.MIGRATE:
                    case RedisCommand.MOVE:
                    case RedisCommand.MSET:
                    case RedisCommand.MSETNX:
                    case RedisCommand.PERSIST:
                    case RedisCommand.PEXPIRE:
                    case RedisCommand.PEXPIREAT:
                    case RedisCommand.PFADD:
                    case RedisCommand.PFMERGE:
                    case RedisCommand.PSETEX:
                    case RedisCommand.RENAME:
                    case RedisCommand.RENAMENX:
                    case RedisCommand.RESTORE:
                    case RedisCommand.RPOP:
                    case RedisCommand.RPOPLPUSH:
                    case RedisCommand.RPUSH:
                    case RedisCommand.RPUSHX:
                    case RedisCommand.SADD:
                    case RedisCommand.SDIFFSTORE:
                    case RedisCommand.SET:
                    case RedisCommand.SETBIT:
                    case RedisCommand.SETEX:
                    case RedisCommand.SETNX:
                    case RedisCommand.SETRANGE:
                    case RedisCommand.SINTERSTORE:
                    case RedisCommand.SMOVE:
                    case RedisCommand.SPOP:
                    case RedisCommand.SREM:
                    case RedisCommand.SUNIONSTORE:
                    case RedisCommand.ZADD:
                    case RedisCommand.ZINTERSTORE:
                    case RedisCommand.ZINCRBY:
                    case RedisCommand.ZREM:
                    case RedisCommand.ZREMRANGEBYLEX:
                    case RedisCommand.ZREMRANGEBYRANK:
                    case RedisCommand.ZREMRANGEBYSCORE:
                    case RedisCommand.ZUNIONSTORE:
                        return true;
                    default:
                        return false;
                }
            }

    When we execute a script, the ScriptEvalMessage class is used instead. Its constructors:

       private sealed class ScriptEvalMessage : Message, IMultiMessage
            {
                private readonly RedisKey[] keys;
                private readonly string script;
                private readonly RedisValue[] values;
                private byte[] asciiHash, hexHash;
                public ScriptEvalMessage(int db, CommandFlags flags, string script, RedisKey[] keys, RedisValue[] values)
                    : this(db, flags, ResultProcessor.ScriptLoadProcessor.IsSHA1(script) ? RedisCommand.EVALSHA : RedisCommand.EVAL, script, null, keys, values)
                {
                    if (script == null) throw new ArgumentNullException(nameof(script));
                }
                public ScriptEvalMessage(int db, CommandFlags flags, byte[] hash, RedisKey[] keys, RedisValue[] values)
                    : this(db, flags, RedisCommand.EVAL, null, hash, keys, values)
                {
                    if (hash == null) throw new ArgumentNullException(nameof(hash));
                }
    
                private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, string script, byte[] hexHash, RedisKey[] keys, RedisValue[] values)
                    : base(db, flags, command)
                {
                    this.script = script;
                    this.hexHash = hexHash;
    
                    if (keys == null) keys = RedisKey.EmptyArray;
                    if (values == null) values = RedisValue.EmptyArray;
                    for (int i = 0; i < keys.Length; i++)
                        keys[i].AssertNotNull();
                    this.keys = keys;
                    for (int i = 0; i < values.Length; i++)
                        values[i].AssertNotNull();
                    this.values = values;
                }     

    EVAL and EVALSHA are not in the IsMasterOnly list, so scripts can be executed on a slave, and so can many query commands.
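    Putting the Message constructor and SetMasterOnly together, the routing decision can be sketched without the library as follows. `Route` and `Routing.Effective` are hypothetical names, the command set is only a small subset of the IsMasterOnly list above, and the exception type is a stand-in for whatever ExceptionFactory.MasterOnly produces.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the master/slave part of CommandFlags.
enum Route { PreferMaster, DemandMaster, PreferSlave, DemandSlave }

static class Routing
{
    // A small subset of the commands listed in IsMasterOnly.
    static readonly HashSet<string> MasterOnly =
        new HashSet<string> { "SET", "DEL", "INCR", "HSET", "LPUSH" };

    // Master-only commands are forced to DemandMaster (or rejected if the
    // caller demanded a slave); everything else keeps the caller's choice.
    public static Route Effective(string command, Route requested)
    {
        if (!MasterOnly.Contains(command)) return requested;
        if (requested == Route.DemandSlave)
            throw new InvalidOperationException(command + " can only run on a master");
        return Route.DemandMaster;
    }

    static void Main()
    {
        Console.WriteLine(Effective("SET", Route.PreferSlave));  // DemandMaster
        Console.WriteLine(Effective("GET", Route.PreferSlave));  // PreferSlave
        Console.WriteLine(Effective("EVAL", Route.DemandSlave)); // DemandSlave
    }
}
```

    With the real client, a read that should go to the slave when possible can pass the flag explicitly, for example database.StringGet(key, CommandFlags.PreferSlave).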

  • Original post: https://www.cnblogs.com/majiang/p/6476823.html