zoukankan      html  css  js  c++  java
  • EasyNetQ使用(三)【Publish与Subcribe】

    EasyNetQ支持的最简单的消息模式是发布/订阅.这个模式是一个极好的方法用来解耦消息提供者和消费者。消息发布者只要简单的对世界说,“这里有些事发生” 或者 “我现在有一个信息”。它不关心有没有人监听,或者接收者是谁,或者接收者在那里。我们能够添加和移除特定类型的消息的订阅者,不需发布者做任何的重新配置。我们也能够有多个发布者发布相同的消息,添加和删除发布者也不用其他的发布者或者订阅者做任何重新配置。

    EasyNetQ发布消息(假定你已经重建了一个IBus实例)

    1. 创建你自己的消息实例,可以是任何可序列化的 .NET 类型。
    2. 调用IBus上的Publish方法,并传入你的消息实例。

    代码如下:

    var message = new MyMessage{ Text = "Hello Rabbit" };
    bus.Publish(message);

    为确保消息投递成功,请看Publisher Confirms.

    发布者和订阅者之间彼此是不知道对方的。发布者简单的对世界说“这儿有事情发生”,订阅者告诉世界“我关心这种事儿的发生”。在这个模型中这是很好的,没有人关心特定的事件。可能有一个订阅者关心这个消息,也可能有200个,或者没有人关心它。发布者不应该关心EasyNetQ对这个消息模式的实现。假如你开始去发布消息,而没有任何订阅者曾经定义此消息,那么这个消息就简单的消失了。这是我们的设计意图。


    一个EasyNetQ订阅者订阅一种消息类型(消息类为.NET 类型)。通过调用Subcribe方法一旦对一个类型设置了订阅,一个持久化队列就会在RabbitMQ broker上被创建,这个类型的任何消息都会被发送到这个队列上。订阅者无论什么时候连接上,RabbitMQ都将会将消息从队列中发送给订阅者。

    不管消息什么时候送达到,订阅这个消息的订阅者需要给RabbitMQ一个可执行的操作。我们通过传递一个订阅代理:

    bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
    • 1

    现在每一次MyMessage实例被发送后,EasyNetQ将会调用我们的代理,打印这个消息的Text属性到控制台。

    你传给订阅的订阅Id是重要的。 EasyNetQ将会在RabbitMQ Broker上为特定的消息类型的和订阅id的组合创建唯一的队列。

    每一次调用Subscribe方法会创建一个新的队列消费者。如果你用相同的消息和订阅id调用Subscribe两次,你将会创建两个消费者去消费同一个队列。然后RabbitMQ将会依次连续轮询消息给每一个消费者。这种可伸缩性和工作分担是非常棒的。比如说,你创建了一个处理特殊消息的服务,但是他已经超负荷工作了。简单的创建一个新的服务实例(在同一个机器上,或者不同的机器上),不用配置任何东西,你自动就得到了伸缩性。

    假如相同的消息类型,用不同的订阅id调用了两次Subscribe,你将创建两个队列,每一个队列有自己的消费者。每一个消息的副本将会路由到每一个队列,因此不同的消费者都将得到所有消息(这个类型的)。假如你有几个不同的服务都关心相同类型的消息,这这样很好。
    写订阅回调委托时的注意事项

    通过EasyNetQ订阅到一个来至队列的消息,他们被放置在内存队列中。一个单独线程循环对垒得到消息,调用他们的委托方法。因为在一个独立线程上一个委托一次处理一个消息,你应该避免长时间的同步的IO操作。应该尽快从委托返回控制。

    使用异步订阅 SubscribeAsync

    SubscribeAsync 允许你的订阅者委托到一个能立即返回的Task,然后异步的执行长时间IO操作。一但长时间运行的订阅完成后,就简单的完成这个任务。下面的例子,我们请求一个web service使用一个异步IO操作(DownloadStringTask)。当这个task完成事,写一行信息到控制台。

    bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => 
           new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
               .ContinueWith(task => 
                    Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
                        message.Text,
                        task.Result)))

    另一个列子是如果有错误发生,返回结果会有异常抛出,那么消息将会被放到一个默认的错误队列中。

    _bus.SubscribeAsync<MessageType>("queue_Identifier", 
                 Message => Task.Factory.StartNew(() => 
                 {
                     //这里执行一些操作
                     //如果这里有一个异常,那么在这个Task执行完毕后,这个异常会作为结果返回,
                     // 然后任务将继续执行下去。
                 }).ContinueWith(task => 
                 {
                     if ( task.IsCompleted && ! task.IsFaulted)
                     {
                         // 一切都很好
                     }
                     else
                     {
                         // 不要Catch 异常,否则异常会进一步被嵌套,结果会被发送到默认的错误队列
                         throw new EasyNetQException("Message processing exception - look in t  the default error quenue(broker)");
                     }
                 }));

    取消订阅

    所有的订阅都会返回一个ISubscriptionResult接口实例。它包含属性有订阅底层被IConsumer使用的IExchaneIQueue,如果你需要使用更高级的API IAdvancedBus更好的去处理,这会变为可能。

    你能够在任何时间取消一个订阅者,通过调用ISubscriptionResult实例上的Dispose方法,或者在它之上的 ConsumerCancellation属性。

    var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
    
    ...
    
    subscriptionResult.Dispose();
    // 这个等价与 subscriptionResult.ConsumerCancellation.Dispose();

    这将停止EasyNetQ对队列的消费,并且关闭这个消费者的channel

    注意:IBusIAndvancedBusdispose,可能够取消所有消费者,并关闭对RabbitMQ的连接。

    不要在消息处理器中调用 subscriptionResult.Dispose()。这将在EasyNetQ ACK 消息时,在消费者的channelsubscriptionResult.Dispose()调用关闭Channel之间,创建一个竞争状态。由于EasyNetQ的内部架构这些将会在不同的线程被调用,还有时间上的不确定性。

     
  • 相关阅读:
    java 编码问题
    关于时间
    页面
    关于微信
    01-jQuery的介绍
    15-BOM
    14-定时器
    13-JS中的面向对象
    12-关于DOM操作的相关案例
    购物车练习
  • 原文地址:https://www.cnblogs.com/lhxsoft/p/11881076.html
Copyright © 2011-2022 走看看