zoukankan      html  css  js  c++  java
  • Rx .Net 简介

    1 定义:

    Reactive Extensions是一个遵循函数式编程的类库,它引用【观察者模式】以及【迭代器模式】对可观察对象产生的数据进行异步消费。

    使用Rx需要引用System.Reactive.Core的Nuget程序包(.Net Core)

    2 核心:

    2个核心接口:IObservable<T>、IObserver<T>

    其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)

    2.1 IObservable接口

    IObservable只有一个方法,就是【触发事件】

    [NullableContextAttribute(1)]
    public interface IObservable<[NullableAttribute(2)] out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    2.2 IObserver接口

    [NullableContextAttribute(1)]
    public interface IObserver<[NullableAttribute(2)] in T>
    {
      
        void OnCompleted();
    
        void OnError(Exception error);
    
        void OnNext(T value);
    }

    void OnNext<T>(T value), 序列里有新的值的时候会调用这个

    void OnCompleted(), 序列结束的时候调用这个

    void OnError(Exception ex), 发生错误的时候调用这个

    3 获取事件源的方式

    3.1 获取事件源

     /// <summary>
     /// 获取返回一个简单值的事件源
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetSimpleObservable()
     {
         return Observable.Return(42);
     }
    
     /// <summary>
     /// 返回抛出一个异常的事件源
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetThrowObservable()
     {
         return Observable.Throw<int>(new ArgumentException("Error in observable"));
     }
    
     /// <summary>
     /// 返回一个空的事件源
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetEmptyObservable()
     {
         return Observable.Empty<int>();
     }
    
     /// <summary>
     /// 返回一个任务事件源
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetTaskObservable()
     {
         return GetTask().ToObservable();
     }
    
     private static async Task<int> GetTask()
     {
         return 42;
     }
    
     /// <summary>
     /// 返回一个范围的事件源序列
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetRangeObservable()
     {
         return Observable.Range(2, 10);
     }
    
     /// <summary>
     /// 返回一个定时器事件源序列
     /// </summary>
     /// <returns></returns>
     private static IObservable<long> GetIntervalObservable()
     {
         return Observable.Interval(TimeSpan.FromMilliseconds(200));
     }
    
     /// <summary>
     /// 创建一个基本事件源序列
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetCreateObservable()
     {
         return Observable.Create<int>(observer =>
         {
             observer.OnNext(1);
             observer.OnNext(2);
             observer.OnNext(3);
             observer.OnNext(4);
             observer.OnCompleted();
             return Disposable.Empty;
         });
     }
    
     /// <summary>
     /// 创建一个类似for循环的事件源序列
     /// </summary>
     /// <returns></returns>
     private static IObservable<int> GetGenerateObservable()
     {
         // 类似for循环
         return Observable.Generate(
             1,              // 初始值
             x => x < 10,    // 循环停止条件
             x => x + 1,     // 循环一次+1
             x => x + 20     // 循环中执行的操作
         );
     }

    3.2 事件调用

    最简单的用法应该是

    class Program
    {
        static void Main(string[] args)
        {
            var numbers = Observable.Range(2, 10);
            var observer = new MyConsoleObserver<int>();
            numbers.Subscribe(observer);
            Console.ReadLine();
        }
    }
    
    /// <summary>
    /// 自定义观察者对象
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class MyConsoleObserver<T> : IObserver<T>
    {
        public void OnNext(T value)
        {
            Console.WriteLine("接收到 value {0}", value);
        }
        public void OnError(Exception error)
        {
            Console.WriteLine("出现异常! {0}", error);
        }
        public void OnCompleted()
        {
            Console.WriteLine("关闭观察行为");
        }
    }

    但是上述方法没有办法使用【多播】的特性,要想使用多播就必须使用 ISubject 接口了;通过 ISubject 接口就可以为事件源绑定多个相应。

    代码如下:

    static async Task Main(string[] args)
    {
    
        // 定义事件流
        var sequence = GetGenerateObservable();
        // 对事件流进行筛选
        sequence = sequence.Where(a => a < 23);
    
        // 注册观察者
        ISubject<int> subject = new ReplaySubject<int>();//申明Subject
        subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject
        subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject
        
        // 触发
        sequence.Subscribe(subject);
        Console.ReadKey();
    }
    private static IObservable<int> GetGenerateObservable()
    {
        // 类似for循环
        return Observable.Generate(
            1,              // 初始值
            x => x < 10,    // 循环停止条件
            x => x + 1,     // 循环一次+1
            x => x + 20     // 循环中执行的操作
        );
    }

    4 制造观察者 ISubject

    ISubject接口可以用来生成观察者,其实通过定义可以知道他既是观察者也是观察源

    public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>

    但是我们还是主要用它来生成观察者信息

    比如上面例子中的 ReplaySubject 就是 ISubject 的一种实现,ISubject有多种实现,区别如下

    1. Subject - 向所有观察者广播每个通知
    2. AsyncSubject - 当可观察序列完成后有且仅发送一个通知
    3. ReplaySubject - 缓存指定通知以对后续订阅的观察者进行重放
    4. BehaviorSubject - 推送默认值或最新值给观察者

     通过ISubject可以实现定义观察者,并实现多播,上面已经有例子

    5 观察源的温度

    1. Cold Observable:当有观察者订阅时才发送通知,且每个观察者【独享】一份完整的观察者序列。在被订阅 Subscribe 后运行,也就是说,observables序列仅在subscribe函数被调用后才会推送数据。
    2. Hot Observable:不管有无观察者订阅都会发送通知,且所有观察者【共享】同一份观察者序列。Hot Observables在被订阅之前就已经开始产生数据,例如鼠标移动事件。

    5.1 冷观察源

    var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1);
    clock.Subscribe(x => Console.WriteLine($"a:{x}"));
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"c:{x}"));
    Console.ReadKey();

    输出:

     可以看到c的输出虽然晚了2秒,但是他还是拿着完整的一份观察者序列的;

    5.2 热观察源

    var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish();
    clock.Connect();
    clock.Subscribe(x => Console.WriteLine($"a:{x}"));
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"c:{x}"));
    Console.ReadKey();

    输出:

     可以看到c在延迟后,没有办法获取1的数据,只能获取和a同步的数据,也就是c和a是公用的观察者序列。

    其中publish是将冷序列转换为热序列,connect方法是触发使生成观察者序列

    可以稍微在connect和注册观察者之间稍微加一个sleep如下

    var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish();
    clock.Connect();
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"a:{x}"));
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"c:{x}"));
    Console.ReadKey();

    输出:

     可以看到由于connect和注册没在一起,a也获取不到完整的序列了。

    5.3 RefCount方法

    还有一种特殊的RefCount方法需要注意

    var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish().RefCount();
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"a:{x}"));
    Thread.Sleep(2000);
    clock.Subscribe(x => Console.WriteLine($"c:{x}"));
    Console.ReadKey();

    输出:

     可以看出虽然sleep了2s,但是a依然获取到完整的观察者序列,可见RefCount是默认看到第一个观察者后才会生产观察者序列的。

    6 Rx的调度 Scheduler

    在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度,通过Scheduler来规定在什么时间什么地点执行什么事情。Rx提供了以下几种Scheduler:

    1. NewThreadScheduler:即在新线程上执行
    2. ThreadPoolScheduler:即在线程池中执行
    3. TaskPoolScheduler:与ThreadPoolScheduler类似
    4. CurrentThreadScheduler:在当前线程执行
    5. ImmediateScheduler:在当前线程立即执行
    6. EventLoopScheduler:创建一个后台线程按序执行所有操作

    使用方式如下:

    Observable.Return(42, NewThreadScheduler.Default);

    总结

    其实Rx .Net就是对观察者模式的linq封装,使用起来会比event更方便一些,因为可以对事件源进行合并,筛选,聚合等操作。

    这个和事件总线还不一样,事件总线是将事件进行集中处理的功能。

    参考:

    https://www.cnblogs.com/Leo_wl/p/10400983.html

    https://segmentfault.com/a/1190000011052037

  • 相关阅读:
    阻止 gulp 发生异常后退出
    Implementing DDD Reading
    Chrome 37 Beta: 使用Windows的DirectWrite和支持<dialog>元素
    作为一个程序员,你的进步完全取决于你自己
    Accessing Report Server using Report Server Web Service
    admin自定义后台注意事项
    BBS项目
    python之路_django中form组件
    python之路_django中ajax应用
    python之路_django 认证系统
  • 原文地址:https://www.cnblogs.com/gamov/p/13143436.html
Copyright © 2011-2022 走看看