zoukankan      html  css  js  c++  java
  • 为Disruptor 写的一个简单实用的.Net扩展

    disruptor   用户封装自己的消费者,把消费者注入到消费者容器,消费者容器实现自动创建 缓存队列,生产者;

     文中用到的 disruptor   C#移植源代码

     https://github.com/bingyang001/disruptor-net-3.3.0-alpha

     作者博客 http://www.cnblogs.com/liguo/p/3296166.html

     消费者容器:

    /// <summary>
        /// 消费者管理器
        /// </summary>
        /// <typeparam name="TProduct">产品</typeparam>
        public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
        {
            private readonly WorkerPool<TProduct> _workerPool;
    
            public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = 1024*64)
            {
                if (handers == null || handers.Count == 0)
                    throw new ArgumentNullException("消费事件处理数组为空!");
                if (handers.Count == 1)
                    _ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
                        waitStrategy ?? new YieldingWaitStrategy());
                else
                {
                    _ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
                        waitStrategy ?? new YieldingWaitStrategy());
                }
                _workerPool = new WorkerPool<TProduct>(_ringBuffer
                    , _ringBuffer.NewBarrier()
                    , new FatalExceptionHandler()
                    , handers.ToArray());
                _ringBuffer.AddGatingSequences(_workerPool.getWorkerSequences());
            }
    
            public void Start()
            {
                _workerPool.start(TaskScheduler.Default);
            }
    
            public Producer<TProduct> CreateOneProducer()
            {
                return new Producer<TProduct>(this._ringBuffer);
            } 
            public void DrainAndHalt()
            {
                _workerPool.drainAndHalt();
            }
    
            private readonly RingBuffer<TProduct> _ringBuffer;
        }

      生产者(产品): 所有的产品都应该继承自生产者

    /// <summary>
        /// 生产者对象
        /// </summary>
        /// <typeparam name="TProduct">产品类型</typeparam>
        public class Producer<TProduct> where TProduct:Producer<TProduct>
        {
    
            long _sequence;
            private RingBuffer<TProduct> _ringBuffer;
            public Producer()
            {
                
            }
            public Producer(RingBuffer<TProduct> ringBuffer )
            {
                _ringBuffer = ringBuffer;
            }
            /// <summary>
            /// 获取可修改的产品
            /// </summary>
            /// <returns></returns>
            public Producer<TProduct> Enqueue()
            {
                long sequence = _ringBuffer.Next();
                Producer<TProduct> producer = _ringBuffer[sequence];
                producer._sequence = sequence;
                if (producer._ringBuffer == null)
                    producer._ringBuffer = _ringBuffer;
                return producer;
            }
            /// <summary>
            /// 提交产品修改
            /// </summary>
            public void Commit()
            {
                _ringBuffer.Publish(_sequence);
            }
        }

      --------------------------------------------------------

      以上就实现了,测试代码

      先创建 产品对象:

      

    /// <summary>
            /// 产品/继承生产者
            /// </summary>
            public class Product : Producer<Product>
            {
                //产品包含的属下随便定义,无要求,只需要继承自生产者就行了
                public long Value { get; set; }
                public string Guid { get; set; }
            }

    创建消费者对象

     /// <summary>
            /// 消费处理对象
            /// </summary>
            public class WorkHandler : IWorkHandler<Product>
            {
             
                public void OnEvent(Product @event)
                {
                    //Test是测试对象数据准确(数据重复或者丢失数据)
                    Test.UpdateCacheByOut(@event.Guid);
                    //收到产品,在这里写处理代码
    
                }
    
            }

      测试代码:

      可创建1个或者多个的生产者对象,消费者处理对象;不一定太多,多不一定快; 建议生产者创建一个就行了,多线程操作一个生产者对象; 消费者对象可以根据实际情况创建多少个;

      

               //创建2个消费者,2个生产者, 2个消费者表示,框架会有2个线程去处理消费产品 
    Workers<Product> workers = new Workers<Product>( new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()}); Producer<Product> producerWorkers = workers.CreateOneProducer(); Producer<Product> producerWorkers1 = workers.CreateOneProducer();
    //开始消费
      workers.Start();

     产品生产:

     可以在任何引用生产者的地方,把产品放进队列中. 这里 放入队列的方法和平时不太一样.  这里采用的是,从队列里面拿去一个位置,然后把产品放进去; 具体的做法 ,找生产者,获取一个产品对象,然后修改产品属性,最后提交修改.

      var obj = producer.Enqueue();
               //修改产品属性
                    obj.Commit();

      以上是关键代码:

    完整的测试类 : 包含测试数据正确性,  性能,在不校验正确性的时候,每秒ops 1千万左右. 

     class Test
        {
            public static long PrePkgInCount = 0;
            public static long PrePkgOutCount = 0;
            public static long PkgInCount = 0;
            public static long PkgOutCount = 0;
            static ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
            static ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
            private static long Seconds;
    
            static void Main(string[] args)
            {
                Workers<Product> workers = new Workers<Product>(
                new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});
    
                Producer<Product> producerWorkers = workers.CreateOneProducer();
                Producer<Product> producerWorkers1 = workers.CreateOneProducer();
    
                workers.Start();
                Task.Run(delegate
                {
                    while (true)
                    {
                        Thread.Sleep(1000);
                        Seconds++;
                        long intemp = PkgInCount;
                        long outemp = PkgOutCount;
                        Console.WriteLine(
                            $"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
                        PrePkgInCount = intemp;
                        PrePkgOutCount = outemp;
                    }
    
                });
                Task.Run(delegate { Run(producerWorkers); });
                Task.Run(delegate { Run(producerWorkers); });
                Task.Run(delegate { Run(producerWorkers1); });
                Console.Read();
    
            }
    
            public static void Run(Producer<Product> producer)
            {
                for (int i = 0; i < int.MaxValue; i++)
                {
    
                    var obj = producer.Enqueue();
                    CheckRelease(obj as Product);
                    obj.Commit();
                }
            }
    
            public static  void CheckRelease(Product publisher)
            {
                Interlocked.Increment(ref PkgInCount);
                return; //不检查正确性
                publisher.Guid = Guid.NewGuid().ToString();
                InCache.TryAdd(publisher.Guid, string.Empty);
              
            }
    
            public static void UpdateCacheByOut(string guid)
            {
                Interlocked.Increment(ref Test.PkgOutCount);
                if (guid != null)
                    if (InCache.ContainsKey(guid))
                    {
                        string str;
                        InCache.TryRemove(guid, out str);
                    }
                    else
                    {
                        OutCache.TryAdd(guid, string.Empty);
                    }
    
            }
            /// <summary>
            /// 产品/继承生产者
            /// </summary>
            public class Product : Producer<Product>
            {
                //产品包含的属下随便定义,无要求,只需要继承自生产者就行了
                public long Value { get; set; }
                public string Guid { get; set; }
            }
    
            /// <summary>
            /// 消费处理对象
            /// </summary>
            public class WorkHandler : IWorkHandler<Product>
            {
             
                public void OnEvent(Product @event)
                {
    
                    Test.UpdateCacheByOut(@event.Guid);
                    //收到产品,在这里写处理代码
    
                }
    
            }
        }
  • 相关阅读:
    C# 关于委托和事件的妙文:通过一个例子详细介绍委托和事件的作用;Observer模式简介
    Path.Combine (合并两个路径字符串)方法的一些使用细节
    taskkill /f /im的应用
    powersheel远程连接方法操作
    Centos 定时任务发送smtp邮件
    Centos 发送smtp邮件
    在 Windows 上安装Rabbit MQ 指南
    Quartz.NET总结(五)基于Quartz.net 的开源任务管理平台
    Quartz.NET总结(四)Quartz 远程调度
    Quartz.NET总结(三)Quartz 配置
  • 原文地址:https://www.cnblogs.com/hda37210/p/5242185.html
Copyright © 2011-2022 走看看