zoukankan      html  css  js  c++  java
  • EasyNetQ使用(三)【Publish与Subcribe】

    EasyNetQ支持的最简单的消息模式是发布/订阅.这个模式是一个极好的方法用来解耦消息提供者和消费者。消息发布者只要简单的对世界说,“这里有些事发生” 或者 “我现在有一个信息”。它不关心有没有人监听,或者接收者是谁,或者接收者在那里。我们能够添加和移除特定类型的消息的订阅者,不需发布者做任何的重新配置。我们也能够有多个发布者发布相同的消息,添加和删除发布者也不用其他的发布者或者订阅者做任何重新配置。

    EasyNetQ发布消息(假定你已经重建了一个IBus实例)

    1. 创建你自己的消息实例,可以是任何可序列化的 .NET 类型。
    2. 调用IBus上的Publish方法,并传入你的消息实例。

    代码如下:

    var message = new MyMessage{ Text = "Hello Rabbit" };
    bus.Publish(message);

    为确保消息投递成功,请看Publisher Confirms.

    发布者和订阅者之间彼此是不知道对方的。发布者简单的对世界说“这儿有事情发生”,订阅者告诉世界“我关心这种事儿的发生”。在这个模型中这是很好的,没有人关心特定的事件。可能有一个订阅者关心这个消息,也可能有200个,或者没有人关心它。发布者不应该关心EasyNetQ对这个消息模式的实现。假如你开始去发布消息,而没有任何订阅者曾经定义此消息,那么这个消息就简单的消失了。这是我们的设计意图。


    一个EasyNetQ订阅者订阅一种消息类型(消息类为.NET 类型)。通过调用Subcribe方法一旦对一个类型设置了订阅,一个持久化队列就会在RabbitMQ broker上被创建,这个类型的任何消息都会被发送到这个队列上。订阅者无论什么时候连接上,RabbitMQ都将会将消息从队列中发送给订阅者。

    不管消息什么时候送达到,订阅这个消息的订阅者需要给RabbitMQ一个可执行的操作。我们通过传递一个订阅代理:

    bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
    • 1

    现在每一次MyMessage实例被发送后,EasyNetQ将会调用我们的代理,打印这个消息的Text属性到控制台。

    你传给订阅的订阅Id是重要的。 EasyNetQ将会在RabbitMQ Broker上为特定的消息类型的和订阅id的组合创建唯一的队列。

    每一次调用Subscribe方法会创建一个新的队列消费者。如果你用相同的消息和订阅id调用Subscribe两次,你将会创建两个消费者去消费同一个队列。然后RabbitMQ将会依次连续轮询消息给每一个消费者。这种可伸缩性和工作分担是非常棒的。比如说,你创建了一个处理特殊消息的服务,但是他已经超负荷工作了。简单的创建一个新的服务实例(在同一个机器上,或者不同的机器上),不用配置任何东西,你自动就得到了伸缩性。

    假如相同的消息类型,用不同的订阅id调用了两次Subscribe,你将创建两个队列,每一个队列有自己的消费者。每一个消息的副本将会路由到每一个队列,因此不同的消费者都将得到所有消息(这个类型的)。假如你有几个不同的服务都关心相同类型的消息,这这样很好。
    写订阅回调委托时的注意事项

    通过EasyNetQ订阅到一个来至队列的消息,他们被放置在内存队列中。一个单独线程循环对垒得到消息,调用他们的委托方法。因为在一个独立线程上一个委托一次处理一个消息,你应该避免长时间的同步的IO操作。应该尽快从委托返回控制。

    使用异步订阅 SubscribeAsync

    SubscribeAsync 允许你的订阅者委托到一个能立即返回的Task,然后异步的执行长时间IO操作。一但长时间运行的订阅完成后,就简单的完成这个任务。下面的例子,我们请求一个web service使用一个异步IO操作(DownloadStringTask)。当这个task完成事,写一行信息到控制台。

    bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => 
           new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
               .ContinueWith(task => 
                    Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
                        message.Text,
                        task.Result)))

    另一个列子是如果有错误发生,返回结果会有异常抛出,那么消息将会被放到一个默认的错误队列中。

    _bus.SubscribeAsync<MessageType>("queue_Identifier", 
                 Message => Task.Factory.StartNew(() => 
                 {
                     //这里执行一些操作
                     //如果这里有一个异常,那么在这个Task执行完毕后,这个异常会作为结果返回,
                     // 然后任务将继续执行下去。
                 }).ContinueWith(task => 
                 {
                     if ( task.IsCompleted && ! task.IsFaulted)
                     {
                         // 一切都很好
                     }
                     else
                     {
                         // 不要Catch 异常,否则异常会进一步被嵌套,结果会被发送到默认的错误队列
                         throw new EasyNetQException("Message processing exception - look in t  the default error quenue(broker)");
                     }
                 }));

    取消订阅

    所有的订阅都会返回一个ISubscriptionResult接口实例。它包含属性有订阅底层被IConsumer使用的IExchaneIQueue,如果你需要使用更高级的API IAdvancedBus更好的去处理,这会变为可能。

    你能够在任何时间取消一个订阅者,通过调用ISubscriptionResult实例上的Dispose方法,或者在它之上的 ConsumerCancellation属性。

    var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
    
    ...
    
    subscriptionResult.Dispose();
    // 这个等价与 subscriptionResult.ConsumerCancellation.Dispose();

    这将停止EasyNetQ对队列的消费,并且关闭这个消费者的channel

    注意:IBusIAndvancedBusdispose,可能够取消所有消费者,并关闭对RabbitMQ的连接。

    不要在消息处理器中调用 subscriptionResult.Dispose()。这将在EasyNetQ ACK 消息时,在消费者的channelsubscriptionResult.Dispose()调用关闭Channel之间,创建一个竞争状态。由于EasyNetQ的内部架构这些将会在不同的线程被调用,还有时间上的不确定性。

     
  • 相关阅读:
    POJ 3710 Christmas Game#经典图SG博弈
    POJ 2599 A funny game#树形SG(DFS实现)
    POJ 2425 A Chess Game#树形SG
    LeetCode Array Easy 122. Best Time to Buy and Sell Stock II
    LeetCode Array Easy121. Best Time to Buy and Sell Stock
    LeetCode Array Easy 119. Pascal's Triangle II
    LeetCode Array Easy 118. Pascal's Triangle
    LeetCode Array Easy 88. Merge Sorted Array
    ASP.NET MVC 学习笔记之 MVC + EF中的EO DTO ViewModel
    ASP.NET MVC 学习笔记之面向切面编程与过滤器
  • 原文地址:https://www.cnblogs.com/lhxsoft/p/11881076.html
Copyright © 2011-2022 走看看