zoukankan      html  css  js  c++  java
  • 6-EasyNetQ之订阅

    一个EasyNetQ订阅者订阅一种消息类型(消息类为.NET 类型)。通过调用Subcribe方法一旦对一个类型设置了订阅,一个持久化队列就会在RabbitMQ broker上被创建,这个类型的任何消息都会被发送到这个队列上。订阅者无论什么时候连接上,RabbitMQ都将会将消息从队列中发送给订阅者。

    不管消息什么时候送达到,订阅这个消息的订阅者需要给RabbitMQ一个可执行的操作。我们通过传递一个订阅代理:

    bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
    

    现在每一次MyMessage实例被发送后,EasyNetQ将会调用我们的代理,打印这个消息的Text属性到控制台。

    你传给订阅的订阅Id是重要的。 EasyNetQ将会在RabbitMQ Broker上为特定的消息类型的和订阅id的组合创建唯一的队列。

    每一次调用Subscribe方法会创建一个新的队列消费者。如果你用相同的消息和订阅id调用Subscribe两次,你将会创建两个消费者去消费同一个队列。然后RabbitMQ将会依次连续轮询消息给每一个消费者。这种可伸缩性和工作分担是非常棒的。比如说,你创建了一个处理特殊消息的服务,但是他已经超负荷工作了。简单的创建一个新的服务实例(在同一个机器上,或者不同的机器上),不用配置任何东西,你自动就得到了伸缩性。

    假如相同的消息类型,用不同的订阅id调用了两次Subscribe,你将创建两个队列,每一个队列有自己的消费者。每一个消息的副本将会路由到每一个队列,因此不同的消费者都将得到所有消息(这个类型的)。假如你有几个不同的服务都关心相同类型的消息,这这样很好。

    写订阅回调委托时的注意事项

    通过EasyNetQ订阅到一个来至队列的消息,他们被放置在内存队列中。一个单独线程循环对垒得到消息,调用他们的委托方法。因为在一个独立线程上一个委托一次处理一个消息,你应该避免长时间的同步的IO操作。应该尽快从委托返回控制。

    使用异步订阅 SubscribeAsync

    SubscribeAsync 允许你的订阅者委托到一个能立即返回的Task,然后异步的执行长时间IO操作。一但长时间运行的订阅完成后,就简单的完成这个任务。下面的例子,我们请求一个web service使用一个异步IO操作(DownloadStringTask)。当这个task完成事,写一行信息到控制台。

    bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => 
           new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
               .ContinueWith(task => 
                    Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
                        message.Text,
                        task.Result)))
    

    另一个列子是如果有错误发生,返回结果会有异常抛出,那么消息将会被放到一个默认的错误队列中。

    _bus.SubscribeAsync<MessageType>("queue_Identifier", 
                 Message => Task.Factory.StartNew(() => 
                 {
                     //这里执行一些操作
                     //如果这里有一个异常,那么在这个Task执行完毕后,这个异常会作为结果返回,
                     // 然后任务将继续执行下去。
                 }).ContinueWith(task => 
                 {
                     if ( task.IsCompleted && ! task.IsFaulted)
                     {
                         // 一切都很好
                     }
                     else
                     {
                         // 不要Catch 异常,否则异常会进一步被嵌套,结果会被发送到默认的错误队列
                         throw new EasyNetQException("Message processing exception - look in t  the default error quenue(broker)");
                     }
                 }));
    

    取消订阅##

    所有的订阅都会返回一个ISubscriptionResult接口实例。它包含属性有订阅底层被IConsumer使用的IExchane和IQueue,如果你需要使用更高级的API IAdvancedBus更好的去处理,这会变为可能。

    你能够在任何时间取消一个订阅者,通过调用ISubscriptionResult实例上的Dispose方法,或者在它之上的 ConsumerCancellation属性。

    var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
    
    ...
    
    subscriptionResult.Dispose();
    // 这个等价与 subscriptionResult.ConsumerCancellation.Dispose();
    
    

    这将停止EasyNetQ对队列的消费,并且关闭这个消费者的channel。

    注意:IBus和IAndvancedBus的dispose,可能够取消所有消费者,并关闭对RabbitMQ的连接。

    不要在消息处理器中调用 subscriptionResult.Dispose()。这将在EasyNetQ ACK 消息时,在消费者的channel和subscriptionResult.Dispose()调用关闭Channel之间,创建一个竞争状态。由于EasyNetQ的内部架构这些将会在不同的线程被调用,还有时间上的不确定性。

    英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Subscribe
    本文地址:http://www.cnblogs.com/HuangLiang/p/EasyNetQ_Subscribe.html

  • 相关阅读:
    varnish反向代理
    Asp.Net MVC 3.0
    反向代理(Reverse Proxy)
    Go语言开发Windows应用
    Windows下安装NodeJS和CoffeeScript方法
    数据库设计....
    发布一个开源的c++网络事件库
    非小型电子商务系统设计经验分享 Coding changes the world
    SqlServer查询计划
    cocos2dx总结(一)HelloWord
  • 原文地址:https://www.cnblogs.com/HuangLiang/p/EasyNetQ_Subscribe.html
Copyright © 2011-2022 走看看