zoukankan      html  css  js  c++  java
  • EasyNetQ使用(八)【对延迟消息插件的支持,自动订阅者】

    RabbitMQ延迟消息插件仍然在实验阶段。你使用这个功能要自担风险。

    RabbitMQ延迟消息插件RabbitMQ增加了新的交换机类型,允许延时消息投递。

    EasyNetQ为交换机通过定义一种新的日程类型:DelayedExchangeScheduler来支持这种能力。

    这样允许你使用之前同样的Future Publish接口,但是取消Future Message会抛出异常。因为延迟消息插件不支持消息取消,不管你在调用FuturePublish是否指定了cancellationKey,或当你调用CancelFuturePublish时,调度器将会抛出NotImplementedException异常。

    下面例子展示的是:你如何发布一个消息,这个消息会在未来3个月后收到这个消息。

    bus = RabbitHutch.CreateBus("host=localhost", 
            x=> x.Register<IScheduler,DelayedExchangeScheduler>());
    
    var followUpCallMessage = new FollowUpCallMessage( .. );
    
    bus.FuturePublish(DateTime.UtcNow.AddMonths(3), followUpCallMessage);        

    第一行代码告示EasyNetQ使用新的支持延迟消息交换机作为调度器。下面,消息被创建,然后在发布时,指定了投递时间是3个月后。注意:FuturePublish使用了UTC时间。
    插件安装

    可以在Community Plugins page找到延迟消息插件。下载与你安装的RabbitMQ对应 .ez文件,拷贝这个文件到RabbitMQ plugin文件夹下,然后通过运行下面的命令启动之。

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    这个插件要求RabbitMQ最低版本为3.4或更新版本。

    它是怎么运行的

    当你调用bus.FuturePublish(..),EasyNetQ跟正常的交换机和binds一起,自动创建了一个新的 x-delayed-message交换机,这个消息被发送到这个延迟交换机上,并存储这个消息,直到到期时去投递这个消息。就在投递这个时刻,这个消息会被路由到普通交换机,然后从普通交换机到绑定的队列中。

    当你调用Publish(..)方法,消息会发布到正常交换机, 防止降低使用与x-delayed-message有关系的交换机的性能。

    延迟交换机持久化消息使用了Mnesia.这样避免在服务器停机时丢失消息。在服务器恢复之后,所有符合条件的消息将会按计划日程投递。

    取消Future消息

    如前所述,延迟消息插件不支持消息取消,因此这个功能是不支持的。任何调用FuturePublish时指定cancellationKey,或者调用CancelFuturePublish,都会抛出NotImplementedException异常。

    如果你需要这个功能,请上一篇所讲的使用Scheduler 服务,具体参考EasyNetQ之用Future Publish发布预定中事件


    EasyNetQ v0.7.1.30版本有了一个简单AutoSubscriber。你能够使用它很容易的去扫描指定程序集中实现了IConsume 或 IConsumeAsync接口的类,然后这个自动订阅者让这些消费者订阅到你的事件总线中。IConsume的实现将使用事件总线的Subscribe方法,同时IConsumeAsync的实现使用事件总线的SubscribeAsync方法,详情参看EasyNetQ之订阅。你当然能够让你的消费者处理多个消息。让我们看一些示例。

    注意:从0.13.0版本开始,所有的AutoSubscriber类都在EasyNetQ.AutoSubscribe命名空间中。因此请添加下面的using语句。

    using EasyNetQ.AutoSubscribe;

    让我们定义一个简单消费者,处理三个消息:MessageAMessageBMessageC

    public class MyConsumer : IConsume<MessageA>,
                              IConsume<MessageB>,
                              IConsumeAsync<MessageC>
    {
        public void Consume(MessageA message) {...}
    
        public void Consume(MessageB message) {...}
    
        public Task Consume(MessageC message) {...}
    }

    首先创建一个新的AutoSubscriber实例,传你的IBus实例和subscriberId前缀给AutoSubscriber的构造函数。这个subscriberId前缀会作为所有自动生成的subscriberIds的前缀。但是不能自定义subscriberId(参看下文)。

    注册在相同程序集中的所有其他消费者,我们仅仅需要去传包含你的消费者所在的程序集给这个方法:AutoSubscriber.Subscribe(assembly)。注意!,这些事情你应该只做一次,最好在应用启动的时候。

    var subscriber = new AutoSubscriber(bus,
            "my_applications_subscriptionId_prefix");
    subscriber.Subscribe(Assembly.GetExecutingAssembly());

    通过主题订阅

    默认的AutoSubscriber将不带主题的绑定。下面的例子中MessageA被注册到两个主题。

    注意! 如果你运行下面代码,没有加上ForTopic属性标签,它将有一个”#”路由Key,这个Key将会选择任何订阅这个消息类型的订阅者。假定在默认端口上且admin插件已被安装,简单访问http://localhost:15672/#/queues,如果需要请解除绑定的路由。

    public class MyConsumer : IConsume<MessageA>, 
                              IConsume<MessageB>,
                              IConsumeAsync<MessageC>
    {
        [ForTopic("Topic.Foo")]
        [ForTopic("Topic.Bar")]
        public void Consume(MessageA message) {...}
    
        public void Consume(MessageB message) {...}
    
        public Task Consume(MessageC message) {...}
    }
    
    //通过主题发布
    var bus = RabbitHutch.CreateBus("host=localhost");
    
    //被选中
    var msg1 = new MessageA("topic.foo");
    bus.Publish(msg1, "Topic.Foo");
    
    //被选中
    var msg2 = new MessageA("topic.bar");
    bus.Publish(msg1, "Topic.Bar");
    
    //不会被选中
    var msg3 = new MessageA("no pick up);                
    bus.Publish(msg3);

    指定一个特定的SubscriptionId

    AutoSubscriber 默认情况下为每一组Message/Consumer生成一个唯一的SubscriptionId。这意味着当相同的消费者启动多个实例后,这些实例将会从相同队列中循环消费消息。

    如果你希望subscriptionId是固定的,你能通过带有AutoSubscriberConsumerAttribute属性标签的Consume方法实现。为什么你会让SubscriptionId是固定的,可以阅读这里。

    假如,消费MessageB消息的Consume方法需要固定的SubscriptionId。仅仅需要使用AutoSubscriberConsumerAttribute属性标签,为其SubscriptionId设置一个值。

    [AutoSubscriberConsumer(SubscriptionId = "MyExplicitId")]
    public void Consume(MessageB message) { }

    控制SubscriptionId的生成

    你当然也可以控制实际的SubscriptionId的生成,只要用AutoSubscriber.GenerateSubscriptionId : Func替换就可以实现。

    var subscriber = new AutoSubscriber(bus)
    {
        CreateConsumer = t => objectResolver.Resolve(t),
        GenerateSubscriptionId = c => AppDomain.CurrentDomain.FriendlyName
            + c.ConcreteType.Name
    };
    subscriber.Subscribe(Assembly.GetExecutingAssembly());

    注意! 这只是实现的一个样例。请确保你已经读过并理解了SubscriptionId值的重要价值。

    控制消费者配置设置

    当使用autosubsriber去订阅队列消息,你能够设置ISubscriptionConfiguration值,例如AutoDeletePriority 等。

    var subscriber = new AutoSubscriber(bus)
    {    
        ConfigureSubscriptionConfiguration = c => c.WithAutoDelete()
                                                   .WithPriority(10)
    };
    subscriber.Subscribe(Assembly.GetExecutingAssembly());

    或者,你可以用在consume方法上用属性标签的方式。

    public class MyConsumer : IConsume<MessageA>
    {
        [SubscriptionConfiguration(CancelOnHaFailover = true, PrefetchCount = 10)]
        public void Consume(MessageA message) {...}
    }
    

    AutoSubscriber使用容器

    AutoSubscriber 有一个属性:MessageDispatcher,这个属性允许插入你自己消息调度代码。这样允许你从IoC容器中或者其他第三方任务调度中,解析你自己的消费者。
    让我们写一个自定义的IAutoSubscriberMessageDispatcher实现,从 Windsor IoC 容器中去解析消费者。

    public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
    {
        private readonly IWindsorContainer container;
    
        public WindsorMessageDispatcher(IWindsorContainer container)
        {
            this.container = container;
        }
    
        public void Dispatch<TMessage, TConsumer>(TMessage message) where TMessage : class where TConsumer : IConsume<TMessage>
        {
            var consumer = container.Resolve<TConsumer>();
            try
            {
                consumer.Consume(message);
            }
            finally
            {
                container.Release(consumer);
            }
        }
    
        public Task DispatchAsync<TMessage, TConsumer>(TMessage message) where TMessage: class where TConsumer: IConsumeAsync<TMessage>
        {
            var consumer = _container.Resolve<TConsumer>();           
            return consumer.Consume(message).ContinueWith(t=>_container.Release(consumer));            
        }
    }

    现在,我们需要注册消费者到我们的IoC容器中。

    var container = new WindsorContainer();
    container.Register(
        Component.For<MyConsumer>().ImplementedBy<MyConsumer>()
        );

    下一步,让AutoSubscriber 用我们自定义的IMessageDispatcher

    var bus = RabbitHutch.CreateBus("host=localhost");
    
    var autoSubscriber = new AutoSubscriber(bus, "My_subscription_id_prefix")
    {
        MessageDispatcher = new WindsorMessageDispatcher(container)
    };
    autoSubscriber.Subscribe(GetType().Assembly);
    autoSubscriber.SubscribeAsync(GetType().Assembly);

    现在每一次消息到来是,一个新的消费者实例,将会从我们自定义的容器中拿到。

     
  • 相关阅读:
    第三方支付集成
    文件并发(日志处理)--队列--Redis+Log4Net
    ReportingServies——SQLServer报表开发综合实例
    C#开发可以可视化操作的windows服务
    4、ASP.NET MVC入门到精通——NHibernate构建一个ASP.NET MVC应用程序
    Lucene.net站内搜索—6、站内搜索第二版
    Lucene.net站内搜索—5、搜索引擎第一版实现
    Lucene.net站内搜索—4、搜索引擎第一版技术储备(简单介绍Log4Net、生产者消费者模式)
    谈谈爱情——祭奠那逝去的青春
    Lucene.net站内搜索—3、最简单搜索引擎代码
  • 原文地址:https://www.cnblogs.com/lhxsoft/p/11881796.html
Copyright © 2011-2022 走看看