zoukankan      html  css  js  c++  java
  • Rx.NET的使用

    Rx的核心接口

     

     相关链接

    教程:https://www.cnblogs.com/sheng-jie/p/10399049.html

    https://rehansaeed.com/reactive-extensions-part2-wrapping-events/

    代码:http://rxwiki.wikidot.com/101samples

     官网:http://reactivex.io/

    用一个例子认识这个2个接口

    实际中不可这样使用

    class Program
     {
    
       static void Main(string[] args)
       {
                Test();
       }
    
       private static void Test()
       {
           var numbers = new MySequenceOfNumbers();
           var observer = new MyConsoleObserver<int>();
           numbers.Subscribe(observer);
           Console.ReadLine();
       }
    
    }
    
        /// <summary>
        /// 自定义被观察队列
        /// </summary>
        public class MySequenceOfNumbers : IObservable<int>
        {
            public IDisposable Subscribe(IObserver<int> observer)
            {
                observer.OnNext(1);
                observer.OnNext(2);
                observer.OnNext(3);
                observer.OnCompleted();
                return Disposable.Empty;
            }
        }
    
      /// <summary>
        /// 自定义观察者对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        public class MyConsoleObserver<T> : IObserver<T>
        {
            public void OnNext(T value)
            {
                Console.WriteLine("接收到 value {0}", value);
            }
            public void OnError(Exception error)
            {
                Console.WriteLine("出现异常! {0}", error);
            }
            public void OnCompleted()
            {
                Console.WriteLine("关闭观察行为");
            }
        }
    View Code

    认识Subject类,同时实现了上面2个接口

    入门例子:取代event

    先看event的例子

    class Heater
    {
        private delegate void TemperatureChanged(int temperature);
        private event TemperatureChanged TemperatureChangedEvent;
        public void BoilWater()
        {
            TemperatureChangedEvent += ShowTemperature;
            TemperatureChangedEvent += MakeAlerm;
            Task.Run(
                () =>
            Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))
            );
        }
        private void ShowTemperature(int temperature)
        {
            Console.WriteLine($"当前温度:{temperature}");
        }
        private void MakeAlerm(int temperature)
        {
            Console.WriteLine($"嘟嘟嘟,当前水温{temperature}");
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Heater heater = new Heater();        
            heater.BoilWater();
        }
    }
    View Code

    实现同样效果可以使用如下Rx代码

    var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可观察序列
    Subject<int> subject = new Subject<int>();//申明Subject
    subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject
    subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject
    observable.Subscribe(subject);//订阅observable

     Observable的创建方式

                Observable.Create<int>(observer =>
                {
                    for (int i = 0; i < 5; i++)
                    {
                        observer.OnNext(i);
                    }
                    observer.OnCompleted();
                    return Disposable.Empty;
                }).Subscribe(o => Console.WriteLine("Here is the data " + o));

    创建可以取消的Observable

            public static void TestCreate()
            {
                IObservable<int> ob =
                Observable.Create<int>(o =>
                {
                    var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
                    NewThreadScheduler.Default.Schedule
                    (
                        () =>
                        {
                        int i = 0;
                        for (; ; )
                        {
                            Thread.Sleep(200);  // here we do the long lasting background operation
                                if (!cancel.Token.IsCancellationRequested)    // check cancel token periodically
                                    o.OnNext(i++);
                            else
                            {
                                Console.WriteLine("Aborting because cancel event was signaled!");
                                o.OnCompleted(); // will not make it to the subscriber
                                    return;
                            }
                        }
                        }
                    );
    
                    return cancel;
                }
                );
                IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
                Console.WriteLine("Press any key to cancel");
                Console.ReadKey();
                subscription.Dispose();
                Console.WriteLine("Press any key to quit");
                Console.ReadKey();  // give background thread chance to write the cancel acknowledge message
            }
            private static void TestRange()
            {
                IObservable<int> observable = Observable.Range(0, 10).Select(i => i * 2);
                observable.Subscribe(o => Console.WriteLine("Data is " + o));
            }
            private static void TestGenerate()
            {
                var ob=Observable.Generate(0,
                    i => i < 10,
                    i => i + 1,
                    i=>i*2);
                ob.Subscribe(o => Console.WriteLine("Data is " + o));
            }

    启动后台工作

            public static void StartBackgroundWork()
            {
                Console.WriteLine("Shows use of Start to start on a background thread:");
                var o = Observable.Start(() =>
                {
                    //This starts on a background thread.
                    Console.WriteLine("From background thread. Does not block main thread.");
                    PrintThreadID();
                    Console.WriteLine("Calculating...");
                    Thread.Sleep(3000);
                    Console.WriteLine("Background work completed.");
                }).Finally(() =>
                {
                    Console.WriteLine("Main thread completed.");
                    PrintThreadID();
                });
                Console.WriteLine("
    	 In Main Thread...
    ");
                PrintThreadID();
                o.Wait();   // Wait for completion of background operation.
            }
    View Code

    封装事件

    封装Timer 的Elapsed时间

                var timer = new Timer(interval: 1000) { Enabled = true };
                var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>
                    (
                        hanlder => (s, a) => hanlder(s, a),
                        handler => timer.Elapsed += handler,
                        handler => timer.Elapsed -= handler
                    );
                ticks.Subscribe(data => Console.WriteLine("On Next: " + data.EventArgs.SignalTime));

    还可以写成这样

                var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
                var ticks = Observable.FromEventPattern(timer, "Elapsed");
                ticks.Subscribe(data => Console.WriteLine("OnNext: "
                + ((ElapsedEventArgs)data.EventArgs).SignalTime));
            public static event EventHandler SimpleEvent;
    
    
            private static void TestFromEvent()
            {
                var eventAsObservable = Observable.FromEventPattern(//
                        ev=> SimpleEvent+=ev,
                        ev=>SimpleEvent-=ev
                    );
    
                // SimpleEvent is null until we subscribe
                Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
    
                Console.WriteLine("Subscribe");
                //Create two event subscribers
                var s = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for s subscriber"));
                var t = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for t subscriber"));
    
                // After subscribing the event handler has been added
                Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
    
                Console.WriteLine("Raise event");
                if (null != SimpleEvent)
                {
                    SimpleEvent(null, EventArgs.Empty);
                }
    
                // Allow some time before unsubscribing or event may not happen
                Thread.Sleep(100);
    
                Console.WriteLine("Unsubscribe");
                s.Dispose();
                t.Dispose();
    
                // After unsubscribing the event handler has been removed
                Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
    
            }

    封装Button 的Click事件

            private void init()
            {
                var eventAsObservable = Observable.FromEventPattern(
                        ev=>button1.Click+=ev,
                        ev=>button1.Click-=ev
                    );
                eventAsObservable.Subscribe(o => MessageBox.Show("You clicked " + o.Sender.ToString()));
            }

    基于时间、序列等创建Observable

            private static void TestWhere()
            {
                //where
                IObservable<long> oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
                var lowNums=oneNumberPerSecond.Where(n => n < 5).Select(n => n);
                lowNums.Subscribe(num => Console.WriteLine("This is " + num));
    
    
            }
            private static void TestToObAndInterval()
            {
                IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };
    
                // To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
                IObservable<int> observable = someInts.ToObservable();
    
                // 0 after 1s, 1 after 2s, 2 after 3s, etc.
                IObservable<long> oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
                
            }

    调度

    对于基于Inerval的消息,通知会在线程池线程中引发

            private void button1_Click(object sender, EventArgs e)
            {
                var uiContext = SynchronizationContext.Current;
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .ObserveOn(uiContext)
                    .Subscribe(x => this.Text=("Interval " + x + " on thread " +
                    Environment.CurrentManagedThreadId));
            }

    NotifyCollectionChangedEventHandler的原本使用以及基于Rx的封装

    原本的使用方式

           private static void TestNormalNotifyCollectionChanged()
            {
                ObservableCollection<string> names = new ObservableCollection<string>();
                names.CollectionChanged += (object sender, NotifyCollectionChangedEventArgs e) =>
                {
                    Console.WriteLine("Change type: " + e.Action);
                    if (e.NewItems != null)
                    {
                        Console.WriteLine("Items added: ");
                        foreach (var item in e.NewItems)
                        {
                            Console.WriteLine(item);
                        }
                    }
    
                    if (e.OldItems != null)
                    {
                        Console.WriteLine("Items removed: ");
                        foreach (var item in e.OldItems)
                        {
                            Console.WriteLine(item);
                        }
                    }
                };
                names.Add("Adam");
                names.Add("Eve");
                names.Remove("Adam");
                names.Add("John");
                names.Add("Peter");
                names.Clear();
            }
           private static void TestNotifyCollectionChanged()
            {
                var customers = new ObservableCollection<Customer>();
    
                var customerChanges = Observable.FromEventPattern(
                    (EventHandler<NotifyCollectionChangedEventArgs> ev)
                       => new NotifyCollectionChangedEventHandler(ev),
                    ev => customers.CollectionChanged += ev,
                    ev => customers.CollectionChanged -= ev);
    
                var watchForNewCustomersFromWashington =
                    from c in customerChanges
                    where c.EventArgs.Action == NotifyCollectionChangedAction.Add
                    from cus in c.EventArgs.NewItems.Cast<Customer>().ToObservable()
                    where cus.Region == "WA"
                    select cus;
    
    
    
                
    
                Console.WriteLine("New customers from Washington and their orders:");
    
                watchForNewCustomersFromWashington.Subscribe(cus =>
                {
                    Console.WriteLine("Customer {0}:", cus.CustomerName);
    
                    foreach (var order in cus.Orders)
                    {
                        Console.WriteLine("Order {0}: {1}", order.OrderId, order.OrderDate);
                    }
                });
    
                customers.Add(new Customer
                {
                    CustomerName = "Lazy K Kountry Store",
                    Region = "WA",
                    Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 1 } }
                });
    
                Thread.Sleep(1000);
                customers.Add(new Customer
                {
                    CustomerName = "Joe's Food Shop",
                    Region = "NY",
                    Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 2 } }
                });
    
                Thread.Sleep(1000);
                customers.Add(new Customer
                {
                    CustomerName = "Trail's Head Gourmet Provisioners",
                    Region = "WA",
                    Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 3 } }
                });

    其中相关类

        class Customer
        {
            public Customer() { Orders = new ObservableCollection<Order>(); }
            public string CustomerName { get; set; }
            public string Region { get; set; }
            public ObservableCollection<Order> Orders { get; private set; }
        }
    
        class Order
        {
            public int OrderId { get; set; }
            public DateTimeOffset OrderDate { get; set; }
        }
    View Code
  • 相关阅读:
    September 29th 2017 Week 39th Friday
    September 28th 2017 Week 39th Thursday
    September 27th 2017 Week 39th Wednesday
    September 26th 2017 Week 39th Tuesday
    September 25th 2017 Week 39th Monday
    September 24th 2017 Week 39th Sunday
    angular2 学习笔记 ( Form 表单 )
    angular2 学习笔记 ( Component 组件)
    angular2 学习笔记 ( Http 请求)
    angular2 学习笔记 ( Router 路由 )
  • 原文地址:https://www.cnblogs.com/noigel/p/14239299.html
Copyright © 2011-2022 走看看