zoukankan      html  css  js  c++  java
  • Reactive Extensions学习

    今天抽空学习了一下Reactive Extensions库,感觉还是比较容易上手的,顺手练习了一下,写了个ReadAsync的扩展。 

        static void Main(string[] args)
        {
            var bufferSize 
    = 1000;
            var buffer 
    = new byte[bufferSize];

            var stream 
    = File.OpenRead(@"r:\1.txt");

            stream.ReadAsync(buffer).Subscribe(
                            onNext: i 
    => { Console.Write(Encoding.Default.GetString(buffer, 0, i)); },
                            onCompleted: () 
    => Console.WriteLine("completed")
                        );

            Console.ReadLine();
        }

        
    public static IObservable<int> ReadAsync(this Stream stream, byte[] buffer)
        {
            var bufferLen 
    = buffer.Length;
            var readAsync 
    = Observable.FromAsyncPattern<byte[], intintint>(stream.BeginRead, stream.EndRead);

            
    return Observable.Defer(() => readAsync(buffer, 0, bufferLen)).Repeat().TakeWhile(i => i > 0)

            // return Observable.While(() => stream.Position != stream.Length, Observable.Defer(() => readAsync(buffer, 0, bufferLen)));
        }

        
    public static IObservable<string> ReadLinesAsync(this Stream stream, byte[] buffer)
        {
            
    return Observable.CreateWithDisposable<string>(observer =>
            {
                var blocks 
    = stream.ReadAsync(buffer).Select(i => Encoding.Default.GetString(buffer, 0, i));
                var lastLine 
    = string.Empty;

                
    return blocks.Subscribe(data =>
                {
                    
    if (data.Contains("\r\n"))
                    {
                        var lines 
    = data.Split(new string[] { "\r\n" }, StringSplitOptions.None);

                        observer.OnNext(lastLine 
    + lines[0]);
                        
    foreach (var line in lines.Skip(1).Take(lines.Length - 2))
                        {
                            observer.OnNext(line);
                        }

                        lastLine 
    = lines[lines.Length - 1];
                    }
                    
    else
                    {
                        lastLine 
    += data;
                    }
                },
                observer.OnError,
                () 
    =>
                {
                    observer.OnNext(lastLine);
                    observer.OnCompleted();
                });
            });
        }

    当然,到C# 5.0后,在编译器的原生语法糖支持下,实现这个扩展更加简单和直观,不过Reactive Extensions是异步的基础,并且在一些高级应用方便语法糖仍然不能取代它。先熟悉下这个框架还是有必要的。

    由于目前还只熟悉了皮毛,有许多东西还不会,等后续详细了解后再专门写篇文章介绍它。对这个库感兴趣的朋友可以访问一下下面几个链接:

    Reactive Extensions for .NET (Rx)

    Rx: Curing your asynchronous programming blues

    Introducing the Reactive Framework

  • 相关阅读:
    java.lang.NoSuchMethodError:antlr.collections.AST.getLine() I
    T7 java Web day01 标签HTML
    T6 s1 day19
    T5 s5 Day18
    T5 s4 Day 17
    T5 s3 day16
    T5 s2 Day 15
    T5 s1 day14
    T4 S03 day 12
    T4 S01 day1
  • 原文地址:https://www.cnblogs.com/TianFang/p/2011967.html
Copyright © 2011-2022 走看看