zoukankan      html  css  js  c++  java
  • 【C#】从RabbitMQ的消费者事件窥.NET标准事件

    rabbitMQ中,官方文档中,接收消息最方便且推荐的方法:使用IBasicConsumer消费者接口设置订阅messages到达队列后将自动发送,只要订阅了Received事件,就可以从中接收到队列消息,而不必主动请求。实现这种消费者(发布订阅)模式 ,.NET/C# Client API是通过C#事件。事件的本质就是多播委托。

    1.RabbitMQ中的事件

    首先我们来看一下在RabbitMQ的使用方式:

    1.1 订阅事件

    订阅消费者的received事件

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (ch, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        // copy or deserialise the payload
                        // and process the message
                        // ...
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
    // this consumer tag identifies the subscription
    // when it has to be cancelled
    String consumerTag = channel.BasicConsume(queueName, false, consumer);
    

    1.2 定义事件

    再来看一下源码,省略部分源码

    namespace RabbitMQ.Client.Events
    {
        ///<summary>Experimental class exposing an IBasicConsumer's
        ///methods as separate events.</summary>
        public class EventingBasicConsumer : DefaultBasicConsumer
        {
            public EventingBasicConsumer(IModel model) : base(model)
            {
            }
    
            public event EventHandler<BasicDeliverEventArgs> Received;
    
            ///<summary>
            /// Invoked when a delivery arrives for the consumer.
            /// </summary>
            /// <remarks>
            /// Handlers must copy or fully use delivery body before returning.
            /// Accessing the body at a later point is unsafe as its memory can
            /// be already released.
            /// </remarks>
            public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
            {
                base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
                Received?.Invoke(
                    this,
                    new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
            }
        }
    }
    
    

    1.3 事件传递信息

    其中received为事件,当调用HandleBasicDeliver方法便会触发事件,这也是事件特定,只能在类的内部调用,并传递事件源,及事件传递信息BasicDeliverEventArgs类,如下

        public class BasicDeliverEventArgs : EventArgs
        {
            ///<summary>Default constructor.</summary>
            public BasicDeliverEventArgs()
            {
            }
    
            ///<summary>Constructor that fills the event's properties from
            ///its arguments.</summary>
            public BasicDeliverEventArgs(string consumerTag,
                ulong deliveryTag,
                bool redelivered,
                string exchange,
                string routingKey,
                IBasicProperties properties,
                ReadOnlyMemory<byte> body)
            {
                ConsumerTag = consumerTag;
                DeliveryTag = deliveryTag;
                Redelivered = redelivered;
                Exchange = exchange;
                RoutingKey = routingKey;
                BasicProperties = properties;
                Body = body;
            }
    
            ///<summary>The content header of the message.</summary>
            public IBasicProperties BasicProperties { get; set; }
    
            ///<summary>The message body.</summary>
            public ReadOnlyMemory<byte> Body { get; set; }
    
            ///<summary>The consumer tag of the consumer that the message
            ///was delivered to.</summary>
            public string ConsumerTag { get; set; }
    
            ///<summary>The delivery tag for this delivery. See
            ///IModel.BasicAck.</summary>
            public ulong DeliveryTag { get; set; }
    
            ///<summary>The exchange the message was originally published
            ///to.</summary>
            public string Exchange { get; set; }
    
            ///<summary>The AMQP "redelivered" flag.</summary>
            public bool Redelivered { get; set; }
    
            ///<summary>The routing key used when the message was
            ///originally published.</summary>
            public string RoutingKey { get; set; }
        }
    

    2.标准的.NET事件模式

    接下来我们看一下标准的.NET事件模式

    2.1 定义事件传递信息类EventArgs

    public class PriceChangeEventArgs:System.EventArgs
    {
        public readonly decimal LastPrice;
        public readonly decimal NewPrice;
        public PriceChangeEventArgs(decimal lastPrice,decimal newPrice)
        {
            LastPrice=lastPrice;
            NewPrice=newPrice;
        }
    }
    
    • System.EventArgs是.net framework中预定义的类,除了静态的Empty属性之外,没有其他成员
      • EventArgs为事件传递信息类的基类
      • 继承这个基类来自定义事件传递信息类

    2.2* 为事件定义委托EventHandler

    public delegate void PriceChangedEventHandler<TEventArgs>(object source,TEventArgs e) where TEventArgs:EventArgs;
    

    事件可以说是依赖委托的,实质本来也就是一种类型安全的委托,要想定义事件,必定指定委托,对于能够定义事件的委托,有如下要求:

    • 返回类型是void
    • 接收两个参数,第一个参数类型是object,第二个参数类型是EventArgs的子类。
      • 第一个参数表示事件的广播者(触发事件的对象)
      • 第二个参数包含事件需要传递的信息
    • 名称必须以EventHandler结尾

    在最新的.NET Core事件模式下,为了事件传递参数更更加的灵活,已经不再要求 TEventArgs 必须是派生自 System.EventArgs 的类,上面的代码就可以不再继承System.EventArgs

    public delegate void PriceChangedEventHandler<TEventArgs>(object source,TEventArgs e);
    

    然后上面的信息传递类也是可以改造的。

    public class PriceChangeEventArgs
    {
        public readonly decimal LastPrice;
        public readonly decimal NewPrice;
        public PriceChangeEventArgs(decimal lastPrice, decimal newPrice)
        {
            LastPrice = lastPrice;
            NewPrice = newPrice;
        }
    }
    

    其实.net已经为我们定义了一个泛型委托System.EventHandler与非泛型委托EventHandler

    public delegate void EventHandler<TEventArgs>(object source,TEventArgs e);
    public delegate void EventHandler(object? sender, EventArgs e);
    

    这里使用自定义的委托,还是.NET预定义委托,都是可以的。如果我们使用.NE预定义委托,本小节就可以省略,可有可无。

    建议使用预定义委托,毕竟已经有现成的

    2.3 使用委托定义事件event

    2.3.1 针对自定义委托

    public event PriceChangedEventHandler<PriceChangeEventArgs> PriceChanged;
    

    2.3.2 针对预定义的泛型委托

    public event EventHandler<PriceChangeEventArgs> PriceChanged;
    

    2.3.3 针对预定义的非泛型委托

    public event EventHandler PriceChanged;
    

    2.4 触发事件的方法On

    protected virtual void OnPriceChanged(PriceChangeEventArgs e)
    {
        PriceChanged?.Invoke(this,e);
    }
    
    • 标准模式下要求方法名必须和事件一致,前面再加上On,接收一个EventArgs参数
      • rabbitMQ的源码中,并没有遵从这种标准,而是使用的Handle前缀。如HandleBasicDeliver,大家灵活使用。
    • 这里记住 PriceChanged?.Invoke(this,e);事件名?.Invoke(this,事件传递参数),等价下面代码。
    if(PriceChanged!=null)
    {
        PriceChanged(this,e);
    }
    

    2.6 触发事件

    定义了触发事件的方法,还需要定义触发事件的条件,事件的本质是类型安全的委托,实质也是封装了一个多播委托,只是功能上比委托有了跟个多限制,只能在定义事件的类的内部直接调用事件。

    public decimal Price
    {
        get { return price; }
        set
        {
            if (price == value) return;
            decimal oldPrice = price;
            price = value;
            OnPriceChanged(new PriceChangeEventArgs(oldPrice, price));
        }
    }
    

    上述示例中,在属性的set访问器,中触发事件:当价格发生变化时,将触发股价变化事件(PriceChanged)。

    2.5 事件的使用(订阅事件)

    没有订阅的事件,永远不会触发(因为为null)。所以我们需要订阅事件。

    //  一个股票类
    //  股票的价格变化订阅事件
    static void Main(string[] args)
    {
        Stock st = new Stock("股票");
        st.Price = 100;
        st.PriceChanged += stock_PriceChanged;
        st.Price = 200;
        Console.WriteLine("Hello World!");
    }
    static void stock_PriceChanged(object sender, PriceChangeEventArgs e)
    {
        if ((e.NewPrice - e.LastPrice) / e.LastPrice > 0.1M)
        {
            Console.WriteLine("Alert,10% stock price increase");
        }
    }
    

    2.5.1 使用匿名方法订阅事件

    st.PriceChanged += delegate(object o, PriceChangeEventArgs e)  
    {  
        var lastPrice = e.LastPrice;
        var newPrice = e.NewPrice;
        //...
    };  
    

    2.5.2 使用lambda表达式订阅事件

    使用+=操作符订阅事件,更多的实际操作是使用lambda表达式,用于接收事件源参数和事件传递的信息:

    st.PriceChanged += (sender, e) =>
    {
        var lastPrice = e.LastPrice;
        var newPrice = e.NewPrice;
        //...
    };
    

    3.总结

    C#通过事件机制实现线程间(进程内)的通信。让我想起了MediatR这个库。

    • 事件的定义是在一个类中
    • 事件的注册是在另一个类中
    • 在定义事件的类中触发
    • 传递参数至注册类中使用。

    3.1 什么情况下使用事件?

    当我们学习到某种方法总会有疑问,到底什么时候使用事件,事件能够办到的,看起来委托也能办到。

    当事件源将在很长一段时间内触发事件,基于事件的设计就显得非常自然,例如RabbitMQ的消费者Recived事件,一旦订阅了事件,在当前程序的整个生命周期,事件源随时都可以触发事件。在CS程序中,UI控件设计示例基本也是基于各种事件。

    3.2 事件定义与使用

    • 信息传递类:public class xxxEventArgs{}

      • 可继承EventArgs,也可以自定义
    • *委托:public delegate void EventHandler<xxxEventArgs>(object source,xxxEventArgs e)

      • 使用.NET预定义类(这一步可以省略)
    • 事件:public event EventHandler<xxxEventArgs> EventName;

    • 触发事件的方法:

      protected virtual void OnEventName(xxxEventArgs e)
      {
        	eventName?.Invoke(this,e);
      }
      
    • 触发事件:在特定的需求下,执行OnEventName(new xxxEventArgs(){})

    • 订阅(注册)事件:xx.EventName+=(src,e)=>{}

    参考链接

    https://www.bilibili.com/video/BV1Ht41137R1

    https://docs.microsoft.com/zh-cn/dotnet/csharp/event-pattern

  • 相关阅读:
    存储过程之基本语法
    SQL存储过程概念剖析
    SQL2005之SA提权总结
    关于存储过程
    Java开发环境之------MyEclipse快捷键和排除错误第一选择ctrl+1(***重点***:ctrl+1,快速修复---有点像vs中的快速using
    MyEclipse Servers视窗出现“Could not create the view: An unexpected exception was thrown”错误解决办法
    艾泰路由器端口映射怎么设置
    常用端口
    jdbc注册驱动 class.forName()
    MyEclipse不能自动编译解决办法总结
  • 原文地址:https://www.cnblogs.com/RandyField/p/13977875.html
Copyright © 2011-2022 走看看