zoukankan      html  css  js  c++  java
  • 消息推送服务

    APM.Server 消息推送服务的实现

     

    消息推送服务

      服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
      APM.Server基于简单

    1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();

    1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();

    实现。

      部分代码如下:

    1 /// <summary>
     2         /// 消息转发
     3         /// </summary>
     4         private void ForwardMsg()
     5         {
     6             try
     7             {
     8                 var msg = MessageQueue.Dequeue();
     9                 if (msg != null)
    10                 {
    11                     switch (msg.Type)
    12                     {
    13                         case (byte)MessageType.Sub:
    14                             if (!msg.IsMuti)
    15                             {
    16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
    17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
    18                             }
    19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
    20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
    21                             break;
    22                         case (byte)MessageType.Unsub:
    23                             if (!msg.IsMuti)
    24                             {
    25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
    26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
    27                             }
    28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
    29                                 SessionDic.Del(msg.Sender, msg.SessionID);
    30                             break;
    31                         default:
    32                             var session = SessionDic.Get(msg.SessionID);
    33                             if (session != null)
    34                             {
    35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
    36                                 if (remotes != null && remotes.Count > 0)
    37                                 {
    38                                     Parallel.For(0, remotes.Count, i =>
    39                                     {
    40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
    41                                     });
    42                                 }
    43                             }
    44                             this.OnMessage?.Invoke(msg);
    45                             break;
    46                     }
    47 
    48                 }
    49             }
    50             catch { }
    51         }
    复制代码
     1 /// <summary>
     2         /// 消息转发
     3         /// </summary>
     4         private void ForwardMsg()
     5         {
     6             try
     7             {
     8                 var msg = MessageQueue.Dequeue();
     9                 if (msg != null)
    10                 {
    11                     switch (msg.Type)
    12                     {
    13                         case (byte)MessageType.Sub:
    14                             if (!msg.IsMuti)
    15                             {
    16                                 if (!SessionDic.Exists(msg.SessionID, msg.SessionID))
    17                                     SessionDic.Set(this._server, msg.SessionID, msg.SessionID);
    18                             }
    19                             if (!SessionDic.Exists(msg.SessionID, msg.Sender))
    20                                 SessionDic.Set(this._server, msg.Sender, msg.SessionID);
    21                             break;
    22                         case (byte)MessageType.Unsub:
    23                             if (!msg.IsMuti)
    24                             {
    25                                 if (SessionDic.Exists(msg.SessionID, msg.SessionID))
    26                                     SessionDic.Del(msg.SessionID, msg.SessionID);
    27                             }
    28                             if (SessionDic.Exists(msg.SessionID, msg.Sender))
    29                                 SessionDic.Del(msg.Sender, msg.SessionID);
    30                             break;
    31                         default:
    32                             var session = SessionDic.Get(msg.SessionID);
    33                             if (session != null)
    34                             {
    35                                 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList();
    36                                 if (remotes != null && remotes.Count > 0)
    37                                 {
    38                                     Parallel.For(0, remotes.Count, i =>
    39                                     {
    40                                         this._server.SendMsg(remotes[i], Message.Serialize(msg));
    41                                     });
    42                                 }
    43                             }
    44                             this.OnMessage?.Invoke(msg);
    45                             break;
    46                     }
    47 
    48                 }
    49             }
    50             catch { }
    51         }
    复制代码

    异步tcp通信——APM.Core 服务端概述

    异步tcp通信——APM.Core 解包

    异步tcp通信——APM.Server 消息推送服务的实现

    异步tcp通信——APM.ConsoleDemo


    转载请标明本文来源:http://www.cnblogs.com/yswenli/
    更多内容欢迎star作者的github:https://github.com/yswenli/APM

  • 相关阅读:
    面向接口程序设计思想实践
    Block Chain Learning Notes
    ECMAScript 6.0
    Etcd Learning Notes
    Travis CI Build Continuous Integration
    Markdown Learning Notes
    SPRING MICROSERVICES IN ACTION
    Java Interview Questions Summary
    Node.js Learning Notes
    Apache Thrift Learning Notes
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/8313458.html
Copyright © 2011-2022 走看看