zoukankan      html  css  js  c++  java
  • Lind.DDD.LindMQ~关于持久化到Redis的消息格式

    回到目录

    关于持久化到Redis的消息格式,主要是说在Broker上把消息持久化的过程中,需要存储哪些类型的消息,因为我们的消息是分topic的,而每个topic又有若干个queue组成,而我们的topic和queue由于redis存储结构的原因,我们需要将它们分区对应存储一下,而不能像关系型数据库那样灵活,所以要额外设计几个数据结构来存储它们。

    一 Topic字典

    二 Topic对应的Queue字典

    三 Queue里的消息

    四 某个客户端对应某个Queue的消费进度

    以上四个结构是我们要说的,它们会在推消息,拉消息,删消息时用到,下面一一介绍一下,讲的不好不对的地方,欢迎大家为大叔留言。

    一 Topic字典

    主要存储每个topic,它是一个set集合,redis的我集合类型之一,每个key是唯一的LindMq_Topic,值value就是我们客户端传来的具体topic的名字,这主要是在删除过期的消息时用的,主是作用是遍历所有的topic消息类型,这样我们在删除消息时,就可以把所有注册的topic都找到了,最后把过期的删除,默认消息存活周期是一天。

    删除过期的消息代码如下

     var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
      foreach (var topic in topicList)
       {
         var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);
             foreach (var queue in queueList)
               {
                var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");
                  RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);
                }
        }

    二 Topic对应的Queue字典

    我们知道,为了加大redis的并发量和吞吐量,我们会把大数据键值对设计成多个键,这就像是一个集群环境的sharing,就是将大数据进行分片,而我们的分片规则是采用按对象取模的方式,模数可以自己设置,比较我设置8,那说明我的队列(分片)最多可以被分为8个,这个大家可以去做测试,挺有意思的,比随机数来个直接!而这一次redis里的键就是某个topic,而值就是我们的topic加上队列索引,例如你的topic是zzl,那么队列里的键可能就是zzl0,zzl1,zzl2...

    三 Queue里的消息

    我们的生产者将消息发送到broker里,然后于broker将消息持久化到具体的存储介质里,当然这里我们用的是Redis,在存储在redis里时,我们的具体队列的键是有后缀的,这主要用于消息的回收,因为我们打算1天回收一次消息,所以我们的消息后缀是个日期变量,当然精确到天就可以了,它可以是这样键名LindMQ_order_Paid4_20161202,每个队列都有自己的后缀,我们在清除消息时也就有了方法了。我们的队列存储结构是比较特殊的sortedSet ,就是可排序的集合,它有权重的概念,我们刚好可以使用这个特性来记录客户端的消费进度,因为我们的权重值在一个redis键/值对里是唯一的。

    下面代码选自Push入队列的代码片断,分享给大家

           //存储当前Topic
                RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);
    
                //要存储到哪个队列
                body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
                var dataKey = body.Topic + body.QueueId;
                RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);
    
                //记录偏移
                var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
                body.QueueOffset = offset + 1;
    
                //存储消息
                RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
                    GetRedisDataKey(dataKey),
                    Utils.SerializeMemoryHelper.SerializeToJson(body),
                    score: body.QueueOffset);

    四 某个客户端对应某个Queue的消费进度

    消费进度是一个很麻烦的问题,生产者的消息是可以被多个消费者消费的,所以不能使用.net那种简单的Queue机制,出队列后就消失了,这是不靠谱的,万一消失失败了,也会造成消息的丢失!下面我们主要看一下消费进度的存储,它是一个Hash集合,其中redis的键名是LindMQ_ConsumerOffset,而value是一个hash对象,hash里的key是当前队列名+消费者IP地址的hashcode值,hash里的value是这个消费者(客户端)的消费进度(Queue里的权重,Queue的存储结构是一个sortedSet)。

    客户端消费的测试代码

                #region Client-LindMQ
                var consumer = new ConsumerSetting
                {
                    BrokenName = "test",
                    BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),
                    Callback = new Dictionary<string, Action<MessageBody>>() { 
                    {"zzl",(o)=>{
                        Console.WriteLine(o.ToString());
                        Thread.Sleep(1000);
                    }},
                    {"zhz",(o)=>{
                        Console.WriteLine(o.ToString());
                        Thread.Sleep(2000);
                    }}
                    }
                };
                var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
                consumerClient.Start();
                #endregion

    客户端消费的测试结果

    好了,到这里我们的LindMQ里数据存储结构的内容就讲完了,主要使用了redis里的set,sortedSet,hash等数据结构,在设计过程中,使用了分片(Sharing)的概念,当然也是借鉴了mongodb和redis集群的设计理念,同时借鉴了方雪华老兄的EQueue设计理念,在这里和他们说一声:谢谢!

    感谢各位对Lind的支持!

    回到目录

  • 相关阅读:
    冲刺 09
    冲刺08
    个人作业-买书
    冲刺07
    冲刺 06
    软件工程 寻找小水王
    冲刺04
    冲刺 03
    冲刺 02
    冲刺3
  • 原文地址:https://www.cnblogs.com/lori/p/6125690.html
Copyright © 2011-2022 走看看