zoukankan      html  css  js  c++  java
  • 15,EasyNetQ-高级API

    EasyNetQ的使命是为基于RabbitMQ的消息传递提供最简单的API。 核心IBus接口有意避免公开AMQP概念,如交换,绑定和队列,而是实现基于消息类型的默认交换绑定队列拓扑。

    对于某些场景,能够配置您自己的exchange绑定队列拓扑是很有用的;高级EasyNetQ API允许您这样做。高级API对AMQP有很好的理解。

    高级API通过IAdvancedBus接口实现。 该接口的一个实例可以通过IBus的高级属性进行访问: var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced; 

    1,声明交换机

    要声明交换机,请使用IAdvancedBus的ExchangeDeclare方法:

    IExchange ExchangeDeclare(
        string name, 
        string type, 
        bool passive = false, 
        bool durable = true, 
        bool autoDelete = false, 
        bool @internal = false, 
        string alternateExchange = null, 
        bool delayed = false);

    name: 交换机名称
    type: 有效的交换机类型(使用静态ExchangeType类的属性安全地声明交换)
    passive: 不要创建交换。 如果指定的交换不存在,则抛出异常。 (默认为false)
    durable: 生存服务器重新启动。 如果此参数为false,则在服务器重新启动时,交换将被删除。 (默认为true)
    autoDelete: 最后一个队列未被绑定时删除此交换。 (默认为false)
    internal: 这种交换不能由发布者直接使用,而只能由交换使用来交换绑定。 (默认为false)
    alternateExchange:如果无法路由邮件,则将邮件路由到此交换机。
    delayed:如果设置,则分配x延迟型交换以路由延迟的消息。

    ①简单案例

    // create a direct exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);
    
    // create a topic exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic);
    
    // create a fanout exchange
    var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);

    要获得RabbitMQ默认交换,请执行以下操作:

    var exchange = Exchange.GetDefault();

    2,声明队列

    要声明队列,请使用IAdvancedBus的QueueDeclare方法:

    IQueue QueueDeclare(
        string name, 
        bool passive = false, 
        bool durable = true, 
        bool exclusive = false, 
        bool autoDelete = false,
        int? perQueueMessageTtl  = null, 
        int? expires = null,
        byte? maxPriority = null,
        string deadLetterExchange = null, 
        string deadLetterRoutingKey = null,
        int? maxLength = null,
        int? maxLengthBytes = null);

    name: 队列的名称
    passive:如果队列不存在,则不要创建该队列,而是引发异常(默认为false)
    durable: 可以在服务器重新启动后继续运行 如果这是错误的,则在服务器重新启动时,队列将被删除。 (默认为true)
    exclusive: 只能由当前连接访问,其他连接上来会抛异常。 (默认为false)
    autoDelete: 所有消费者断开连接后删除队列。 (默认为false)
    perQueueMessageTtl:丢弃之前,消息在队列中应保留多长时间(以毫秒为单位)。 (默认未设置)
    expires: 自动删除之前,队列应该保持未使用状态的时间以毫秒为单位。 (默认未设置)
    maxPriority: 确定队列应支持的最大消息优先级。
    deadLetterExchange:确定交换机的名称在被服务器自动删除之前可以保持未使用状态。
    deadLetterRoutingKey:如果设置,将路由消息与指定的路由密钥,如果未设置,则消息将使用与最初发布的路由密钥相同的路由。
    maxLength: 队列中可能存在的最大可用消息数。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间。
    maxLengthBytes:队列的最大大小(以字节为单位)。 一旦达到限制,邮件就会从队列的前面被删除或死信,以便为新邮件腾出空间

    请注意,如果定义了maxLength和/或maxLengthBytes属性,则RabbitMQ的行为可能并不如人们所期望的那样。 人们可能会期望代理拒绝进一步的消息; 但是RabbitMQ文档(https://www.rabbitmq.com/maxlength.html)表明,一旦达到限制,邮件将从队列的前端丢弃或死锁,以便为新邮件腾出空间。

    ①简单案例

    // declare a durable queue
    var queue = advancedBus.QueueDeclare("my_queue");
    
    // declare a queue with message TTL of 10 seconds:
    var queue = advancedBus.QueueDeclare("my_queue", perQueueTtl:10000);

    要声明一个'未命名的'独占队列,其中RabbitMQ提供队列名称,请使用不带参数的QueueDeclare重载:

    var queue = advancedBus.QueueDeclare();

    请注意,EasyNetQ的自动消费者重新连接逻辑被关闭以用于独占队列。

    3,绑定

    你将一个队列绑定到像这样的交换机上:

    var queue = advancedBus.QueueDeclare("my.queue");
    var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
    var binding = advancedBus.Bind(exchange, queue, "A.*");

    要指定队列和交换机之间的多个绑定,只需执行多个绑定调用即可:

    var queue = advancedBus.QueueDeclare("my.queue");
    var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);
    advancedBus.Bind(exchange, queue, "A.B");
    advancedBus.Bind(exchange, queue, "A.C");

    你也可以将交换机绑定在一个链上:

    var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic);
    var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic);
    var queue = advancedBus.QueueDeclare("my.queue");
    
    advancedBus.Bind(sourceExchange, destinationExchange, "A.*");
    advancedBus.Bind(destinationExchange, queue, "A.C");

    4,发布

    先进的Publish方法允许您指定要发布消息的交换机。 它还允许访问消息的AMQP基本属性。

    创建你的消息。 高级API要求您的消息包装在消息中:

    var myMessage = new MyMessage {Text = "Hello from the publisher"};
    var message = new Message<MyMessage>(myMessage);

    Message类可让您访问AMQP基本属性,例如:

    message.Properties.AppId = "my_app_id";
    message.Properties.ReplyTo = "my_reply_queue";

    最后使用发布方法发布您的消息。 在这里,我们正在向默认交流发布:

    bus.Publish(Exchange.GetDefault(), queueName, false, false, message);

    发布的重载允许您绕过EasyNetQ的消息序列化并创建自己的字节数组消息:

    var properties = new MessageProperties();
    var body = Encoding.UTF8.GetBytes("Hello World!");
    bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);

    5,订阅

    使用IAdvancedBus的Consume方法来消费队列中的消息。

    IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;

    onMessage委托是您为消息传递提供的处理程序。 其参数如下:

    如上面发布部分所述,IMessage使您可以访问消息及其MessageProperties。 MessageReceivedInfo为您提供有关消息消耗的上下文的额外信息:

    public class MessageReceivedInfo
    {
        public string ConsumerTag { get; set; }
        public ulong DeliverTag { get; set; }
        public bool Redelivered { get; set; }
        public string Exchange { get; set; }
        public string RoutingKey { get; set; }         
    }

    您返回一个允许您编写非阻塞异步处理程序的任务。

    消耗方法返回一个IDisposable。 调用其Dispose方法来取消使用者。

    如果您只需要同步处理程序,则可以使用同步重载:

    IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;

    要绕过EasyNetQ的消息序列化器,请使用提供原始字节数组消息的消耗超载:

    void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

    在这个例子中,我们使用队列'my_queue'中的原始消息字节:

    var queue = advancedBus.QueueDeclare("my_queue");
    advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
        {
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine("Got message: '{0}'", message);
        }));

    您可以选择使用Consume方法的这种重载向单个使用者注册多个处理程序:

    IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);

    IHandlerRegistration接口如下所示:

    public interface IHandlerRegistration
    {
        /// <summary>
        /// 添加异步处理程序
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="handler">The handler</param>
        /// <returns></returns>
        IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler)
            where T : class;
    
        /// <summary>
        /// 添加同步处理程序
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="handler">The handler</param>
        /// <returns></returns>
        IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler)
            where T : class;
    
        /// <summary>
        ///如果处理程序集合在未找到匹配的处理程序时应抛出EasyNetQException,则设置为true;如果应返回noop处理程序,则设置为false .Default为true。
        /// </summary>
        bool ThrowOnNoMatchingHandler { get; set; }
    }

    在这个例子中,我们注册了两个不同的处理程序,一个处理MyMessage类型的消息,另一个处理MyOtherMessage类型的消息:

    bus.Advanced.Consume(queue, x => x
            .Add<MyMessage>((message, info) => 
                { 
                    Console.WriteLine("Got MyMessage {0}", message.Body.Text);
                    countdownEvent.Signal();
                })
            .Add<MyOtherMessage>((message, info) =>
                {
                    Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text);
                    countdownEvent.Signal();
                })
        );

    查看这篇博文了解更多信息:http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html

    6,从队列中获取单个消息

    要从队列中获取单条消息,请使用IAdvancedBus.Get方法:

    IBasicGetResult<T> Get<T>(IQueue queue) where T : class;

    从AMQP文档:“此方法使用同步对话提供对队列中消息的直接访问,该同步对话旨在用于同步功能比性能更重要的特定类型的应用程序。” 不要使用Get来轮询消息。 在典型的应用场景中,您应该始终支持消费。

    IBasicGetResult具有以下签名:

    /// <summary>
    ///AdvancedBus Get方法的结果
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public interface IBasicGetResult<T> where T : class
    {
        /// <summary>
        ///如果消息可用,则为true,否则为false。
        /// </summary>
        bool MessageAvailable { get; }
    
        /// <summary>
        /// 消息从队列中回收。 如果没有消息可用,此属性将引发MessageNotAvailableException。 在尝试访问它之前,您应该检查MessageAvailable属性。
        /// </summary>
        IMessage<T> Message { get; }
    }

    在访问Message属性前总是检查MessageAvailable方法。

    一个例子:

    var queue = advancedBus.QueueDeclare("get_test");
    advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false,
        new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" }));
    
    var getResult = advancedBus.Get<MyMessage>(queue);
    
    if (getResult.MessageAvailable)
    {
        Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text);
    }
    else
    {
        Console.Out.WriteLine("Failed to get message!");
    }

    要访问原始二进制消息,请使用非通用Get方法:

    IBasicGetResult Get(IQueue queue);

    非泛型IBasicGetResult具有以下定义:

    public interface IBasicGetResult
    {
        byte[] Body { get; }
        MessageProperties Properties { get; }
        MessageReceivedInfo Info { get; }
    }

    7,消息类型必须匹配

    EasyNetQ高级API期望订户仅接收通用类型参数提供的类型的消息。 在上面的例子中,只有MyMessage类型的消息应该被接收。 但是,EasyNetQ不保护您不向用户发布错误类型的消息。 我可以很容易地设置一个交换绑定队列拓扑来发布NotMyMessage类型的消息,该消息将被上面的处理程序接收。 如果接收到错误类型的消息,EasyNetQ将抛出EasyNetQInvalidMessageTypeException异常:

    EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests'
       at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 217
       at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 131
       at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:SourceEasyNetQSourceEasyNetQRabbitAdvancedBus.cs:line 176
       at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:SourceEasyNetQSourceEasyNetQQueueingConsumerFactory.cs:line 85

    8,事件

    当通过RabbitHutch实例化一个IBus时,您可以指定一个AdvancedBusEventHandlers。 该类包含IAdvancedBus中存在的每个事件的事件处理程序属性,并提供了在总线实例化之前指定事件处理程序的方法。

    不必使用它,因为一旦创建了总线,它仍然可以添加事件处理程序。 但是,如果您希望能够捕获RabbitAdvancedBus的第一个Connected事件,则必须将AdvancedBusEventHandlers与Connected EventHandler一起使用。 这是因为总线将在构造函数返回之前尝试连接一次,如果连接尝试成功,将会引发RabbitAdvancedBus.OnConnected。

    var buss = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) =>
    {
          var advancedBus = (IAdvancedBus)s;
          Console.WriteLine(advancedBus.IsConnected); // This will print true.
    }));
  • 相关阅读:
    我的ORM之六-- 批量
    我的ORM之五-- 事务
    我的ORM之四--删除
    我的ORM之三 -- 更新
    我的ORM之二--添加
    我的ORM之一 -- 查询
    hmailserver
    jquery 插件原则
    C#中 ToString 和 override ToString 的区别
    linq操作符:元素操作符
  • 原文地址:https://www.cnblogs.com/zd1994/p/8651966.html
Copyright © 2011-2022 走看看