zoukankan      html  css  js  c++  java
  • Reactor模式的.net版本简单实现--DEMO

         近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。

         闲话少说,进入正题。netty的性能之所以能够达到如此的高度。主要由于他使用Reactor模式处理socket的请求,让服务器的使用率最大化,且尽量减少线程的开销。本文章主要简单介绍下Reactor模式。

    一、reactor概论

    reactor模式主要解决处理多个客户端请求的设计模式。

    首先从类图我们可以得知:

    Dispatcher:Handler管理器,以及调用度。他依赖于Demultiplexer类

    Demultiplexer:事件管理器,接受外部的事件,并提供给Dispatch使用。

    Handle:事件源,表示触发了那些事件

    EventHandler:各种类型的处理器,用于处理具体的业务,以及I/O的读写

    当然,也可以通过序列图看出首先需要初始化Dispatcher, Demultiplexer等相关类,以及注册具体的事件处理器。

    二、代码的具体实现

    类图如下[源码下载]:

    2.1 多路复用事件处理器的代码

    public class Demultiplexer
        {
            private ConcurrentQueue<Event> eventQuene = new ConcurrentQueue<Event>();
            private Object lockObj = new Object();
    
            public List<Event> Select()
            {
                return this.Select(0);
            }
            public List<Event> Select(int time)
            {
                if(time > 0)
                {
                    if (this.eventQuene.IsEmpty)
                    {
                        lock (lockObj)
                        {
                            if (this.eventQuene.IsEmpty)
                            {
                                System.Threading.Thread.Sleep(time);
                            }
                        }
                    }
                }
                List<Event> events = new List<Event>();
                while(this.eventQuene.Count > 0)
                {
                    Event tmp;
                    if(this.eventQuene.TryDequeue(out tmp))
                    {
                        events.Add(tmp);
                    }
                }
                return events;
            }
            public void AddEvent(Event argEvent)
            {
                this.eventQuene.Enqueue(argEvent);
            }
        }

    此类主要防止多线程的共同竞争,因为多路径复用选择器会被多个线程同时使用。所以使用的线程安全的Queue。

    2.2 Handler触发器和管理器

    /// <summary>
        /// Reactor的事件Handler触发器,提供事件Handler的注册,移除
        /// </summary>
        public class EventDispatch
        {
            private Demultiplexer demultiplexer;
            Dictionary<EventType, EventHandler> eventHandlerMap = new Dictionary<EventType, EventHandler>();
    
            public EventDispatch(Demultiplexer demultiplexer)
            {
                this.demultiplexer = demultiplexer;
            }
    
            public void RegisterHandler(EventType eventType, EventHandler eventHandler)
            {
                this.eventHandlerMap.Add(eventType, eventHandler);
            }
            public void RemoveHandler(EventType eventType)
            {
                this.eventHandlerMap.Remove(eventType);
            }
    
            public void HandleEvents()
            {
                this.Dispatch();
            }
            public void Dispatch()
            {
                string log = string.Format("thread id: {0} Dispatch", System.Threading.Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine(log);
                while (true)
                {
                    List<Event> events = this.demultiplexer.Select();
                    foreach(var itemEvent in events)
                    {
                        EventHandler eventHandler = this.eventHandlerMap[itemEvent.EventType];
                        eventHandler.Handle(itemEvent);
                    }
                    System.Threading.Thread.Sleep(1000);
                }
            }
        }

    主要职责,对Handler的注册、移除的管理,以及通过 多路复用选择器 选择相应的Handler进行处理。

    2.3 服务端的实现

    /// <summary>
        /// 开启接受请求的服务端
        /// </summary>
        public class AcceptRuner
        {
            private System.Collections.Concurrent.ConcurrentQueue<object> sourceQueue = new System.Collections.Concurrent.ConcurrentQueue<object>();
    
            private Demultiplexer demultiplexer;
    
            public AcceptRuner(Demultiplexer demultiplexer)
            {
                this.demultiplexer = demultiplexer;
            }
    
            public void adConnection(object source)
            {
                this.sourceQueue.Enqueue(source);
            }
    
            public void Run()
            {
                string log = string.Format("thread id: {0} AcceptRunner", System.Threading.Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine(log);
                while (true)
                {
                    object source;
                    if(this.sourceQueue.TryDequeue(out source))
                    {
                        Event acceptEvent = new Event()
                        {
                            EventType = EventType.Accept,
                            Source = source
                        };
                        this.demultiplexer.AddEvent(acceptEvent);
                    }
                }
            }
        }

    此类效仿netty的serverBoostrap的实现,将外部新的连接以事件对象的形式添加到 多路复用选择器上。

    2.4 其他类

    Event:事件基类

    EventHandler:事件处理器抽象基类。他派生了:AcceptEventHandler,ReadEventHandler。

    EventType:事件类型

    三、备注说明

    1. 代码没有贴完整。但下载包就是完整的。

    2. 这只我对Reactor模式的理解,如有偏颇之处,还望各拉指点一二。

  • 相关阅读:
    SC Alibaba20211102 Nacos 微服务框架结构 SerivceImplOrder <OpenFeign> SerivceImplMember 群
    SC Alibaba20211030 Nacos 微服务框架结构 SerivceImplOrder <OpenFeign> SerivceImplMember
    SC Alibaba20211028 Nacos 微服务框架结构 SerivceImplMember
    SC Alibaba20211022 Nacos 负载均衡 rest|feign(ribbon) ----@loadbalancer
    springboot启动多个实例
    SC Alibaba20211019 Nacos 手写LoadBalancer LoadBalancerClient
    SC Alibaba20211018 Nacos discoveryCient-restTemplate调用
    SC Alibaba20211017 Nacos 命令 测试服务、配置功能 postman
    mysql性能测试(SQL_CALC_FOUND_ROWS)
    GIS相关的一些常用操作(持续记录...)
  • 原文地址:https://www.cnblogs.com/cqhaibin/p/8324861.html
Copyright © 2011-2022 走看看