zoukankan      html  css  js  c++  java
  • 消息推送服务

    APM.Server 消息推送服务的实现

     

    消息推送服务

      服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
      APM.Server基于简单

    1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();

    1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();

    实现。

      部分代码如下:

    1 /// <summary>
     2         /// 消息转发
     3         /// </summary>
     4         private void ForwardMsg()
     5         {
     6             try
     7             {
     8                 var msg = MessageQueue.Dequeue();
     9                 if (msg != null)
    10                 {
    11                     switch (msg.Type)
    12                     {
    13                         case (byte)MessageType.Sub:
    14                             if (!msg.IsMuti)
    15                             {
    16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
    17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
    18                             }
    19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
    20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
    21                             break;
    22                         case (byte)MessageType.Unsub:
    23                             if (!msg.IsMuti)
    24                             {
    25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
    26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
    27                             }
    28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
    29                                 SessionDic.Del(msg.Sender, msg.SessionID);
    30                             break;
    31                         default:
    32                             var session = SessionDic.Get(msg.SessionID);
    33                             if (session != null)
    34                             {
    35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
    36                                 if (remotes != null && remotes.Count > 0)
    37                                 {
    38                                     Parallel.For(0, remotes.Count, i =>
    39                                     {
    40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
    41                                     });
    42                                 }
    43                             }
    44                             this.OnMessage?.Invoke(msg);
    45                             break;
    46                     }
    47 
    48                 }
    49             }
    50             catch { }
    51         }
    复制代码
     1 /// <summary>
     2         /// 消息转发
     3         /// </summary>
     4         private void ForwardMsg()
     5         {
     6             try
     7             {
     8                 var msg = MessageQueue.Dequeue();
     9                 if (msg != null)
    10                 {
    11                     switch (msg.Type)
    12                     {
    13                         case (byte)MessageType.Sub:
    14                             if (!msg.IsMuti)
    15                             {
    16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
    17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
    18                             }
    19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
    20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
    21                             break;
    22                         case (byte)MessageType.Unsub:
    23                             if (!msg.IsMuti)
    24                             {
    25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
    26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
    27                             }
    28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
    29                                 SessionDic.Del(msg.Sender, msg.SessionID);
    30                             break;
    31                         default:
    32                             var session = SessionDic.Get(msg.SessionID);
    33                             if (session != null)
    34                             {
    35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
    36                                 if (remotes != null && remotes.Count > 0)
    37                                 {
    38                                     Parallel.For(0, remotes.Count, i =>
    39                                     {
    40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
    41                                     });
    42                                 }
    43                             }
    44                             this.OnMessage?.Invoke(msg);
    45                             break;
    46                     }
    47 
    48                 }
    49             }
    50             catch { }
    51         }
    复制代码

    异步tcp通信——APM.Core 服务端概述

    异步tcp通信——APM.Core 解包

    异步tcp通信——APM.Server 消息推送服务的实现

    异步tcp通信——APM.ConsoleDemo


    转载请标明本文来源:http://www.cnblogs.com/yswenli/
    更多内容欢迎star作者的github:https://github.com/yswenli/APM

  • 相关阅读:
    Generate Parentheses (Java)
    leetcode15
    MD5
    leetcode409
    Vue第一个简单的例子
    SpringBoot和Ajax通信
    如何使用安装光盘为本机创建yum repository
    Less known Solaris features: svccfg editprop (ZT)
    Rename Oracle Managed File (OMF) datafiles in ASM(ZT)
    跨数据文件删除flashback database
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/8313458.html
Copyright © 2011-2022 走看看