zoukankan      html  css  js  c++  java
  • 并发系列64章(TPL 数据流)第七章

    前言

    什么是TPL?全称:transmission control protocol

    传输层对应于OSI七层参考模型的传输层,它提供两种端到端的通信服务。

    然后思维方式回到为什么有这个TPL 数据流上。

    TPL 数据流库向具有高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序的并行化和消息传递提供了基础。 它还能显式控制缓存数据的方式以及在系统中移动的方式。 
    
    为了更好地了解数据流编程模型,请考虑一个以异步方式从磁盘加载图像并创建复合图像的应用程序。 
    
    传统编程模型通常需要使用回调和同步对象(例如锁)来协调任务和访问共享数据。 
    
    通过使用数据流编程模型,您可以从磁盘读取时创建处理图像的数据流对象。 
    
    在数据流模型下,您可以声明当数据可用时的处理方式,以及数据之间的所有依赖项。 由于运行时管理数据之间的依赖项,因此通常可以避免这种要求来同步访问共享数据。
    
    此外,因为运行时计划基于数据的异步到达,所以数据流可以通过有效管理基础线程提高响应能力和吞吐量。
    

    分析一下,这段话。

    TPL 数据流库向具有高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序的并行化和消息传递提供了基础。

    解决一个问题就是:

    高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序。

    如何解决的:

    应用程序的并行化和消息传递提供了基础。通过并行解决的。

    例子:

    异步方式从磁盘加载图像并创建复合图像的应用程序

    遇到的问题:

    协调任务和访问共享数据 需要 回调和同步。

    就是说共享数据的时候,需要同步。

    总结问题:共享数据代价大。

    如果解决的:

    由于运行时管理数据之间的依赖项,因此通常可以避免这种要求来同步访问共享数据。

    总结:解决了依赖,那么不需要同步了。

    综上所述:TPL 数据流库的作用在于解决数据之间的依赖,避免同步访问共享数据。

    正文

    链接数据流块

    var multiplyBlock = new TransformBlock<int, int>(item=>item*2);
    var subtractBlock = new TransformBlock<int, int>(item=> { Console.WriteLine(item);  return item - 2; });
    multiplyBlock.LinkTo(subtractBlock);
    multiplyBlock.Post(10);
    Console.ReadKey();
    

    打印出来就是20了。

    传递出错信息

    需要处理数据流网格中发生的错误

    如果数据流块的委托抛出异常,这个块就是故障块。一但数据流进入了故障状态,就会删除所有数据(停止接收新的数据)。

    什么意思呢?

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item); return item - 2; });
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	try
    	{
    		multiplyBlock.Post(10);
    		multiplyBlock.Post(1);
    		multiplyBlock.Post(20);
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    结果是:


    有没有发现multiplyBlock.Post(20);,没有运行?

    因为一但一个有错误,那么就会终止,并销毁数据。

    这里和上面不同的是,new DataflowLinkOptions { PropagateCompletion = true}。

    多个这个东西,那么这个有啥用呢?

    因为我们链接块的时候,这个库不会帮助我们传递块运行的状态,如果不传递的话,下一个块是不晓得上一个块到底啥情况,这样不利于我们捕获异常。

    而这种传递做法,我们只要在最后的处理模块,统一处理错误就可以。

    断开链接

    这个我从来就没有遇到过。是这样子的,适用一种这样的场景。

    比如说有一个数据块需要动态替换,需要断开现有的模块然后接上新的数据块。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
    	var link=multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	try
    	{
    		for (int i = 0; i < 20; i++)
    		{
    			multiplyBlock.Post(2);
    			if (i==10)
    			{
    				await Task.Delay(1000);
    				link.Dispose();
    				multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    			}
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    结果是:

    值得注意的是,我这里了一个:

    await Task.Delay(1000);
    

    这是模拟动态运行的时候,因为当我post结束的时候,数据块链接还没开始传递。

    注:

    除非保证链接是空闲的情况下,否则在断开数据块的链接时候会出现竞争。
    
    竞争的是先断开还是先传递。
    
    但是这种竞争是安全的,他会保证要不断开,要不传递带下一个数据块。
    

    限制流量

    前面我们都是线性链接,就是一条路走到黑。但是呢,有时候出现分叉的时候,那么该如何均衡呢?

    之所以考虑均衡,是因为比如传递到下一个数据块的时候,是会有缓存的。如果有条分叉,一条分叉无限去缓存,那另外一条可能吃不上饭了。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    	try
    	{
    		for (int i = 0; i < 100; i++)
    		{
    			multiplyBlock.Post(2);
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    这种就属于没吃上饭的情况。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var options = new DataflowBlockOptions {BoundedCapacity=1 };
    	var subtractBlock = new TransformBlock<int, int>(item => {
    		return item - 2;
    	}, options);
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; }, options);
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    	try
    	{
    		for (int i = 0; i < 100; i++)
    		{
    			multiplyBlock.Post(2);
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    限制缓存为1,那么这时候我们就会相互切换。

    下一章

    整理:

    1.数据流块的并行处理

    2.创建自定义数据流块

    参考

    https://www.cnblogs.com/yswenli/p/8042594.html

  • 相关阅读:
    webpack打包注意事项
    打印内存, 打印16进制
    c++ 字符集转换
    RegSvr32 加载失败,找不到指定的模块
    错误码设计
    mfc 移动绘制的图形
    获取、设置光标
    c++ 函数中定义函数
    python linux 自动补全 tab.py
    3.4.5节 完整神经网络样例程序
  • 原文地址:https://www.cnblogs.com/aoximin/p/12711637.html
Copyright © 2011-2022 走看看