zoukankan      html  css  js  c++  java
  • 15,EasyNetQ-高级API

    EasyNetQ的使命是为基于RabbitMQ的消息传递提供最简单的API。 核心IBus接口有意避免公开AMQP概念,如交换,绑定和队列,而是实现基于消息类型的默认交换绑定队列拓扑。

    对于某些场景,能够配置您自己的exchange绑定队列拓扑是很有用的;高级EasyNetQ API允许您这样做。高级API对AMQP有很好的理解。

    高级API通过IAdvancedBus接口实现。 该接口的一个实例可以通过IBus的高级属性进行访问: var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced; 

    1,声明交换机

    要声明交换机,请使用IAdvancedBus的ExchangeDeclare方法:

    IExchange ExchangeDeclare(
        string name, 
        string type, 
        bool passive = false, 
        bool durable = true, 
        bool autoDelete = false, 
        bool @internal = false, 
        string alternateExchange = null, 
        bool delayed = false);

    name: 交换机名称
    type: 有效的交换机类型(使用静态ExchangeType类的属性安全地声明交换)
    passive: 不要创建交换。 如果指定的交换不存在,则抛出异常。 (默认为false)
    durable: 生存服务器重新启动。 如果此参数为false,则在服务器重新启动时,交换将被删除。 (默认为true)
    autoDelete: 最后一个队列未被绑定时删除此交换。 (默认为false)
    internal: 这种交换不能由发布者直接使用,而只能由交换使用来交换绑定。 (默认为false)
    alternateExchange:如果无法路由邮件,则将邮件路由到此交换机。
    delayed:如果设置,则分配x延迟型交换以路由延迟的消息。

    ①简单案例

    // create a direct exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);
    
    // create a topic exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic);
    
    // create a fanout exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);

    要获得RabbitMQ默认交换,请执行以下操作:

    var exchange = Exchange.GetDefault();

    2,声明队列

    要声明队列,请使用IAdvancedBus的QueueDeclare方法:

    IQueue QueueDeclare(
        string name, 
        bool passive = false, 
        bool durable = true, 
        bool exclusive = false, 
        bool autoDelete = false,
        int? perQueueMessageTtl  = null, 
        int? expires = null,
        byte? maxPriority = null,
        string deadLetterExchange = null, 
        string deadLetterRoutingKey = null,
        int? maxLength = null,
        int? maxLengthBytes = null);

    name: 队列的名称
    passive:如果队列不存在,则不要创建该队列,而是引发异常(默认为false)
    durable: 可以在服务器重新启动后继续运行 如果这是错误的,则在服务器重新启动时,队列将被删除。 (默认为true)
    exclusive: 只能由当前连接访问,其他连接上来会抛异常。 (默认为false)
    autoDelete: 所有消费者断开连接后删除队列。 (默认为false)
    perQueueMessageTtl:丢弃之前,消息在队列中应保留多长时间(以毫秒为单位)。 (默认未设置)
    expires: 自动删除之前,队列应该保持未使用状态的时间以毫秒为单位。 (默认未设置)
    maxPriority: 确定队列应支持的最大消息优先级。
    deadLetterExchange:确定交换机的名称在被服务器自动删除之前可以保持未使用状态。
    deadLetterRoutingKey:如果设置,将路由消息与指定的路由密钥,如果未设置,则消息将使用与最初发布的路由密钥相同的路由。
    maxLength: 队列中可能存在的最大可用消息数。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间。
    maxLengthBytes:队列的最大大小(以字节为单位)。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间

    请注意,如果定义了maxLength和/或maxLengthBytes属性,则RabbitMQ的行为可能并不如人们所期望的那样。 人们可能会期望代理拒绝进一步的消息; 但是RabbitMQ文档(https://www.rabbitmq.com/maxlength.html)表明,一旦达到限制,邮件将从队列的前端丢弃或死锁,以便为新邮件腾出空间。

    ①简单案例

    // declare a durable queue
    var queue = advancedBus.QueueDeclare("my_queue");
    
    // declare a queue with message TTL of 10 seconds:
    var queue = advancedBus.QueueDeclare("my_queue", perQueueTtl:10000);

    要声明一个'未命名的'独占队列,其中RabbitMQ提供队列名称,请使用不带参数的QueueDeclare重载:

    var queue = advancedBus.QueueDeclare();

    请注意,EasyNetQ的自动消费者重新连接逻辑被关闭以用于独占队列。

    3,绑定

    你将一个队列绑定到像这样的交换机上:

    var queue = advancedBus.QueueDeclare("my.queue");
    var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
    var binding = advancedBus.Bind(exchange, queue, "A.*");

    要指定队列和交换机之间的多个绑定,只需执行多个绑定调用即可:

    var queue = advancedBus.QueueDeclare("my.queue");
    var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
    advancedBus.Bind(exchange, queue, "A.B");
    advancedBus.Bind(exchange, queue, "A.C");

    你也可以将交换机绑定在一个链上:

    var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic);
    var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic);
    var queue = advancedBus.QueueDeclare("my.queue");
    
    advancedBus.Bind(sourceExchange, destinationExchange, "A.*");
    advancedBus.Bind(destinationExchange, queue, "A.C");

    4,发布

    先进的Publish方法允许您指定要发布消息的交换机。 它还允许访问消息的AMQP基本属性。

    创建你的消息。 高级API要求您的消息包装在消息中:

    var myMessage = new MyMessage {Text = "Hello from the publisher"};
    var message = new Message<MyMessage>(myMessage);

    Message类可让您访问AMQP基本属性,例如:

    message.Properties.AppId = "my_app_id";
    message.Properties.ReplyTo = "my_reply_queue";

    最后使用发布方法发布您的消息。 在这里,我们正在向默认交流发布:

    bus.Publish(Exchange.GetDefault(), queueName, false, false, message);

    发布的重载允许您绕过EasyNetQ的消息序列化并创建自己的字节数组消息:

    var properties = new MessageProperties();
    var body = Encoding.UTF8.GetBytes("Hello World!");
    bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);

    5,订阅

    使用IAdvancedBus的Consume方法来消费队列中的消息。

    IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;

    onMessage委托是您为消息传递提供的处理程序。 其参数如下:

    如上面发布部分所述,IMessage使您可以访问消息及其MessageProperties。 MessageReceivedInfo为您提供有关消息消耗的上下文的额外信息:

    public class MessageReceivedInfo
    {
        public string ConsumerTag { get; set; }
        public ulong DeliverTag { get; set; }
        public bool Redelivered { get; set; }
        public string Exchange { get; set; }
        public string RoutingKey { get; set; }         
    }

    您返回一个允许您编写非阻塞异步处理程序的任务。

    消耗方法返回一个IDisposable。 调用其Dispose方法来取消使用者。

    如果您只需要同步处理程序,则可以使用同步重载:

    IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;

    要绕过EasyNetQ的消息序列化器,请使用提供原始字节数组消息的消耗超载:

    void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

    在这个例子中,我们使用队列'my_queue'中的原始消息字节:

    var queue = advancedBus.QueueDeclare("my_queue");
    advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
        {
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine("Got message: '{0}'", message);
        }));

    您可以选择使用Consume方法的这种重载向单个使用者注册多个处理程序:

    IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);

    IHandlerRegistration接口如下所示:

    public interface IHandlerRegistration
    {
        /// <summary>
        /// 添加异步处理程序
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="handler">The handler</param>
        /// <returns></returns>
        IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler)
            where T : class;
    
        /// <summary>
        /// 添加同步处理程序
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="handler">The handler</param>
        /// <returns></returns>
        IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler)
            where T : class;
    
        /// <summary>
        ///如果处理程序集合在未找到匹配的处理程序时应抛出EasyNetQException,则设置为true;如果应返回noop处理程序,则设置为false .Default为true。
        /// </summary>
        bool ThrowOnNoMatchingHandler { get; set; }
    }

    在这个例子中,我们注册了两个不同的处理程序,一个处理MyMessage类型的消息,另一个处理MyOtherMessage类型的消息:

    bus.Advanced.Consume(queue, x => x
            .Add<MyMessage>((message, info) => 
                { 
                    Console.WriteLine("Got MyMessage {0}", message.Body.Text);
                    countdownEvent.Signal();
                })
            .Add<MyOtherMessage>((message, info) =>
                {
                    Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text);
                    countdownEvent.Signal();
                })
        );

    查看这篇博文了解更多信息:http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html

    6,从队列中获取单个消息

    要从队列中获取单条消息,请使用IAdvancedBus.Get方法:

    IBasicGetResult<T> Get<T>(IQueue queue) where T : class;

    从AMQP文档:“此方法使用同步对话提供对队列中消息的直接访问,该同步对话旨在用于同步功能比性能更重要的特定类型的应用程序。” 不要使用Get来轮询消息。 在典型的应用场景中,您应该始终支持消费。

    IBasicGetResult具有以下签名:

    /// <summary>
    ///AdvancedBus Get方法的结果
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IBasicGetResult<T> where T : class
    {
        /// <summary>
        ///如果消息可用,则为true,否则为false。
        /// </summary>
        bool MessageAvailable { get; }
    
        /// <summary>
        /// 消息从队列中回收。 如果没有消息可用,此属性将引发MessageNotAvailableException。 在尝试访问它之前,您应该检查MessageAvailable属性。
        /// </summary>
        IMessage<T> Message { get; }
    }

    在访问Message属性前总是检查MessageAvailable方法。

    一个例子:

    var queue = advancedBus.QueueDeclare("get_test");
    advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false,
        new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" }));
    
    var getResult = advancedBus.Get<MyMessage>(queue);
    
    if (getResult.MessageAvailable)
    {
        Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text);
    }
    else
    {
        Console.Out.WriteLine("Failed to get message!");
    }

    要访问原始二进制消息,请使用非通用Get方法:

    IBasicGetResult Get(IQueue queue);

    非泛型IBasicGetResult具有以下定义:

    public interface IBasicGetResult
    {
        byte[] Body { get; }
        MessageProperties Properties { get; }
        MessageReceivedInfo Info { get; }
    }

    7,消息类型必须匹配

    EasyNetQ高级API期望订户仅接收通用类型参数提供的类型的消息。 在上面的例子中,只有MyMessage类型的消息应该被接收。 但是,EasyNetQ不保护您不向用户发布错误类型的消息。 我可以很容易地设置一个交换绑定队列拓扑来发布NotMyMessage类型的消息,该消息将被上面的处理程序接收。 如果接收到错误类型的消息,EasyNetQ将抛出EasyNetQInvalidMessageTypeException异常:

    EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests'
       at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 217
       at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 131
       at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 176
       at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:SourceEasyNetQSourceEasyNetQQueueingConsumerFactory.cs:line 85

    8,事件

    当通过RabbitHutch实例化一个IBus时,您可以指定一个AdvancedBusEventHandlers。 该类包含IAdvancedBus中存在的每个事件的事件处理程序属性,并提供了在总线实例化之前指定事件处理程序的方法。

    不必使用它,因为一旦创建了总线,它仍然可以添加事件处理程序。 但是,如果您希望能够捕获RabbitAdvancedBus的第一个Connected事件,则必须将AdvancedBusEventHandlers与Connected EventHandler一起使用。 这是因为总线将在构造函数返回之前尝试连接一次,如果连接尝试成功,将会引发RabbitAdvancedBus.OnConnected。

    var buss = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) =>
    {
          var advancedBus = (IAdvancedBus)s;
          Console.WriteLine(advancedBus.IsConnected); // This will print true.
    }));
  • 相关阅读:
    UVa OJ 148 Anagram checker (回文构词检测)
    UVa OJ 134 LoglanA Logical Language (Loglan逻辑语言)
    平面内两条线段的位置关系(相交)判定与交点求解
    UVa OJ 130 Roman Roulette (罗马轮盘赌)
    UVa OJ 135 No Rectangles (没有矩形)
    混合函数继承方式构造函数
    html5基础(第一天)
    js中substr,substring,indexOf,lastIndexOf,split等的用法
    css的textindent属性实现段落第一行缩进
    普通的css普通的描边字
  • 原文地址:https://www.cnblogs.com/zd1994/p/8651966.html
Copyright © 2011-2022 走看看