zoukankan      html  css  js  c++  java
  • C# IObservable与IObserver通知机制 观察者模式(推式模型)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
     
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                /*
                    C# IObservable与IObserver通知机制 观察者模式(推式模型)             
                 */
                WeatherDataPublisher publisher = new WeatherDataPublisher();
                CurrentConditionDisplay currentDisplay = new CurrentConditionDisplay();
                StatisticsConditionDisplay statisticsDisplay = new StatisticsConditionDisplay();
                //订阅当前天气展示板
                IDisposable currentDisplayUnsubscriber = publisher.Subscribe(currentDisplay);
                //订阅气温统计展示板
                IDisposable statisticsDisplayUnsubscriber = publisher.Subscribe(statisticsDisplay);
     
                for (int i = 0; ; i++)
                {
                    WeatherData weatherData = new WeatherData();
                    Console.WriteLine("请输入温度,湿度,压力");
                    string input = Console.ReadLine();
                    var array = input.Split(',');
                    weatherData.temperature = array[0];
                    weatherData.humility = array[1];
                    weatherData.pressure = array[2];
                    Console.WriteLine("");
                    //将输入的新的天气数据传给天气数据发布器
                    publisher.ReciveNewData(weatherData);
                    Console.WriteLine("=============================");
                }
            }
        }
     
        /// <summary>
        /// WeatherData类包含气温,湿度,气压等属性。
        /// </summary>
        class WeatherData
        {
            /// <summary>
            /// 气温
            /// </summary>
            public string temperature { get; set; }
            /// <summary>
            /// 湿度
            /// </summary>
            public string humility { get; set; }
            /// <summary>
            /// 气压
            /// </summary>
            public string pressure { get; set; }
        }
     
        /// <summary>
        /// WeatherDataPublisher类实现了IObservable接口,实现了Subscribe订阅方法。
        /// </summary>
        class WeatherDataPublisher : IObservable<WeatherData>
        {
            List<IObserver<WeatherData>> observers = new List<IObserver<WeatherData>>();
     
            /// <summary>
            /// 订阅主题,将观察者添加到列表中
            /// </summary>
            /// <param name="observer"></param>
            /// <returns></returns>
            public IDisposable Subscribe(IObserver<WeatherData> observer)
            {
                observers.Add(observer);
                return new Unsubscribe(this.observers, observer);
            }
     
            /// <summary>
            /// 取消订阅类
            /// </summary>
            private class Unsubscribe : IDisposable
            {
                List<IObserver<WeatherData>> observers;
                IObserver<WeatherData> observer;
                public Unsubscribe(List<IObserver<WeatherData>> observers
                , IObserver<WeatherData> observer)
                {
                    this.observer = observer;
                    this.observers = observers;
                }
     
                public void Dispose()
                {
                    if (this.observers != null)
                    {
                        this.observers.Remove(observer);
                    }
                }
            }
     
            /// <summary>
            /// 通知已订阅的观察者
            /// </summary>
            /// <param name="weatherData"></param>
            private void Notify(WeatherData weatherData)
            {
                foreach (var observer in observers)
                {
                    observer.OnNext(weatherData);
                }
            }
     
            /// <summary>
            /// 接收最新的天气数据
            /// </summary>
            /// <param name="weatherData"></param>
            public void ReciveNewData(WeatherData weatherData)
            {
                Notify(weatherData);
            }
        }
     
        /// <summary>
        /// 抽象类WeatherDisplayBase实现了IObserver接口,所有的天气展示板(观察者)继承这个抽象类,
        /// 需实现OnNext方法,即接收到新数据推送后要进行的数据处理展示工作,并且可重写OnCompleted,OnError方法。
        /// </summary>
        abstract class WeatherDisplayBase : IObserver<WeatherData>
        {
            public virtual void OnCompleted()
            {
            }
            public virtual void OnError(Exception error)
            {
            }
            public abstract void OnNext(WeatherData value);
        }
     
        /// <summary>
        /// CurrentConditionDisplay类为当前天气状况展示板,继承WeatherDisplayBase抽象类,展示最新的天气数据。
        /// </summary>
        class CurrentConditionDisplay : WeatherDisplayBase
        {
            public override void OnNext(WeatherData value)
            {
                Console.WriteLine("------------------");
                Console.WriteLine("当前天气状况板");
                Console.WriteLine(string.Format("温度:{0}\n湿度:{1}\n气压:{2}",
                    value.temperature, value.humility, value.pressure));
            }
        }
     
        /// <summary>
        /// StatisticsConditionDisplay类为气温统计展示板,继承WeatherDisplayBase抽象类,展示历史最高温度,最低温度,平均温度。
        /// </summary>
        class StatisticsConditionDisplay : WeatherDisplayBase
        {
            List<float> temperatures = new List<float>();
            public override void OnNext(WeatherData value)
            {
                float temperature;
                if (float.TryParse(value.temperature, out temperature))
                {
                    temperatures.Add(temperature);
                }
                Console.WriteLine("------------------");
                Console.WriteLine("温度统计板");
                Console.WriteLine(string.Format("平均温度:{0}\n最高温度:{1}\n最低温度:{2}",
                    temperatures.Average(), temperatures.Max(), temperatures.Min()));
            }
        }
    }

    注解

    很多时候被观察者(IObservable)向观察者(IObserver)提供的数据并不像Location这样简单的结构体。

    而是一个包含复杂数据的类,通常可能是被观察者本身,这种情况是允许的,即IObserver<T> 实现和 T 可以表示同一类型。

    这时候的实现变成下面的型式:


    public class LocationTracker2 : IObservable<LocationTracker>
    {
        public IDisposable Subscribe(IObserver<LocationTracker> observer)
        {
            throw new NotImplementedException();
        }
    }
    
    public class LocationReporter2 : IObserver<LocationTracker2>
    {
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }
    
        public void OnError(Exception error)
        {
            throw new NotImplementedException();
        }
    
        public void OnNext(LocationTracker2 value)
        {
            throw new NotImplementedException();
        }
    }

     观察者模式是常用的设计模式,在.net环境下,其运行时库为开发者提供了IObservable<T>和 IObserver<T>接口,用于实现观察者模式软件设计。

      //
        // 摘要:
        //     定义基于推送的通知的提供程序。
        //
        // 类型参数:
        //   T:
        //     提供通知信息的对象。
        public interface IObservable<out T>
        {
            //
            // 摘要:
            //     通知提供程序:某观察程序将要接收通知。
            //
            // 参数:
            //   observer:
            //     要接收通知的对象。
            //
            // 返回结果:
            //     使资源释放的观察程序的接口。
            IDisposable Subscribe(IObserver<T> observer);
        }

    注解Subscribe:

    调用Subscribe通知提供程序某观察程序将要接收通知(即注册、订阅),不同于常规实现,它具有一个返回值,是一个IDisposable对象,当观察者不再接收通知时,可调用Dispose函数取消订阅(反注册),这种方法充分发挥C#语言的特性。

    IObserver<in T>

    //
    // 摘要:
    //     提供用于接收基于推送的通知的机制。
    //
    // 类型参数:
    //   T:
    //     提供通知信息的对象。
    public interface IObserver<in T>
    {
        //
        // 摘要:
        //     通知观察者,提供程序已完成发送基于推送的通知。
        void OnCompleted();
        //
        // 摘要:
        //     通知观察者,提供程序遇到错误情况。
        //
        // 参数:
        //   error:
        //     一个提供有关错误的附加信息的对象。
        void OnError(Exception error);
        //
        // 摘要:
        //     向观察者提供新数据。
        //
        // 参数:
        //   value:
        //     当前的通知信息。
        void OnNext(T value);
    }

    示例

    下面例子演示观察者设计模式,实现定位系统实时通知当前经纬度坐标。

    包含经纬度坐标的Locaiton结构体

    public struct Location
    {
        public Location(double latitude, double longitude)
        {
            Latitude = latitude;
            Longitude = longitude;
        }
    
        public double Latitude
        {
            get; private set;
        }
    
        public double Longitude
        {
            get;
            private set;
        }
    }

    LocationTracker 类
    实现了IObservable<T> 接口。

    public class LocationTracker : IObservable<Location>
    {
        public LocationTracker()
        {
            observers = new List<IObserver<Location>>();
        }
    
        private List<IObserver<Location>> observers;
    
        public IDisposable Subscribe(IObserver<Location> observer)
        {
            if (!observers.Contains(observer))
                observers.Add(observer);
            return new Unsubscriber(observers, observer);
        }
        // 用于取消订阅通知的IDisposable对象的实现
        private class Unsubscriber : IDisposable
        {
            private List<IObserver<Location>> _observers;
            private IObserver<Location> _observer;
    
            public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
            {
                this._observers = observers;
                this._observer = observer;
            }
    
            public void Dispose()
            {
                if (_observer != null && _observers.Contains(_observer))
                    _observers.Remove(_observer);
            }
        }
        // TrackLocation 方法传递了一个包含纬度和经度数据的Location对象。 
        // 如果Location值不为null,则 TrackLocation 方法会调用每个观察程序的 OnNext 方法,
        // 否则调用OnError方法
        public void TrackLocation(Nullable<Location> loc)
        {
            foreach (var observer in observers)
            {
                if (!loc.HasValue)
                    observer.OnError(new LocationUnknownException());
                else
                    observer.OnNext(loc.Value);
            }
        }
    
        public void EndTransmission()
        {
            foreach (var observer in observers.ToArray())
                if (observers.Contains(observer))
                    observer.OnCompleted();
    
            observers.Clear();
        }
    }

    public class LocationUnknownException : Exception
    {
       internal LocationUnknownException() 
       { }
    }

    LocationObserver类

    定位信息的观察者,实现了IObserver<Location>接口

    public class LocationReporter : IObserver<Location>
    {
        private IDisposable unsubscriber;
        private string instName;
    
        public LocationReporter(string name)
        {
            this.instName = name;
        }
    
        public string Name
        { get { return this.instName; } }
    
        public virtual void Subscribe(IObservable<Location> provider)
        {
            if (provider != null)
                unsubscriber = provider.Subscribe(this);
        }
    
        public virtual void OnCompleted()
        {
            Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", this.Name);
            this.Unsubscribe();
        }
    
        public virtual void OnError(Exception e)
        {
            Console.WriteLine("{0}: The location cannot be determined.", this.Name);
        }
    
        public virtual void OnNext(Location value)
        {
            Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, this.Name);
        }
        // 取消订阅
        public virtual void Unsubscribe()
        {
            unsubscriber.Dispose();
        }
    }
    最后来看一下怎么使用这个定位系统
    class Program2
    {
        static void Main(string[] args)
        {
            // Define a provider and two observers.
            LocationTracker provider = new LocationTracker();
            LocationReporter reporter1 = new LocationReporter("FixedGPS");
            reporter1.Subscribe(provider);
            LocationReporter reporter2 = new LocationReporter("MobileGPS");
            reporter2.Subscribe(provider);
    
            provider.TrackLocation(new Location(47.6456, -122.1312));
            reporter1.Unsubscribe();
            provider.TrackLocation(new Location(47.6677, -122.1199));
            provider.TrackLocation(null);
            provider.EndTransmission();
        }
    }
    C#.net. WPF.core 技术交流群 群号205082182,欢迎加入,也可以直接点击左侧和下方的"加入QQ群",直接加入
  • 相关阅读:
    对vue-cli各个目录的理解 和 在 vue 中使用json-server
    发论文的一些常见问题
    latex初步入门:springer llncs
    docker tomcat8 mysql8部署常见错误
    docker快速部署本地项目到服务器(tomcat8+mysql8)
    IDEA构建spring项目
    [b0042] python 归纳 (二七)_gui_tkinter_基本使用
    [b0038] python 归纳 (二三)_多进程数据共享和同步_队列Queue
    springboot进入html
    HbaseShell启动
  • 原文地址:https://www.cnblogs.com/aijiao/p/15800723.html
Copyright © 2011-2022 走看看