zoukankan      html  css  js  c++  java
  • 并发系列64章(TPL 数据流)第七章

    前言

    什么是TPL?全称:transmission control protocol

    传输层对应于OSI七层参考模型的传输层,它提供两种端到端的通信服务。

    然后思维方式回到为什么有这个TPL 数据流上。

    TPL 数据流库向具有高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序的并行化和消息传递提供了基础。 它还能显式控制缓存数据的方式以及在系统中移动的方式。 
    
    为了更好地了解数据流编程模型,请考虑一个以异步方式从磁盘加载图像并创建复合图像的应用程序。 
    
    传统编程模型通常需要使用回调和同步对象(例如锁)来协调任务和访问共享数据。 
    
    通过使用数据流编程模型,您可以从磁盘读取时创建处理图像的数据流对象。 
    
    在数据流模型下,您可以声明当数据可用时的处理方式,以及数据之间的所有依赖项。 由于运行时管理数据之间的依赖项,因此通常可以避免这种要求来同步访问共享数据。
    
    此外,因为运行时计划基于数据的异步到达,所以数据流可以通过有效管理基础线程提高响应能力和吞吐量。
    

    分析一下,这段话。

    TPL 数据流库向具有高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序的并行化和消息传递提供了基础。

    解决一个问题就是:

    高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操作的应用程序。

    如何解决的:

    应用程序的并行化和消息传递提供了基础。通过并行解决的。

    例子:

    异步方式从磁盘加载图像并创建复合图像的应用程序

    遇到的问题:

    协调任务和访问共享数据 需要 回调和同步。

    就是说共享数据的时候,需要同步。

    总结问题:共享数据代价大。

    如果解决的:

    由于运行时管理数据之间的依赖项,因此通常可以避免这种要求来同步访问共享数据。

    总结:解决了依赖,那么不需要同步了。

    综上所述:TPL 数据流库的作用在于解决数据之间的依赖,避免同步访问共享数据。

    正文

    链接数据流块

    var multiplyBlock = new TransformBlock<int, int>(item=>item*2);
    var subtractBlock = new TransformBlock<int, int>(item=> { Console.WriteLine(item);  return item - 2; });
    multiplyBlock.LinkTo(subtractBlock);
    multiplyBlock.Post(10);
    Console.ReadKey();
    

    打印出来就是20了。

    传递出错信息

    需要处理数据流网格中发生的错误

    如果数据流块的委托抛出异常,这个块就是故障块。一但数据流进入了故障状态,就会删除所有数据(停止接收新的数据)。

    什么意思呢?

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item); return item - 2; });
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	try
    	{
    		multiplyBlock.Post(10);
    		multiplyBlock.Post(1);
    		multiplyBlock.Post(20);
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    结果是:


    有没有发现multiplyBlock.Post(20);,没有运行?

    因为一但一个有错误,那么就会终止,并销毁数据。

    这里和上面不同的是,new DataflowLinkOptions { PropagateCompletion = true}。

    多个这个东西,那么这个有啥用呢?

    因为我们链接块的时候,这个库不会帮助我们传递块运行的状态,如果不传递的话,下一个块是不晓得上一个块到底啥情况,这样不利于我们捕获异常。

    而这种传递做法,我们只要在最后的处理模块,统一处理错误就可以。

    断开链接

    这个我从来就没有遇到过。是这样子的,适用一种这样的场景。

    比如说有一个数据块需要动态替换,需要断开现有的模块然后接上新的数据块。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
    	var link=multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	try
    	{
    		for (int i = 0; i < 20; i++)
    		{
    			multiplyBlock.Post(2);
    			if (i==10)
    			{
    				await Task.Delay(1000);
    				link.Dispose();
    				multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    			}
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    结果是:

    值得注意的是,我这里了一个:

    await Task.Delay(1000);
    

    这是模拟动态运行的时候,因为当我post结束的时候,数据块链接还没开始传递。

    注:

    除非保证链接是空闲的情况下,否则在断开数据块的链接时候会出现竞争。
    
    竞争的是先断开还是先传递。
    
    但是这种竞争是安全的,他会保证要不断开,要不传递带下一个数据块。
    

    限制流量

    前面我们都是线性链接,就是一条路走到黑。但是呢,有时候出现分叉的时候,那么该如何均衡呢?

    之所以考虑均衡,是因为比如传递到下一个数据块的时候,是会有缓存的。如果有条分叉,一条分叉无限去缓存,那另外一条可能吃不上饭了。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    	try
    	{
    		for (int i = 0; i < 100; i++)
    		{
    			multiplyBlock.Post(2);
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    这种就属于没吃上饭的情况。

    static async void datalfow()
    {
    	var multiplyBlock = new TransformBlock<int, int>(item =>
    	{
    		if (item == 1)
    		{
    			throw new InvalidOperationException("not good");
    		}
    		Console.WriteLine("item:" + item);
    		return item * 2;
    	}
    	);
    	var options = new DataflowBlockOptions {BoundedCapacity=1 };
    	var subtractBlock = new TransformBlock<int, int>(item => {
    		return item - 2;
    	}, options);
    	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; }, options);
    	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
    	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
    	try
    	{
    		for (int i = 0; i < 100; i++)
    		{
    			multiplyBlock.Post(2);
    		}
    		await subtractBlock.Completion;
    	}
    	catch(AggregateException e)
    	{
    		Console.WriteLine(e);
    	}
    }
    

    限制缓存为1,那么这时候我们就会相互切换。

    下一章

    整理:

    1.数据流块的并行处理

    2.创建自定义数据流块

    参考

    https://www.cnblogs.com/yswenli/p/8042594.html

  • 相关阅读:
    ExecuteScalar requires the command to have a transaction when the connection assigned to the command is in a pending
    如何从vss中分离程序
    String or binary data would be truncated
    the pop3 service failed to retrieve authentication type and cannot continue
    The POP3 service failed to start because
    IIS Error he system cannot find the file specified _找不到页面
    pku2575Jolly Jumpers
    pku2940Wine Trading in Gergovia
    pku3219二项式系数
    pku1029false coin
  • 原文地址:https://www.cnblogs.com/aoximin/p/12711637.html
Copyright © 2011-2022 走看看