zoukankan      html  css  js  c++  java
  • Net中的反应式编程

    Net中的反应式编程(Reactive Programming)

     

    系列主题:基于消息的软件架构模型演变

    一、反应式编程(Reactive Programming)

    1、什么是反应式编程:反应式编程(Reactive programming)简称Rx,他是一个使用LINQ风格编写基于观察者模式的异步编程模型。简单点说Rx = Observables + LINQ + Schedulers。

    2、为什么会产生这种风格的编程模型?我在本系列文章开始的时候说过一个使用事件的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    var watch = new FileSystemWatcher();
      watch.Created += (s, e) =>
      {
          var fileType = Path.GetExtension(e.FullPath);
          if (fileType.ToLower() == "jpg")
          {
              //do some thing
          }
      };

    这个代码定义了一个FileSystemWatcher,然后在Watcher事件上注册了一个匿名函数。事件的使用是一种命令式代码风格,有没有办法写出声明性更强的代码风格?我们知道使用高阶函数可以让代码更具声明性,整个LINQ扩展就是一个高阶函数库,常见的LINQ风格代码如下:

    1
    2
    3
    4
    var list = Enumerable.Range(1, 10)
                    .Where(x => x > 8)
                    .Select(x => x.ToString())
                    .First();

    能否使用这样的风格来编写事件呢?

    3、事件流
    LINQ是对IEnumerable<T>的一系列扩展方法,我们可以简单的将IEnumerable<T>认为是一个集合。当我们将事件放在一个时间范围内,事件也变成了集合。我们可以将这个事件集合理解为事件流。

    事件流的出现给了我们一个能够对事件进行LINQ操作的灵感。

    二、反应式编程中的两个重要类型

    事件模型从本质上来说是观察者模式,所以IObservable<T>和IObserver<T>也是该模型的重头戏。让我们来看看这两个接口的定义:

    1
    2
    3
    4
    5
    public interface IObservable<out T>
    {
          //Notifies the provider that an observer is to receive notifications.
          IDisposable Subscribe(IObserver<T> observer);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public interface IObserver<in T>
    {
        //Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();
     
        //Notifies the observer that the provider has experienced an error condition.
        void OnError(Exception error);
        
        //Provides the observer with new data.
        void OnNext(T value);
    }

    这两个名称准确的反应出了它两的职责:IObservable<T>-可观察的事物,IObserver<T>-观察者。

    IObservable<T>只有一个方法Subscribe(IObserver<T> observer),此方法用来对事件流注册一个观察者。

    IObserver<T>有三个回调方法。当事件流中有新的事件产生的时候会回调OnNext(T value),观察者会得到事件中的数据。OnCompleted()和OnError(Exception error)则分别用来通知观察者事件流已结束,事件流发生错误。

    显然事件流是可观察的事物,我们用Rx改写上面的例子:

    1
    2
    3
    4
    5
    6
    Observable.FromEventPattern<FileSystemEventArgs>(watch, "Created")
                    .Where(e => Path.GetExtension(e.EventArgs.FullPath).ToLower() == "jpg")
                    .Subscribe(e =>
                    {
                        //do some thing
                    });

    注:在.net下使用Rx编程需要安装以下Nuget组件:

    • Rx-Core
    • Rx-Interfaces
    • Rx-Linq

    三、UI编程中使用Rx

    Rx模型不但使得代码更加具有声明性,而且使得UI编程更加简单。

    1、UI编程中的第一段Rx代码

    为了简单的展示如何在UI编程中使用Rx,我们以Winform中的Button为例,看看事件模型和Rx有何不同。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private void BindFirstGroupButtons()
     {
         btnFirstEventMode.Click += btnFirstEventMode_Click;
     }
     
     void btnFirstEventMode_Click(object sender, EventArgs e)
     {
         MessageBox.Show("hello world");
     }

    添加了一个Button,点击Button的时候弹出一个对话框。使用Rx做同样的实现:

    1
    2
    3
    4
    //得到了Button的Click事件流。
    var clickedStream = Observable.FromEventPattern<EventArgs>(btnFirstReactiveMode, "Click");
    //在事件流上注册了一个观察者。
    clickedStream.Subscribe(e => MessageBox.Show("Hello world"));

    有朋友指出字符串“Click”非常让人不爽,这确实是个问题。由于Click是一个event类型,无法用表达式树获取其名称,最终我想到使用扩展方法来实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static IObservable<EventPattern<EventArgs>> FromClickEventPattern(this Button button)
     {
         return Observable.FromEventPattern<EventArgs>(button, "Click");
     }
     
     public static IObservable<EventPattern<EventArgs>> FromDoubleClickEventPattern(this Button button)
     {
         return Observable.FromEventPattern<EventArgs>(button, "DoubleClick");
     }

    我们平时常用的事件类型也就那么几个,可以暂时通过这种方案来实现,该方案算不上完美,但是比起直接使用字符串又能优雅不少。

    1
    2
    btnFirstReactiveMode.FromClickEventPattern()
                    .Subscribe(e => MessageBox.Show("hello world"));

    2、UI编程中存在一个很常见的场景:当一个事件的注册者阻塞了线程时,整个界面都处于假死状态。.net中的异步模型也从APM,EAP,TPL不断演化直至async/await模型的出现才使得异步编程更加简单易用。我们来看看界面假死的代码:

    1
    2
    3
    4
    5
    6
    void btnSecondEventMode_Click(object sender, EventArgs e)
     {
         btnSecondEventMode.BackColor = Color.Coral;
         Thread.Sleep(5000);
         MessageBox.Show("hello world");
     }

    Thread.Sleep(5000);模拟了一个长时间的操作,当你点下Button时整个界面处于假死状态并且此时的程序无法响应其他的界面事件。传统的解决方案是使用多线程来解决假死:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void BtnSecondEventAsyncModel_Click(object sender, EventArgs e)
    {
        Action action = () =>
        {
            Task.Run(() =>
            {
                BtnSecondEventAsyncModel.BackColor = Color.Coral;
                Thread.Sleep(5000);
                MessageBox.Show("hello world");
            });
     
        };
        BeginInvoke(action);
    }

    这个代码的复杂点在于:普通的多线程无法对UI进行操作,在Winform中需要用Control.BeginInvoke(Action action)经过包装后,多线程中的UI操作才能正确执行,WPF则要使用Dispatcher.BeginInvoke(Action action)包装。

    在Rx中选择合适的Scheduler类型就可以轻松解决多线程更新UI的问题。

    1
    2
    3
    4
    5
    6
    7
    8
    btnSecondReactiveMode.FromClickEventPattern()
                    .ObserveOn(new NewThreadScheduler())
                    .Subscribe(e =>
                    {
                        btnSecondReactiveMode.BackColor = Color.Coral;
                        Thread.Sleep(5000);
                        MessageBox.Show("hello world");
                    });

    一句ObserveOn(new NewThreadScheduler())成功让后面的观察者跑在了新线程中,并且避免了多线程更新UI的问题,使用者无需再被这种问题所困扰。——抱歉,这一段描述有问题,简单的使用NewThreadScheduler并不能保证异步更新UI。待我周末了详细补充一下Rx中的异步能力。
    注:使用Scheduler需要从Nuget中安装Rx-PlatformServices。

    常用的Scheduler有:

    • DefaultScheduler:默认的Scheduler,会将观察者添加在调用队列中
    • ImmediateScheduler:立即执行
    • TaskPoolScheduler:用来执行短时间的任务
    • ...

    3、再来一个例子,让我们感受一下Rx的魅力

    界面上有两个Button分别为+和-操作,点击+按钮则+1,点击-按钮则-1,最终的结果显示在一个Label中。
    这样的一个需求使用经典事件模型只需要维护一个内部变量,两个按钮的Click事件分别对变量做加1或减1的操作即可。
    Rx作为一种函数式编程模型讲求immutable-不可变性,即不使用变量来维护内部状态。

    1
    2
    3
    4
    5
    6
    7
    8
    var increasedEventStream = btnIncreasement.FromClickEventPattern()
        .Select(_ => 1);
    var decreasedEventStream = btnDecrement.FromClickEventPattern()
        .Select(_ => -1);
     
    increasedEventStream.Merge(decreasedEventStream)
        .Scan(0, (result, s) => result + s)
        .Subscribe(x => lblResult.Text = x.ToString());

    这个例子使用了IObservable<T>的”谓词”来对事件流做了一些操作。

    • Select跟Linq操作有点类似,分别将两个按钮的事件变形为IObservable<int>(1)和IObservable<int>(-1);
    • Merge操作将两个事件流合并为一个;
    • Scan稍显复杂,对事件流做了一个折叠操作,给定了一个初始值,并通过一个函数来对结果和下一个值进行累加;

    下面就让我们来看看IObservable<T>中常用的“谓词”

    四、IObservable<T>中的谓词

    IObservable<T>的灵感来源于LINQ,所以很多操作也跟LINQ中的操作差不多,例如Where、First、Last、Single、Max、Any。
    还有一些“谓词”则是新出现的,例如上面提到的”Merge”、“Scan”等,为了理解这些“谓词”的含义,我们请出一个神器RxSandbox

    1、Merge操作,从下面的图中我们可以清晰的看出Merge操作将三个事件流中的事件合并在了同一个时间轴上。

    2、Where操作则是根据指定的条件筛选出事件。

    有了这个工具我们可以更加方便的了解这些“谓词”的用途。

    五、IObservable<T>的创建

    Observable类提供了很多静态方法用来创建IObservable<T>,之前的例子我们都使用FromEventPattern方法来将事件转化为IObservable<T>,接下来再看看别的方法。

    Range方法可以产生一个指定范围内的IObservable<T>

    1
    2
    Observable.Range(1, 10)
              .Subscribe(x => Console.WriteLine(x.ToString()));

    Interval方法可以每隔一定时间产生一个IObservable<T>:

    1
    2
    Observable.Interval(TimeSpan.FromSeconds(1))
               .Subscribe(x => Console.WriteLine(x.ToString()));

    Subscribe方法有一个重载,可以分别对Observable发生异常和Observable完成定义一个回调函数。

    1
    2
    Observable.Range(1, 10)
              .Subscribe(x => Console.WriteLine(x.ToString()), e => Console.WriteLine("Error" + e.Message), () => Console.WriteLine("Completed"));

    还可以将IEnumerable<T>转化为IObservable<T>类型:

    1
    2
    Enumerable.Range(1, 10).ToObservable()
              .Subscribe(x => Console.WriteLine(x.ToString()));

    也可以将IObservable<T>转化为IEnumerable<T>

    1
    var list= Observable.Range(1, 10).ToEnumerable();

    六、其他Rx资源

    除了.net中的Rx.net,其他语言也纷纷推出了自己的Rx框架。

     
    分类: .NET
  • 相关阅读:
    完数
    自定义的allocator
    成绩的处理
    R语言-线性回归(1)
    R语言-朴素贝叶斯分类器(1)
    R语言控制流
    leetcode Two sum
    数据库环境搭建
    表单验证制作注册页面
    表单验证
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4983608.html
Copyright © 2011-2022 走看看