zoukankan      html  css  js  c++  java
  • 分布式EventBus的Socket实现

    分布式EventBus的Socket实现 - 发布订阅

    在这篇文章中,EventBus实现 - 发布订阅 - XML加载 所适用的范围只是本机的事件传播,要是牵涉到多台服务器之间的事件传播就不行了,解决办法有用msmq解决的,也有用redis的发布订阅解决的,这次用C# socket来实现,能实现立刻推送事件到所以server上。

    这次的子系统适用的场景如下:

    主要分2个部分:各个server使用的Event Bus Broker以及Event Bus Server。

    Broker与Server之间的通信协议就3个:ME、Subscribe、Publish。分别代表:我的名字是、我要订阅的事件是、我触发事件。

    Event Bus Server是基于SuperSocket开源组件写的,非常方便。实现了3个对应的命令类(建议大家先看看SuperSocket的文档),如下: 

    复制代码
    public class ME : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity));
            }
        }
    
    public class Publish : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]);
    
                foreach(EventDispatcherBusSession s in session.AppServer.GetAllSessions())
                {
                    if (s.AssociatedServerIdentity == session.AssociatedServerIdentity)
                        continue;
    
                    if (s.SubscribedEventClasses.Contains(evtClassPath))
                    {
                        s.Send("Publish " + requestInfo.Body);//forward only
                        Console.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity));
                    }
                }
            }
        }
    
    public class Subscribe : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                if (session.SubscribedEventClasses.Contains(classPath))
                    return;
    
                session.SubscribedEventClasses.Add(classPath);
    
                string msg = "";
                msg+=string.Format("[{0}], now subscribed these events:
    ", session.AssociatedServerIdentity);
                foreach(string evtClass in session.SubscribedEventClasses)
                    msg += string.Format("              {0}
    ", evtClass);
                Console.WriteLine(msg);
            }
        }
    复制代码

     启动server代码:

    复制代码
    static void Main(string[] args)
            {
                var appServer = new EventDispatcherBusServer();
    
                if (!appServer.Setup(2012))
                {
                    Console.WriteLine("Failed to setup!");
                    Console.ReadKey();
                    return;
                }
    
                Console.WriteLine();
    
                if (!appServer.Start())
                {
                    Console.WriteLine("Failed to start!");
                    Console.ReadKey();
                    return;
                }
    
                Console.WriteLine("The server started successfully, press key 'q' to stop it!");
    
                Console.ReadKey();
    
                appServer.Stop();
            }
    复制代码

     server端没多少好说的,都是很简单的,下面来看看客户端:

    复制代码
    static void Main(string[] args)
            {
                EventBusClientBroker busBroker = new EventBusClientBroker("127.0.0.1", 2012, "App server 2", "AppEvents.dll");
                busBroker.NewEventReceived += new EventReceived(busBroker_NewEventReceived);//新event被推送过来的事件
                busBroker.Connect();
    
                busBroker.Subscribe<NewUserRegisteredEvent>();//订阅Event
    
                //busBroker.Close();
            }
    
            static void busBroker_NewEventReceived(string evtClass, object evt)
            {
                //这里会将event对象发过来
                //evtClass是evt的class全路径
                if (evt is NewUserRegisteredEvent)
                    Console.WriteLine(((NewUserRegisteredEvent)evt).UserName);
                else if (evt is UserProfileUpdatedEvent)
                    Console.WriteLine(((UserProfileUpdatedEvent)evt).UserID);
            }
    复制代码

    如果要触发一个事件,则:

                       NewUserRegisteredEvent evt = new NewUserRegisteredEvent();
                        evt.RegisterDate = DateTime.Now;
                        evt.UserName = "aaron";
                        busBroker.Publish<NewUserRegisteredEvent>(evt);

    我们来看看效果图:

    搞定,代码下载。 

    自省推动进步,视野决定未来。
    心怀远大理想。
    为了家庭幸福而努力。
    用A2D科技,服务社会。
     
  • 相关阅读:
    3.Linux系统信息
    2.LINUX常用命令
    1.CMD命令
    8.变量内存CPU原理
    17.I/O系统访问方式和类型
    16.磁盘调度
    15.大容量存储结构
    cluvfy comp命令用法
    禁用DRM
    Oracle数据库升级前必要的准备工作
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3229303.html
Copyright © 2011-2022 走看看