zoukankan      html  css  js  c++  java
  • Rx基础

    >>返回《C# 并发编程》

    要使用 Rx,需要先在应用中安装 NuGet 包 System.Reactive。

    • Rx 可以看作是 LINQ to Events(基于 IObservable<T>)
    • 所有的 LINQ 操作都可以在 Rx 中使用。
    • 从概念上看,过滤(Where)、投影(Select)等简单操作,和其他 LINQ 提供者的操作是一样的

    1. 转换.NET事件

    1.1. 进度通知

    /// <summary>
    /// Exposes the <c>ProgressChanged</c> event of a <see cref="Progress{T}"/> as an observable,
    /// prints every reported value together with the id of the thread running the callback,
    /// and then feeds the reporter through <c>Reports</c>.
    /// </summary>
    public static void ProgressRun()
    {
        var reporter = new Progress<int>();

        // A filter such as .Where(u => u.EventArgs % 2 == 0) can be inserted before
        // Subscribe to keep only even values; the demo subscribes to everything.
        Observable
            .FromEventPattern<int>(
                add => reporter.ProgressChanged += add,
                remove => reporter.ProgressChanged -= remove)
            .Subscribe(report =>
            {
                Console.WriteLine($"OnNext:{report.EventArgs},ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
            });

        Reports(reporter);
    }
    
    /// <summary>
    /// Reports the values 0 through 9 to <paramref name="progress"/>, printing the current
    /// thread id once before the first report and once after the last.
    /// </summary>
    /// <param name="progress">Receiver of each reported value.</param>
    private static void Reports(IProgress<int> progress)
    {
        Console.WriteLine($"Reporting ThreadId:{Thread.CurrentThread.ManagedThreadId}.");

        var value = 0;
        while (value < 10)
        {
            progress.Report(value);
            value++;
        }

        Console.WriteLine($"Reported ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
    

    输出:

    Reporting ThreadId:1.
    Reported ThreadId:1.
    OnNext:5,ThreadId:9.
    OnNext:0,ThreadId:4.
    OnNext:6,ThreadId:10.
    OnNext:1,ThreadId:5.
    OnNext:2,ThreadId:6.
    OnNext:4,ThreadId:8.
    OnNext:3,ThreadId:7.
    OnNext:7,ThreadId:11.
    OnNext:9,ThreadId:13.
    OnNext:8,ThreadId:12.
    

    1.2. 定时器示例

    /// <summary>
    /// Wraps <c>System.Timers.Timer.Elapsed</c> in an observable and prints each tick
    /// (millisecond part of the signal time plus the callback thread id) for two seconds.
    /// </summary>
    /// <remarks>
    /// The timer and the subscription are both disposed before the method returns, and the
    /// timer is only started through <c>Start()</c>, after the observer has been attached.
    /// </remarks>
    public static void TimerRun()
    {
        // No "Enabled = true" here: that would start the timer before anything is
        // subscribed and turn the explicit Start() below into a no-op.
        using (var timer = new System.Timers.Timer(interval: 300))
        {
            var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => timer.Elapsed += handler,
                handler => timer.Elapsed -= handler);

            // Disposing the subscription detaches the Elapsed handler from the timer.
            using (ticks.Subscribe(data => Console.WriteLine($"OnNext:{data.EventArgs.SignalTime.Millisecond}, ThreadId:{Thread.CurrentThread.ManagedThreadId}.")))
            {
                Console.WriteLine($"Timer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
                timer.Start();
                Thread.Sleep(2000);
                timer.Stop();
            }

            Console.WriteLine($"Timer stop ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
        }
    }
    

    输出:

    Timer start ThreadId:1.
    OnNext:473, ThreadId:4.
    OnNext:772, ThreadId:5.
    OnNext:73, ThreadId:5.
    OnNext:373, ThreadId:5.
    OnNext:673, ThreadId:5.
    OnNext:975, ThreadId:5.
    Timer stop ThreadId:1.
    

    1.3. 错误传递

    /// <summary>
    /// Shows how a failure surfaces when <c>WebClient.DownloadStringCompleted</c> is wrapped
    /// as an observable: the error arrives inside the event arguments delivered to OnNext,
    /// so the handler must inspect <c>Error</c> itself.
    /// </summary>
    /// <remarks>
    /// Blocks the calling thread for three seconds to give the download time to finish,
    /// then disposes the subscription and the client.
    /// </remarks>
    public static void ObErrorRun()
    {
        using (var client = new WebClient())
        {
            // Typed overload (same shape as the other examples in this file): no
            // reflection on an event-name string and no cast of EventArgs.
            var downloadedStrings = Observable.FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
                handler => (s, a) => handler(s, a),
                handler => client.DownloadStringCompleted += handler,
                handler => client.DownloadStringCompleted -= handler);

            using (downloadedStrings.Subscribe(
                data =>
                {
                    var eventArgs = data.EventArgs;
                    if (eventArgs.Error != null)
                    {
                        Console.WriteLine("OnNext: (Error) " + eventArgs.Error.GetType());
                    }
                    else
                    {
                        Console.WriteLine("OnNext: " + eventArgs.Result);
                    }
                },
                ex => Console.WriteLine("OnError: " + ex.GetType()),
                () => Console.WriteLine("OnCompleted")))
            {
                // Unresolvable host on purpose; swap in a reachable URL
                // (e.g. http://www.baidu.com/) to see the success branch.
                client.DownloadStringAsync(new Uri("http://invalid.example.com/"));

                // Crude wait for the asynchronous completion event.
                Thread.Sleep(3000);
            }
        }
    }
    

    输出:

    OnNext: (Error) System.Net.WebException
    

    把事件封装进 Observable 对象后,每次引发该事件都会调用 OnNext 。在处理 AsyncCompletedEventArgs 时会发生令人奇怪的现象,所有的异常信息都是通过数据形式传递的(OnNext),而不是通过错误传递(OnError)。

    2. 发通知给上下文

    如 UI 元素只能被它所属的 UI 线程控制,因此,如果要根据 Rx 的通知来修改 UI,就应该把通知“转移”到 UI 线程。

    • 可以使用 ObserveOn 把通知移动到一个线程池线程,在那里进行计算,然后再把表示结果的通知返回给 UI 线程
    • 通过同步上下文切换实现
    /// <summary>Handler signature for events whose payload is a <see cref="HelloEventArgs"/>.</summary>
    public delegate void HelloEventHandler(object sender, HelloEventArgs e);

    /// <summary>Event payload that carries a name and can print a greeting for it.</summary>
    public class HelloEventArgs : EventArgs
    {
        /// <summary>Creates the payload for the given name.</summary>
        public HelloEventArgs(string name) => Name = name;

        /// <summary>Name used in the greeting.</summary>
        public string Name { get; set; }

        /// <summary>
        /// Writes "<c>Name</c> Hello." to the console.
        /// </summary>
        /// <returns>The millisecond component of the local time read after printing.</returns>
        public int SayHello()
        {
            Console.WriteLine($"{Name} Hello.");
            return DateTime.Now.Millisecond;
        }
    }
    
    /// <summary>Event raised by the background task in <c>ObservableEventRun</c> and observed through Rx.</summary>
    public static event HelloEventHandler HelloHandlerEvent;

    /// <summary>
    /// Demonstrates moving notifications between contexts with <c>ObserveOn</c>:
    /// events raised from a background task are projected via <c>SayHello</c>, processed
    /// on <c>Scheduler.Default</c>, and the results are then delivered to the
    /// synchronization context captured inside <c>AsyncContext.Run</c>.
    /// </summary>
    public static void ObservableEventRun()
    {
        // Assigned inside AsyncContext.Run below; the background task disposes it later.
        IDisposable ob = null;
        var task = Task.Run(() =>
        {
            // Delay before raising anything so the subscription below can be set up first.
            Thread.Sleep(500);
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));
            // Leave time for the four notifications to travel through the pipeline.
            Thread.Sleep(2000);
            ob?.Dispose();
            HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("NoConsole")); // The subscription was disposed on the previous line, so this event is expected to print nothing (the original note here was left unfinished).
        });
        // AsyncContext stands in for a context such as a UI context.
        AsyncContext.Run(() =>
        {
            // Captured so results can be sent back to this context at the end of the pipeline.
            var uiContext = SynchronizationContext.Current;
            Console.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);                                                                           //Observable.FromEvent()
    
            ob = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
                handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
            .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
            .Select(s =>
             {
                // Placeholder for an expensive computation.
                Thread.Sleep(100);
                var result = s;
                Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
                return result;
             })
            .ObserveOn(uiContext)
            .Subscribe(s => Console.WriteLine("Subscribe Result " + s + " on thread " + Environment.CurrentManagedThreadId));
            // Do not call task.Wait() here: it would deadlock with the delegate passed to Subscribe.
            System.Console.WriteLine("AsyncContext.Run Done on thread " + Environment.CurrentManagedThreadId);
        });
        // Waiting is safe here, outside the AsyncContext delegate.
        task.Wait();
    }
    

    输出:

    UI thread is 1
    AsyncContext.Run Done on thread 1
    lilei Hello.
    HanMeimei Hello.
    Tom Hello.
    Jerry Hello.
    Now Millisecond result 36 on thread 6
    Subscribe Result 36 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
    Now Millisecond result 44 on thread 6
    Subscribe Result 44 on thread 1
    

    3. 用窗口和缓冲对事件分组

    下面的例子使用 Interval,每 10 毫秒创建 1 个 OnNext 通知,然后使用 Buffer,每 2 个通知做一次缓冲:

    /// <summary>
    /// Emits a value every 10 ms, groups the values into buffers of two, and prints each
    /// buffer as one comma-joined line. The subscription is disposed after 100 ms.
    /// </summary>
    public static void BufferRun()
    {
        Console.WriteLine($"Buffer start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");

        var pairs = Observable
            .Interval(TimeSpan.FromMilliseconds(10))
            .Buffer(2);

        IDisposable subscription = pairs.Subscribe(batch =>
        {
            // Read the timestamp first, as the original single-expression form did.
            var stamp = DateTime.Now.Millisecond;
            var joined = string.Join(",", batch);
            Console.WriteLine($"{stamp}: Got {joined}({Thread.CurrentThread.ManagedThreadId})");
        });

        Thread.Sleep(100);
        subscription.Dispose();

        Console.WriteLine($"Buffer end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
    

    输出:

    Buffer start ThreadId:1.
    459: Got 0,1(5)
    478: Got 2,3(5)
    498: Got 4,5(5)
    516: Got 6,7(5)
    536: Got 8,9(5)
    Buffer end ThreadId:1.
    

    下面的例子有些类似,使用 Window 创建一些事件组,每组包含 2 个事件:

    /// <summary>
    /// Emits a value every 10 ms and splits the stream into windows of two. Each window
    /// is announced when it opens, every value is printed as it arrives, and the end of
    /// each window is printed when that window completes. Stops after 100 ms.
    /// </summary>
    public static void WindowRun()
    {
        Console.WriteLine($"Window start ThreadId:{Thread.CurrentThread.ManagedThreadId}.");

        var windows = Observable
            .Interval(TimeSpan.FromMilliseconds(10))
            .Window(2);

        var subscription = windows.Subscribe(window =>
        {
            Console.WriteLine($"{DateTime.Now.Millisecond}: Starting new group({Thread.CurrentThread.ManagedThreadId})");

            Action<long> onItem = x =>
                Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {x},(TID:{Thread.CurrentThread.ManagedThreadId})");
            Action onGroupCompleted = () =>
                Console.WriteLine($"{DateTime.Now.Millisecond}: Ending group");

            window.Subscribe(onItem, onGroupCompleted);
        });

        Thread.Sleep(100);
        subscription.Dispose();

        Console.WriteLine($"Window end ThreadId:{Thread.CurrentThread.ManagedThreadId}.");
    }
    

    输出:

    Window start ThreadId:1.
    959: Starting new group(1)
    987: Saw 0,(TID:4)
    991: Saw 1,(TID:4)
    992: Ending group
    994: Starting new group(4)
    0: Saw 2,(TID:4)
    11: Saw 3,(TID:4)
    11: Ending group
    11: Starting new group(4)
    21: Saw 4,(TID:4)
    30: Saw 5,(TID:4)
    30: Ending group
    30: Starting new group(4)
    40: Saw 6,(TID:4)
    50: Saw 7,(TID:4)
    50: Ending group
    51: Starting new group(4)
    60: Saw 8,(TID:4)
    70: Saw 9,(TID:4)
    70: Ending group
    70: Starting new group(4)
    Window end ThreadId:1.
    

    这几个例子说明了 Buffer 和 Window 的区别:

    • Buffer 等待组内的所有事件,然后把所有事件作为一个集合发布
    • Window 用同样的方法进行分组,但它是在每个事件到达时就发布

    4. 用限流和抽样抑制事件流

    4.1. Throttle

    下面的例子也是监视鼠标移动,但使用了 Throttle,在鼠标保持静止 500 毫秒后才报告最近一条移动事件。

    /// <summary>Handler signature for events whose payload is a <see cref="MouseEventArgs"/>.</summary>
    public delegate void MouseEventHandler(object sender, MouseEventArgs e);

    /// <summary>Event payload holding a pair of integers as the position.</summary>
    public class MouseEventArgs : EventArgs
    {
        /// <summary>Creates the payload for the given position.</summary>
        public MouseEventArgs((int, int) xy) => XY = xy;

        /// <summary>The stored position pair.</summary>
        public (int, int) XY { get; set; }

        /// <summary>Returns the current value of <see cref="XY"/>.</summary>
        public (int, int) GetPosition() => XY;
    }
    
    /// <summary>Simulated mouse-move event raised by <c>MouseMoveProcess</c>.</summary>
    public static event MouseEventHandler MouseMove;

    /// <summary>
    /// Observes <see cref="MouseMove"/> through <c>Throttle(500 ms)</c> while a background
    /// task raises a scripted series of moves, and prints every position that gets through.
    /// </summary>
    public static void ThrottleRun()
    {
        IDisposable subscription = null;

        Action simulateMouse = () =>
        {
            Thread.Sleep(200);

            // Four moves 200 ms apart: closer together than the 500 ms throttle window,
            // so none of them is expected to be printed.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));

            // Followed by 2000 ms of silence, so this one is expected to be printed.
            MouseMoveProcess((2, 2), 2000);

            // Raised 200 ms before the subscription is disposed.
            MouseMoveProcess((2, 22));
            subscription?.Dispose();
        };
        var producer = Task.Run(simulateMouse);

        var positions = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                forward => (sender, args) => forward(sender, args),
                h => MouseMove += h,
                h => MouseMove -= h)
            .Select(evt => evt.EventArgs.GetPosition());

        subscription = positions
            .Throttle(TimeSpan.FromMilliseconds(500))
            .Subscribe(pos => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {pos.Item1},{pos.Item2}"));

        producer.Wait();
    }
    
    /// <summary>
    /// Logs a simulated mouse move, raises <c>MouseMove</c> with the given position when
    /// the event has subscribers, then blocks for <paramref name="sleepMillsecond"/> milliseconds.
    /// </summary>
    /// <param name="xy">Position pair passed to subscribers.</param>
    /// <param name="sleepMillsecond">How long to sleep after raising the event.</param>
    private static void MouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
    {
        var (x, y) = xy;
        Console.WriteLine($"Mouse Move {x},{y},After sleep {sleepMillsecond}.");

        var subscribers = MouseMove;
        if (subscribers != null)
        {
            subscribers(new object(), new MouseEventArgs(xy));
        }

        Thread.Sleep(sleepMillsecond);
    }
    

    输出:

    Mouse Move 1,1,After sleep 200.
    Mouse Move 1,11,After sleep 200.
    Mouse Move 1,111,After sleep 200.
    Mouse Move 1,1111,After sleep 200.
    Mouse Move 2,2,After sleep 2000.
    251: Saw 2,2
    Mouse Move 2,22,After sleep 200.
    

    Throttle 常用于类似“文本框自动填充”这样的场合

    • 用户在文本框中输入文字,当他停止输入时,才需要进行真正的检索。

    4.2. Sample

    为抑制快速运动的事件序列, Sample 建立了一个有规律的超时时间段, 每个时间段结束时,它就发布该时间段内最后的一条数据。如果这个时间段没有数据,就不发布。

    每隔 500 毫秒采样一次

    /// <summary>
    /// Observes <c>MouseMove</c> through <c>Sample(500 ms)</c> while a background task
    /// raises a scripted series of moves, and prints every sampled position.
    /// </summary>
    public static void SampleRun()
    {
        IDisposable subscription = null;

        Action simulateMouse = () =>
        {
            Thread.Sleep(200);

            // Moves raised 200 ms apart, i.e. faster than the 500 ms sampling period.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));

            // A move followed by a 2000 ms pause.
            MouseMoveProcess((2, 2), 2000);

            // Final move, raised 200 ms before the subscription is disposed.
            MouseMoveProcess((2, 22));
            subscription?.Dispose();
        };
        var producer = Task.Run(simulateMouse);

        var positions = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                forward => (sender, args) => forward(sender, args),
                h => MouseMove += h,
                h => MouseMove -= h)
            .Select(evt => evt.EventArgs.GetPosition());

        subscription = positions
            .Sample(TimeSpan.FromMilliseconds(500))
            .Subscribe(pos => Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {pos.Item1},{pos.Item2}"));

        producer.Wait();
    }
    

    输出:

    Mouse Move 1,1,After sleep 200.
    Mouse Move 1,11,After sleep 200.
    498: Saw 1,11
    Mouse Move 1,111,After sleep 200.
    Mouse Move 1,1111,After sleep 200.
    Mouse Move 2,2,After sleep 2000.
    991: Saw 2,2
    Mouse Move 2,22,After sleep 200.
    992: Saw 2,22
    

    Throttle 和 Sample 操作符与 Where 基本差不多,唯一的区别是:

    • Throttle 和 Sample 根据时间段过滤
    • Where 根据事件的数据过滤

    在抑制快速涌来的输入流时,这三种操作符提供了三种不同的方法。

    5. 超时

    Timeout 操作符在输入流上建立一个可调节的超时窗口。一旦新的事件到达,就重置超时窗口。如果超过期限后事件仍没到达, Timeout 操作符就结束流,并产生一个包含TimeoutException 的 OnError 通知。

    /// <summary>
    /// Observes <c>MouseMove</c> through <c>Timeout(1 s)</c> while a background task raises
    /// a scripted series of moves. Positions are printed as they arrive; an error is
    /// printed by its exception type name.
    /// </summary>
    public static void TimeoutRun()
    {
        IDisposable subscription = null;

        Action simulateMouse = () =>
        {
            Thread.Sleep(200);

            // Moves 200 ms apart: each arrives well inside the 1 s timeout.
            MouseMoveProcess((1, 1));
            MouseMoveProcess((1, 11));
            MouseMoveProcess((1, 111));
            MouseMoveProcess((1, 1111));

            // Followed by 1100 ms of silence, which is longer than the 1 s timeout.
            MouseMoveProcess((2, 2), 1100);

            // Raised after the timeout period has already elapsed.
            MouseMoveProcess((2, 22));
            subscription?.Dispose();
        };
        var producer = Task.Run(simulateMouse);

        var positions = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                forward => (sender, args) => forward(sender, args),
                h => MouseMove += h,
                h => MouseMove -= h)
            .Select(evt => evt.EventArgs.GetPosition());

        // The timeout is relative: it starts at subscription and, per the original note,
        // does not fire while events keep arriving.
        var guarded = positions.Timeout(TimeSpan.FromSeconds(1));

        Action<(int, int)> onPosition = pos =>
            Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {pos.Item1},{pos.Item2}");
        Action<Exception> onFailure = ex =>
            Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}");
        // Per the original note, the completion callback is not expected to run in this demo.
        Action onFinished = () =>
            Console.WriteLine($"{DateTime.Now.Millisecond}: Finished.");

        subscription = guarded.Subscribe(onPosition, onFailure, onFinished);

        Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");

        producer.Wait();
    }
    

    输出:

    138 Subscribe Done
    Mouse Move 1,1,After sleep 200.
    313: Saw 1,1
    Mouse Move 1,11,After sleep 200.
    517: Saw 1,11
    Mouse Move 1,111,After sleep 200.
    722: Saw 1,111
    Mouse Move 1,1111,After sleep 200.
    923: Saw 1,1111
    Mouse Move 2,2,After sleep 1100.
    124: Saw 2,2
    139: TimeoutException
    Mouse Move 2,22,After sleep 200.
    

    在超时之前观察鼠标移动,超时发生后进行切换

    /// <summary>Second simulated mouse event, raised by <c>OtherMouseMoveProcess</c>.</summary>
    public static event MouseEventHandler OtherMouseMove;

    /// <summary>
    /// Observes <c>MouseMove</c> with a one-second timeout whose fallback is the
    /// <see cref="OtherMouseMove"/> stream, while a background task raises a scripted
    /// series of events on both. Positions are printed as they arrive; an error is printed
    /// by its exception type name.
    /// </summary>
    public static void TimeoutMoveRun()
    {
        IDisposable subscription = null;

        Action simulateMouse = () =>
        {
            Thread.Sleep(200);

            // Two moves that arrive within the timeout.
            MouseMoveProcess((1, 1), 400);
            MouseMoveProcess((1, 11), 0);

            // Stay silent for longer than one second to provoke the timeout.
            Thread.Sleep(1100);
            Console.WriteLine("sleep: 1100");

            // Per the original note, the stream has switched to the fallback by now,
            // so these MouseMove events are not expected to be printed.
            MouseMoveProcess((2, 2), 400);
            MouseMoveProcess((2, 22), 400);

            // Events on the fallback source are expected to be printed.
            OtherMouseMoveProcess((3, 3), 400);
            OtherMouseMoveProcess((3, 33), 400);

            subscription?.Dispose();
        };
        var producer = Task.Run(simulateMouse);

        var fallbackPositions = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                forward => (sender, args) => forward(sender, args),
                h => OtherMouseMove += h,
                h => OtherMouseMove -= h)
            .Select(evt => evt.EventArgs.GetPosition());

        var primaryPositions = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                forward => (sender, args) => forward(sender, args),
                h => MouseMove += h,
                h => MouseMove -= h)
            .Select(evt => evt.EventArgs.GetPosition());

        Action<(int, int)> onPosition = pos =>
            Console.WriteLine($"{DateTime.Now.Millisecond}: Saw {pos.Item1},{pos.Item2}");
        Action<Exception> onFailure = ex =>
            Console.WriteLine($"{DateTime.Now.Millisecond}: {ex.GetType().Name}");

        subscription = primaryPositions
            .Timeout(TimeSpan.FromSeconds(1), fallbackPositions)
            .Subscribe(onPosition, onFailure);

        Console.WriteLine($"{DateTime.Now.Millisecond} Subscribe Done");

        producer.Wait();
    }
    
    /// <summary>
    /// Logs a simulated move on the second mouse source, raises <c>OtherMouseMove</c> with
    /// the given position when the event has subscribers, then blocks for
    /// <paramref name="sleepMillsecond"/> milliseconds.
    /// </summary>
    /// <param name="xy">Position pair passed to subscribers.</param>
    /// <param name="sleepMillsecond">How long to sleep after raising the event.</param>
    private static void OtherMouseMoveProcess((int, int) xy, int sleepMillsecond = 200)
    {
        var (x, y) = xy;
        Console.WriteLine($"Other Mouse Move {x},{y},After sleep {sleepMillsecond}.");

        var subscribers = OtherMouseMove;
        if (subscribers != null)
        {
            subscribers(new object(), new MouseEventArgs(xy));
        }

        Thread.Sleep(sleepMillsecond);
    }
    

    输出:

    793 Subscribe Done
    Mouse Move 1,1,After sleep 400.
    970: Saw 1,1
    Mouse Move 1,11,After sleep 0.
    373: Saw 1,11
    sleep: 1100
    Mouse Move 2,2,After sleep 400.
    Mouse Move 2,22,After sleep 400.
    Other Mouse Move 3,3,After sleep 400.
    281: Saw 3,3
    Other Mouse Move 3,33,After sleep 400.
    684: Saw 3,33
    
  • 相关阅读:
    Jersey 2.x 运行项目
    Jersey 2.x 探索新建的工程
    Jersey 2.x 从Maven Archetype 创建一个新项目
    Jersey 2.x 服务器端应用支持的容器
    Jersey 2.x JDK 上的客户端应用
    Jersey 2.x 基于 Servlet 的服务器端应用
    =面试题:java面试基本方向 背1 有用 项目二技术学完再看
    面试题:项目开发经验总结 框架 比较难的问题 可以找一下有用
    面试题: !=!=未看
    面试题:大公司面试题 !=!=未看
  • 原文地址:https://www.cnblogs.com/etoumao/p/12738009.html
Copyright © 2011-2022 走看看