zoukankan      html  css  js  c++  java
  • Redis 消息中间件 ServiceStack.Redis 轻量级

    问题:

    • 公司开了个新项目,算上我一共3个人。车间里机台通过流水线连通联动的玩意。一个管理控制系统连接各个机台和硬件。专机类型就有5种,个数差不多20个左右。

    • 软件规划的时候采用总分的结构,管理控制系统和专机子系统之间通过消息中间件通讯。本来也想TCP连接来着,但是开发时间不允许。而且每个系统都得写一遍这个玩意。

    • 消息中间件有很多个,比如 Kafka、RabbitMQ、RocketMQ等国内外的消息中间件。这些中间件无论宣称的多么轻量级都要啃一下,更要命的是就他娘三个人。而且后面还要这个鸡儿系统可复制。

    • 考虑到消息及时性、开发难易程度、维护简便性等因素后决定用Redis的pub/sub功能来实现.软件结构大概如类似结构。

    可用性:

    • 作为消息通知属于安装了Redis就有的功能,因为Redis是用在系统中存储一些热数据,不用单独维护,在Windows中属于服务直接就开了。

    • 作为可以分布式集群使用的数据库,消息传递应该比较OK了。虽然使用的client-server,但是server-server已经很好了。料想client-server也不会差

    • 试验消息内容发送订阅的情况下,速度在30毫秒内,貌似可以。看其他博主说大于10K入队比较慢,但是可以不用入消息队列啊,用发布订阅。

    • .net 下一般使用ServiceStack.Redis,要命的是4.0以后收费,可以破解的但是不支持List<T>型的数据直接存取,想用只能变成JSON字符串存着。

    • 如果只是用订阅发布功能,不存储热数据或者不使用List<T>的数据可以使用4.0以上的版本。文末会贴上两个类型的下载包。想用其他的包也可以,我这里只说一种思路。

    实现:

    模块结构图展示如下

    public static class MSServer
        {
            // 定义一个object对象
            private static object objinstance = new object();
    
            private static ServerState CurState = ServerState.Free;
    
            static PooledRedisClientManager prcm;
    
            private static string clientmake = string.Empty;
    
            /// <summary>
            /// 连接的地址
            /// </summary>
            /// <param name="IP">地址127.0.0.1:6379</param>
            /// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param>
            /// <returns></returns>
            public static int OpenServer(string IP ,string[] rechannels)
            {
                try
                {
                    if (prcm == null)
                    {
                        lock (objinstance)
                        {
                            if (prcm == null)
                            {
                                prcm = CreateManager(IP, IP);
                                CurState = ServerState.Init;
                                return CreateLink(rechannels);
                            }
                        }
                    }
                }
                catch
                {
                    prcm = null;
                    CurState = ServerState.Free;
                    return -1;
                }
                return 1;
            }
    
            private static int CreateLink(string[] SourceID)
            {
                if (CurState == ServerState.Init && SourceID.Length > 0)
                {
                    try
                    {
                        using (IRedisClient Redis = prcm.GetReadOnlyClient())
                        {
                            clientmake = SourceID[0];
                            var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList();
                            info.ForEach(i =>
                            {
                                Redis.KillClient(i["addr"]);
                            });
                            Redis.SetClient(clientmake);
                            IRedisSubscription sc = Redis.CreateSubscription();
                            Task.Run(() =>
                            {
                                try
                                {
                                    sc.SubscribeToChannels(SourceID);
                                }
                                catch { }
                            });
                            sc.OnMessage += new Action<string, string>(showpub);
                        }
                        CurState = ServerState.Work;
                    }
                    catch
                    {
                        string message = string.Empty;
                        prcm = null;
                        CurState = ServerState.Free;
                        return -1;
                    }
                    return 1;
                }
                else
                {
                    return 0;
                }
            }
    
    
            public static Action<string, string> ReceiveMessage;
            static void showpub(string channel, string message)
            {
                if (ReceiveMessage != null)
                {
                    ReceiveMessage(channel, message);
                }
            }
    
            private static PooledRedisClientManager CreateManager(string writeHost, string readHost)
            {
                var redisClientConfig = new RedisClientManagerConfig
                {
                    MaxWritePoolSize = 1,//“写”链接池链接数
                    MaxReadPoolSize = 1,//“读”链接池链接数
                    DefaultDb = 0,
                    AutoStart = true,
                };
                //读的客户端只能接受特定的命令,不能用于发送信息
                var RedisClientManager = new PooledRedisClientManager(
                    new string[] { writeHost }//用于写
                    , new string[] { readHost }//用于读
                    , redisClientConfig);
                CurState = ServerState.Init;
    
                return RedisClientManager;
            }
            /// <summary>
            /// 发送信息
            /// </summary>
            /// <param name="channel">通讯对象 "channel:1-13"</param>
            /// <param name="meesage">发送信息 "test send "</param>
            /// <returns>0 发送失败 1 发送成功 -1 连接损毁 检查网络后重建</returns>
            public static long PubMessage(string channel, string meesage)
            {
                if (CurState == ServerState.Work)
                {
                    if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage))
                    {
                        try
                        {
                            using (IRedisClient Redis = prcm.GetClient())
                            {
                                Redis.SetClient(clientmake);
                                return Redis.PublishMessage(channel, meesage);
                            }
                        }
                        catch
                        {
                            prcm = null;
                            CurState = ServerState.Free;
                            return -1;
                        }
                    }
                    else
                    {
                        return 0;
                    }
                }
                else
                {
                    return -1;
                }
            }
        }
    
    public enum ServerState
        {
            Free,
            Init,
            Work,
            Del
        }
        

    有一个问题,就是连接远程的服务器时如果网络断开再重连,会残留没用的client ,这样如果网络断断续续的话,会留好多没有清除的客户端。

    这个在3.0.504版本中Redis 中也有这个问题,不知道是基于什么考虑的。所以需要建立连接的时候,给个客户端名称,再初始化的时候删掉所有同类型的名称。

    使用的时候大概类似操作 textbox2.text = "channel:1-5" .为了简便发布的和监听的都是本地的一个通道。

    private void button1_Click(object sender, EventArgs e)
            {
    
                //11.1.7.152   192.168.12.173
                int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text });
                label1.Text = result.ToString();
                //1匿名事件
                ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck);
    
                if (result == 0)
                {
                    //发送失败重新发送 检查 通道和字符串后重新发送
                }
                else if (result == 1)
                {
                    //发送成功
                }
                else if (result == -1)
                {
                    //连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                }
    
            }
    
            void fuck(string channel, string message)
            {
                this.BeginInvoke(new Action(() =>
                {
                    textBox4.Text = channel + message;
                }));
            }
            public bool sdfsd = true;
    
            private void button3_Click(object sender, EventArgs e)
            {long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff"));
    
                if (result == 0)
                    {
                        //发送失败重新发送
                    }
                else if (result == 1)
                    {
                                //发送成功
                    }
                else if (result == -1)
                     {
                      //连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                     }
            }

    为了简便channel:是通道的固定命令 ,可以自定义channel:后面的内容,发送就有反馈。确保所有机台都接收到。

    如果有断线的需要程序自己重连,接收通道的客户端不可以再给其他的使用,Redis上说Redis client 进入订阅模式时只能接受订阅发布等命令指令,不接受普通的存取和其他命令

    所以如果需要在读取、写入、发布、执行其他的指令需要使用其他客户端,否则就出错了。跑了几天了上亿次的测试貌似没有出现什么问题。

     

    发布订阅消息不会走AOF RDB只存在于内存中,即发即用,用完就没了。没在线就没了。需要考虑使用环境。

    还用ping pong来确定连接状态,也可以自定义数据,使用场景要自己开发,要适合自己的才是好的。

    下载:

    4.0 dll   

    链接:https://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
    提取码:js8p

    5.8 dll不可以使用List<T>类型

    链接:https://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
    提取码:bxh2

  • 相关阅读:
    BZOJ 1854 [Scoi2010]游戏
    【模板】二分图匹配-匈牙利算法
    BZOJ 1432 [ZJOI2009]Function
    BZOJ 1192 [HNOI2006]鬼谷子的钱袋
    BZOJ 1088 [SCOI2005]扫雷Mine
    BZOJ 1047 [HAOI2007]理想的正方形
    BZOJ 1034 [ZJOI2008]泡泡堂BNB
    BZOJ 1022 [SHOI2008]小约翰的游戏John
    LOJ 6278 数列分块入门2
    【BZOJ 1003】[ZJOI2006]物流运输(Dijkstra+DP)
  • 原文地址:https://www.cnblogs.com/qiaqia-liu/p/12938466.html
Copyright © 2011-2022 走看看