zoukankan      html  css  js  c++  java
  • Reactive Extensions学习

    今天抽空学习了一下Reactive Extensions库,感觉还是比较容易上手的,顺手练习了一下,写了个ReadAsync的扩展。 

        static void Main(string[] args)
        {
            var bufferSize 
    = 1000;
            var buffer 
    = new byte[bufferSize];

            var stream 
    = File.OpenRead(@"r:\1.txt");

            stream.ReadAsync(buffer).Subscribe(
                            onNext: i 
    => { Console.Write(Encoding.Default.GetString(buffer, 0, i)); },
                            onCompleted: () 
    => Console.WriteLine("completed")
                        );

            Console.ReadLine();
        }

        
    public static IObservable<int> ReadAsync(this Stream stream, byte[] buffer)
        {
            var bufferLen 
    = buffer.Length;
            var readAsync 
    = Observable.FromAsyncPattern<byte[], intintint>(stream.BeginRead, stream.EndRead);

            
    return Observable.Defer(() => readAsync(buffer, 0, bufferLen)).Repeat().TakeWhile(i => i > 0)

            // return Observable.While(() => stream.Position != stream.Length, Observable.Defer(() => readAsync(buffer, 0, bufferLen)));
        }

        
    public static IObservable<string> ReadLinesAsync(this Stream stream, byte[] buffer)
        {
            
    return Observable.CreateWithDisposable<string>(observer =>
            {
                var blocks 
    = stream.ReadAsync(buffer).Select(i => Encoding.Default.GetString(buffer, 0, i));
                var lastLine 
    = string.Empty;

                
    return blocks.Subscribe(data =>
                {
                    
    if (data.Contains("\r\n"))
                    {
                        var lines 
    = data.Split(new string[] { "\r\n" }, StringSplitOptions.None);

                        observer.OnNext(lastLine 
    + lines[0]);
                        
    foreach (var line in lines.Skip(1).Take(lines.Length - 2))
                        {
                            observer.OnNext(line);
                        }

                        lastLine 
    = lines[lines.Length - 1];
                    }
                    
    else
                    {
                        lastLine 
    += data;
                    }
                },
                observer.OnError,
                () 
    =>
                {
                    observer.OnNext(lastLine);
                    observer.OnCompleted();
                });
            });
        }

    当然,到C# 5.0后,在编译器的原生语法糖支持下,实现这个扩展更加简单和直观,不过Reactive Extensions是异步的基础,并且在一些高级应用方便语法糖仍然不能取代它。先熟悉下这个框架还是有必要的。

    由于目前还只熟悉了皮毛,有许多东西还不会,等后续详细了解后再专门写篇文章介绍它。对这个库感兴趣的朋友可以访问一下下面几个链接:

    Reactive Extensions for .NET (Rx)

    Rx: Curing your asynchronous programming blues

    Introducing the Reactive Framework

  • 相关阅读:
    【上线复盘】20190329-一个刷新数据的接口是如何导致几十万的订单数据错误
    VS------csc.exe已停止工作解决方法
    SQLServer------存储过程的使用
    SQLServer------聚集索引和非聚集索引的区别
    SQLServer------Sql Server性能优化辅助指标SET STATISTICS TIME ON和SET STATISTICS IO ON
    SQLServer------如何快速插入几万条测试数据
    SQLServer------如何让标识列重新开始计算
    SQLServer------begin tran/commit tran事务的使用方法
    SQLServer------插入数据时出现IDENTITY_INSERT错误
    Spring----Spring Boot Rest的使用方法
  • 原文地址:https://www.cnblogs.com/TianFang/p/2011967.html
Copyright © 2011-2022 走看看