zoukankan      html  css  js  c++  java
  • TPL 之六 在数据流块中指定任务计划程序

    (一)DataflowBlockOptions属性      代码地址-BroadcastBlock2
    在一些Block的构造函数中,我们常常可以看见需要你输入DataflowBlockOptions 类型或者它的两个派生类型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。
    DataflowBlockOptions有五个属性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。
    BoundedCapacity
    这个属性用来限制一个Block中最多可以缓存数据项的数量,大多数Block都支持这个属性,这个值默认是DataflowBlockOptions.Unbounded = -1,也就是说没有限制。开发人员可以制定这个属性设置数量的上限。那后面的新数据将会延迟。比如说用一个BufferBlock连接一个ActionBlock,如果在ActionBlock上面设置了上限,ActionBlock处理的操作速度比较慢,留在ActionBlock中的数据到达了上限,那么余下的数据将留在BufferBlock中,直到ActionBlock中的数据量低于上限。这种情况常常会发生在生产者生产的速度大于消费者速度的时候,导致的问题是内存越来越大,数据操作越来越延迟。我们可以通过一个BufferBlock连接多个ActionBlock来解决这样的问题,也就是负载均衡。一个ActionBlock满了,就会放到另外一个ActionBlock中去了。
    CancellationToken
    TPL中常用的类型。CancellationToken来取消操作,在Block的构造函数中放入CancellationToken,Block将在它的整个生命周期中全程监控这个对象,只要在这个Block结束运行(调用Complete方法)前,用CancellationToken发送取消请求,该Block将会停止运行,如果Block中还有没有处理的数据,那么将不会再被处理。
    MaxMessagesPerTask
    用MaxMessagesPerTask控制公平性,每一个Block内部都是异步处理,都是使用TPL的Task。TDF的设计是在保证性能的情况下,尽量使用最少的任务对象来完成数据的操作,这样效率会高一些,一个任务执行完成一个数据以后,任务对象并不会销毁,而是会保留着去处理下一个数据,直到没有数据处理的时候,Block才会回收掉这个任务对象。但是如果数据来自于多个Source,公平性就很难保证。从其他Source来的数据必须要等到早前的那些Source的数据都处理完了才能被处理。这时我们就可以通过MaxMessagesPerTask来控制。这个属性的默认值还是DataflowBlockOptions.Unbounded=-1,表示没有上限。假如这个数值被设置为1的话,那么单个任务只会处理一个数据。这样就会带来极致的公平性,但是将带来更多的任务对象消耗。
    NameFormat
    用NameFormat来定义Block名称MSDN上说属性NameFormat用来获取或设置查询块的名称时要使用的格式字符串。Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。所以当我们输入”{0}”的时候,名字就是block.GetType ().Name,如果我们数据的是”{1}”,那么名字就是block.Completion.Id。如果是“{2}”,那么就会抛出异常。
    TaskScheduler
    用TaskScheduler来调度Block行为,TaskScheduler是非常重要的属性。同样这个类型来源于TPL。每个Block里面都使用TaskScheduler来调度行为,无论是源Block和目标Block之间的数据传递,还是用户自定义的执行数据方法委托,都是使用的TaskScheduler。如果没有特别设置的话,将使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)来调度。我们可以使用其他的一些继承于TaskScheduler的类型来设置这个调度器,一旦设置了以后,Block中的所有行为都会使用这个调度器来执行。.Net Framework 4中内建了两个Scheduler,一个是默认的ThreadPoolTaskScheduler,另一个是用于UI线程切换的SynchronizationContextTaskScheduler。如果你使用的Block设计到UI的话,那可以使用后者,这样在UI线程切换上面将更加方便。
    MaxDegreeOfParallelism
    用MaxDegreeOfParallelism来并行处理,通常,Block中处理数据都是单线程的,一次只能处理一个数据,比如说ActionBlock中自定义的代理。使用MaxDegreeOfParallelism可以让你并行处理这些数据。属性的定义是最大的并行处理个数。如果定义成-1的话,那就是没有限制。用户需要在实际情况中选择这个值的大小,并不是越大越好。如果是平行处理的话,还应该考虑是否有共享资源。
    .Net Framework 4.5 中,还有一个类型被加入到System.Threading.Tasks名称空间下:ConcurrentExclusiveSchedulerPair。这个类是两个TaskScheduler的组合。它提供两个TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;我们可以把这两个TaskScheduler构造进要使用的Block中。他们保证了在没有排他任务的时候(使用ExclusiveScheduler的任务),其他任务(使用ConcurrentScheduler)可以同步进行,当有排他任务在运行的时候,其他任务都不能运行。其实它里面就是一个读写锁。这在多个Block操作共享资源的问题上是一个很方便的解决方案。
    image

    BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
    public Form1()
    {
        InitializeComponent();
        /*创建一个toggleCheckBoxBlock*/
        var toggleCheckBoxBlock = new ActionBlock<CheckBox>(checkBox =>
        {
            checkBox.Checked = !checkBox.Checked;
            Console.WriteLine($"chk名称:{checkBox.Name}");
        }, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        });
        var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
        /*创建三个readerActions,向toggleCheckBoxBlock投递Post数据*/
        var readerActions = from checkBox in new CheckBox[] { checkBox1, checkBox2, checkBox3 }
                            select new ActionBlock<int>(milliseconds =>
                            {
                                toggleCheckBoxBlock.Post(checkBox);
                                Thread.Sleep(milliseconds);
                                toggleCheckBoxBlock.Post(checkBox);
                            },
                            new ExecutionDataflowBlockOptions
                            {
                                TaskScheduler = taskSchedulerPair.ConcurrentScheduler
                            });
        /*创建一个writerAction,向toggleCheckBoxBlock投递Post数据*/
        var writerAction = new ActionBlock<int>(milliseconds =>
        {
            toggleCheckBoxBlock.Post(checkBox4);
            Thread.Sleep(milliseconds);
            toggleCheckBoxBlock.Post(checkBox4);
        }, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = taskSchedulerPair.ExclusiveScheduler
        });
        /*链接数据块,三个readerAction,一个writerAction*/
        foreach (var readerAction in readerActions)
        {
            broadcaster.LinkTo(readerAction);
        }
        broadcaster.LinkTo(writerAction);
        timer1.Start();
    }
    private void timer1_Tick(object sender, EventArgs e)
    {
        broadcaster.Post(1000);
        Console.WriteLine($"broadcaster长度");
    }
  • 相关阅读:
    WAF与IPS的区别总结
    web后门排查与高效分析web日志技巧
    如何做一名好的web安全工程师?
    从“黑掉Github”学Web安全开发
    DNS劫持
    万网上如何将IP和申请的域名绑定
    如何申请网站域名
    什么是域名?什么网站名?什么是URL?
    myeclipse svn 插件去除已经保存的密码方法
    SVN中检出 和 导出 的区别
  • 原文地址:https://www.cnblogs.com/lihuali/p/14536305.html
Copyright © 2011-2022 走看看