zoukankan      html  css  js  c++  java
  • EasyNetQ笔记-Publish/Subscribe模式(发布订阅)

    介绍

    asyNetQ支持的最简单的消息模式是发布/订阅.这个模式是一个极好的方法用来解耦消息提供者和消费者。消息发布者只要简单的对世界说,“这里有些事发生” 或者 “我现在有一个信息”。它不关心有没有人监听,或者接收者是谁,或者接收者在那里。我们能够添加和移除特定类型的消息的订阅者,不需发布者做任何的重新配置。我们也能够有多个发布者发布相同的消息,添加和删除发布者也不用其他的发布者或者订阅者做任何重新配置。
    假如你开始去发布消息,而没有任何订阅者曾经定义此消息,那么这个消息就简单的消失了。
    一个EasyNetQ订阅者订阅一种消息类型(消息类为.NET 类型)。通过调用Subcribe方法一旦对一个类型设置了订阅,一个持久化队列就会在RabbitMQ broker上被创建,这个类型的任何消息都会被发送到这个队列上。订阅者无论什么时候连接上,RabbitMQ都将会将消息从队列中发送给订阅者。

    消息发布(Publish)

      EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅。并且不需要额外配置。首先,如上文中需要先创建一个IBus对象,然后,在创建一个可序列化的.NET对象。调用Publish方法即可。

    var _bus = RabbitHutch.CreateBus("host=xxxxxx;port=5672;virtualHost=my_vhost;username=admin;password=admin;timeout=30;publisherConfirms=true");//连接字符串末尾不要加";"
    var message = new MyMessage { Text = "Hello Rabbit" };
                for (int i = 0; i < 10; i++)
                {
                    _bus.Publish<MyMessage>(message);
                }
    

      警告,Publish只顾发送消息到队列,但是不管有没有消费端订阅,所以,发布之后,如果没有消费者,该消息将不会被消费甚至丢失。

    消息订阅(Subscribe)

      EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:

    _bus.Subscribe<MyMessage>("subscribe_id", myMessage => {
                    lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
                });
    

      第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。
    这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribeid订阅了同一种消息类型两次或者多次,RabbitMQ会以轮训的方式给每个订阅的队列发送消息。接收到之后,其他队列就接收不到该消息。
    如果用不同的subscribeid订阅同一种消息类型,那么生成的每一个队列都会收到该消息。
      举个例子:出库发货,我们有五个商品仓库,每个仓库的商品都是一样的,假如来了一堆订单,那么我们需要五个仓库共同工作,分别处理订单。而同样,总仓库需要知道总出货量,正常情况下,可以用每个仓库的出货量相加即可。不过如果我们在总仓库也监听商品订单消息,那么,每次来订单,总仓库也都会收到一份,那么可以作相应的统计了。

    //不同的subscribe_id将会创建两个队列,消息同时发给两个队列
    
            private void btn_subscribe11_Click(object sender, EventArgs e)
            {
                _bus.Subscribe<MyMessage>("subscribe_id_1", myMessage => {
                    lbxMessage1.Invoke(new Action(() => { lbxMessage1.Items.Add(myMessage.Text); }));
                });
            }
    
            private void btnSubscribe22_Click(object sender, EventArgs e)
            {
                _bus.Subscribe<MyMessage>("subscribe_id_2", myMessage => {
                    lbxMessage2.Invoke(new Action(() => { lbxMessage2.Items.Add(myMessage.Text); }));
                });
            }
    

      需要注意的是,在收到消息处理消息时候,不要占用太多的时间,会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好用异步处理。代码如下:

    bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
        new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
            .ContinueWith(task => 
                Console.WriteLine("Received: '{0}', Downloaded: '{1}'", 
                    message.Text, 
                    task.Result)));
    

    非泛型发布订阅

    如果想在项目运行期间发布订阅消息,EasyNetQ提供了非泛型的发布订阅
    加using

    using EasyNetQ.NonGeneric;
    

    提供如下非泛型的发布订阅方法:

    //订阅
    public static IDisposable Subscribe(
        this IBus bus,
        Type messageType,
        string subscriptionId,
        Action<object> onMessage,
        Action<ISubscriptionConfiguration> configure)    
    public static IDisposable SubscribeAsync(    
        this IBus bus,     
        Type messageType,     
        string subscriptionId,     
        Func<object, Task> onMessage,     
        Action<ISubscriptionConfiguration> configure)
     
    //发布
     public static void Publish(
        this IBus bus, 
        Type messageType, 
        object message, 
        string topic)
     public static Task PublishAsync(
        this IBus bus, 
        Type messageType, 
        object message, 
        string topic)
    

      取消订阅,可以用如下方法:

    var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
    subscriptionResult.Dispose();
    //或者直接IBus.Dispose();
    
  • 相关阅读:
    Redis 补充
    python 魔法方法补充(__setattr__,__getattr__,__getattribute__)
    Mongodb 补充
    Mysql补充
    HTML
    优秀工具
    优秀文章收藏
    MySQL
    爬虫
    Python
  • 原文地址:https://www.cnblogs.com/fanfan-90/p/13620140.html
Copyright © 2011-2022 走看看