zoukankan      html  css  js  c++  java
  • 分布式EventBus的Socket实现

    分布式EventBus的Socket实现 - 发布订阅

    在这篇文章中,EventBus实现 - 发布订阅 - XML加载 所适用的范围只是本机的事件传播,要是牵涉到多台服务器之间的事件传播就不行了,解决办法有用msmq解决的,也有用redis的发布订阅解决的,这次用C# socket来实现,能实现立刻推送事件到所以server上。

    这次的子系统适用的场景如下:

    主要分2个部分:各个server使用的Event Bus Broker以及Event Bus Server。

    Broker与Server之间的通信协议就3个:ME、Subscribe、Publish。分别代表:我的名字是、我要订阅的事件是、我触发事件。

    Event Bus Server是基于SuperSocket开源组件写的,非常方便。实现了3个对应的命令类(建议大家先看看SuperSocket的文档),如下: 

    复制代码
    public class ME : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity));
            }
        }
    
    public class Publish : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]);
    
                foreach(EventDispatcherBusSession s in session.AppServer.GetAllSessions())
                {
                    if (s.AssociatedServerIdentity == session.AssociatedServerIdentity)
                        continue;
    
                    if (s.SubscribedEventClasses.Contains(evtClassPath))
                    {
                        s.Send("Publish " + requestInfo.Body);//forward only
                        Console.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity));
                    }
                }
            }
        }
    
    public class Subscribe : CommandBase<EventDispatcherBusSession, StringRequestInfo>
        {
            public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
            {
                string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
                if (session.SubscribedEventClasses.Contains(classPath))
                    return;
    
                session.SubscribedEventClasses.Add(classPath);
    
                string msg = "";
                msg+=string.Format("[{0}], now subscribed these events:
    ", session.AssociatedServerIdentity);
                foreach(string evtClass in session.SubscribedEventClasses)
                    msg += string.Format("              {0}
    ", evtClass);
                Console.WriteLine(msg);
            }
        }
    复制代码

     启动server代码:

    复制代码
    static void Main(string[] args)
            {
                var appServer = new EventDispatcherBusServer();
    
                if (!appServer.Setup(2012))
                {
                    Console.WriteLine("Failed to setup!");
                    Console.ReadKey();
                    return;
                }
    
                Console.WriteLine();
    
                if (!appServer.Start())
                {
                    Console.WriteLine("Failed to start!");
                    Console.ReadKey();
                    return;
                }
    
                Console.WriteLine("The server started successfully, press key 'q' to stop it!");
    
                Console.ReadKey();
    
                appServer.Stop();
            }
    复制代码

     server端没多少好说的,都是很简单的,下面来看看客户端:

    复制代码
    static void Main(string[] args)
            {
                EventBusClientBroker busBroker = new EventBusClientBroker("127.0.0.1", 2012, "App server 2", "AppEvents.dll");
                busBroker.NewEventReceived += new EventReceived(busBroker_NewEventReceived);//新event被推送过来的事件
                busBroker.Connect();
    
                busBroker.Subscribe<NewUserRegisteredEvent>();//订阅Event
    
                //busBroker.Close();
            }
    
            static void busBroker_NewEventReceived(string evtClass, object evt)
            {
                //这里会将event对象发过来
                //evtClass是evt的class全路径
                if (evt is NewUserRegisteredEvent)
                    Console.WriteLine(((NewUserRegisteredEvent)evt).UserName);
                else if (evt is UserProfileUpdatedEvent)
                    Console.WriteLine(((UserProfileUpdatedEvent)evt).UserID);
            }
    复制代码

    如果要触发一个事件,则:

                       NewUserRegisteredEvent evt = new NewUserRegisteredEvent();
                        evt.RegisterDate = DateTime.Now;
                        evt.UserName = "aaron";
                        busBroker.Publish<NewUserRegisteredEvent>(evt);

    我们来看看效果图:

    搞定,代码下载。 

    自省推动进步,视野决定未来。
    心怀远大理想。
    为了家庭幸福而努力。
    用A2D科技,服务社会。
     
  • 相关阅读:
    zoj 3627#模拟#枚举
    Codeforces 432D Prefixes and Suffixes kmp
    hdu 4778 Gems Fight! 状压dp
    CodeForces 379D 暴力 枚举
    HDU 4022 stl multiset
    手动转一下田神的2048
    【ZOJ】3785 What day is that day? ——KMP 暴力打表找规律
    poj 3254 状压dp
    C++中运算符的优先级
    内存中的数据对齐
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3229303.html
Copyright © 2011-2022 走看看