  • Learning RabbitMQ: (9) Headers Exchange (repost + my comments)

    From: http://lostechies.com/derekgreer/2012/05/29/rabbitmq-for-windows-headers-exchanges/


    RabbitMQ for Windows: Headers Exchanges

    This is the eighth and final installment in the series: RabbitMQ for Windows.  In the last installment, we walked through creating a topic exchange example.  In this final installment, we’ll walk through a headers exchange example.

    Headers exchanges examine the message headers to determine which queues a message should be routed to.  As discussed earlier in this series, headers exchanges are similar to topic exchanges in that they allow you to specify multiple criteria, but offer a bit more flexibility in that the headers can be constructed using a wider range of data types (1).

    To receive messages from a headers exchange, a subscriber specifies a dictionary of headers as part of the binding arguments.  In addition to the headers, the dictionary includes an “x-match” key whose value is either “all”, meaning a message must be published with all of the specified headers (with matching values) in order to match, or “any”, meaning the message needs to carry only one of the specified headers.
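
    To make the “all” versus “any” rule concrete, here is a minimal sketch of the comparison in plain C#, with no broker involved.  The Matches() helper and the header names “format”, “type” and “pages” are hypothetical and exist only for illustration; the real check is performed inside RabbitMQ.  The sketch also assumes that a missing “x-match” entry means “all”, and it ignores any other special handling the broker may apply.

    ```csharp
    using System;
    using System.Collections.Generic;

    public static class HeadersMatchSketch
    {
        // Hypothetical helper: returns true when the message headers satisfy
        // the binding arguments under the binding's x-match mode.
        public static bool Matches(IDictionary<string, object> bindingArgs,
                                   IDictionary<string, object> messageHeaders)
        {
            // Assumption: "all" is the mode when x-match is not supplied.
            bool matchAll = true;
            if (bindingArgs.TryGetValue("x-match", out object mode))
                matchAll = !"any".Equals(mode);

            foreach (KeyValuePair<string, object> criterion in bindingArgs)
            {
                if (criterion.Key == "x-match") continue; // the mode is not a criterion

                // A criterion is met when the header exists and its value is equal.
                bool hit = messageHeaders.TryGetValue(criterion.Key, out object actual)
                           && object.Equals(criterion.Value, actual);

                if (matchAll && !hit) return false; // "all": a single miss rejects
                if (!matchAll && hit) return true;  // "any": a single hit accepts
            }

            // "all": every criterion was met; "any": none was met.
            return matchAll;
        }

        public static void Main()
        {
            var message = new Dictionary<string, object> { ["format"] = "pdf", ["pages"] = 3 };

            var any = new Dictionary<string, object>
                { ["x-match"] = "any", ["format"] = "pdf", ["type"] = "report" };
            var all = new Dictionary<string, object>
                { ["x-match"] = "all", ["format"] = "pdf", ["type"] = "report" };

            Console.WriteLine(Matches(any, message)); // True
            Console.WriteLine(Matches(all, message)); // False
        }
    }
    ```

    The message carries “format” but not “type”, so it satisfies the “any” binding and fails the “all” binding.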

    As our final example, we’ll create a Producer application which publishes the message “Hello, World!” using a headers exchange.  Here’s our Producer code:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Framing.v0_9_1;
    
    namespace Producer
    {
      class Program
      {
        const string ExchangeName = "header-exchange-example";
    
        static void Main(string[] args)
        {
          var connectionFactory = new ConnectionFactory();
          connectionFactory.HostName = "localhost";
    
          IConnection connection = connectionFactory.CreateConnection();
          IModel channel = connection.CreateModel();
          channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
          byte[] message = Encoding.UTF8.GetBytes("Hello, World!");
    
          var properties = new BasicProperties();
          properties.Headers = new Dictionary<string, object>();
          properties.Headers.Add("key1", "12345");
          
          TimeSpan time = TimeSpan.FromSeconds(10);
          var stopwatch = new Stopwatch();
          Console.WriteLine("Running for {0} seconds", time.ToString("ss"));
          stopwatch.Start();
          var messageCount = 0;
    
          while (stopwatch.Elapsed < time)
          {
            channel.BasicPublish(ExchangeName, "", properties, message);
            messageCount++;
            Console.Write("Time to complete: {0} seconds - Messages published: {1}\r", (time - stopwatch.Elapsed).ToString("ss"), messageCount);
            Thread.Sleep(1000);
          }
    
          Console.Write(new string(' ', 70) + "\r");
          Console.WriteLine("Press any key to exit");
          Console.ReadKey();
          message = Encoding.UTF8.GetBytes("quit");
          channel.BasicPublish(ExchangeName, "", properties, message);
          connection.Close();
        }
      }
    }

    In the Producer, we’ve used a generic dictionary of type Dictionary<string, object> and added a single key “key1” with a value of “12345”.  As with our previous example, we’re using a stopwatch as a way to publish messages continually for 10 seconds.

    For our Consumer application, we can use an “x-match” argument of “all” with the single key/value pair specified by the Producer, or we can use an “x-match” argument of “any” which includes the key/value pair specified by the Producer along with other potential matches.  We’ll use the latter for our example.   Here’s our Consumer code:

    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Consumer
    {
      class Program
      {
        const string QueueName = "header-exchange-example";
        const string ExchangeName = "header-exchange-example";
    
        static void Main(string[] args)
        {
          var connectionFactory = new ConnectionFactory();
          connectionFactory.HostName = "localhost";
    
          IConnection connection = connectionFactory.CreateConnection();
          IModel channel = connection.CreateModel();
          channel.ExchangeDeclare(ExchangeName, ExchangeType.Headers, false, true, null);
          channel.QueueDeclare(QueueName, false, false, true, null);
    
          IDictionary specs = new Dictionary<string, object>();
          specs.Add("x-match", "any");
          specs.Add("key1", "12345");
          specs.Add("key2", "123455");
          channel.QueueBind(QueueName, ExchangeName, string.Empty, specs);
          // Note: StartConsume is a function we wrote ourselves; it simply calls the callback repeatedly inside a loop.
          channel.StartConsume(QueueName, MessageHandler);
          connection.Close();
        }
    
        public static void MessageHandler(IModel channel, DefaultBasicConsumer consumer, BasicDeliverEventArgs eventArgs)
        {
          string message = Encoding.UTF8.GetString(eventArgs.Body);
          Console.WriteLine("Message received: " + message);
          foreach (object headerKey in eventArgs.BasicProperties.Headers.Keys)
          {
            Console.WriteLine(headerKey + ": " + eventArgs.BasicProperties.Headers[headerKey]);
          }
    
          if (message == "quit")
            channel.BasicCancel(consumer.ConsumerTag);
        }
      }
    }

    Rather than handling our messages inline as we’ve done in previous examples, this example uses an extension method named StartConsume() which accepts a callback to be invoked each time a message is received.  Here’s the extension method used by our example:

    using System;
    using System.IO;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Consumer
    {
      public static class ChannelExtensions
      {
        public static void StartConsume(this IModel channel, string queueName,  Action<IModel, DefaultBasicConsumer, BasicDeliverEventArgs> callback)
        {
          var consumer = new QueueingBasicConsumer(channel);
          channel.BasicConsume(queueName, true, consumer);
    
          while (true)
          {
            try
            {
              var eventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
              new Thread(() => callback(channel, consumer, eventArgs)).Start();
            }
            catch (EndOfStreamException)
            {
              // The consumer was cancelled, the model closed, or the connection went away.
              break;
            }
          }
        }
      }
    }
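
    The dispatch pattern behind StartConsume() can be tried without a broker.  The sketch below is a stand-in under stated assumptions, not the RabbitMQ client API: a BlockingCollection<string> plays the role of the consumer's queue, CompleteAdding() plays the role of the cancellation that ends the loop, and the callback runs synchronously instead of on a new thread.  The DispatchSketch class is hypothetical.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;

    public static class DispatchSketch
    {
        // Hypothetical stand-in for StartConsume(): blocks on the queue and invokes
        // the callback for each item until the queue is marked as complete.
        public static void StartConsume(this BlockingCollection<string> queue, Action<string> callback)
        {
            foreach (string message in queue.GetConsumingEnumerable())
                callback(message);
        }

        public static void Main()
        {
            var queue = new BlockingCollection<string>();
            queue.Add("Hello, World!");
            queue.Add("quit");
            queue.CompleteAdding(); // no more messages: lets the consuming loop end

            var received = new List<string>();
            queue.StartConsume(received.Add);

            Console.WriteLine(string.Join(", ", received)); // Hello, World!, quit
        }
    }
    ```

    Passing the handler as a delegate keeps the receive loop separate from what is done with each message, which is the same split the article's extension method makes.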

    With our solution set to run both the Producer and Consumer applications on startup, running the example produces output similar to the following:

    Producer

    Running for 10 seconds
    Time to complete: 08 seconds - Messages published: 2

    Consumer

    Message received: Hello, World!
    key1: 12345
    Message received: Hello, World!
    key1: 12345

    That concludes our headers exchange example as well as the RabbitMQ for Windows series.  For more information on working with RabbitMQ, see the documentation at http://www.rabbitmq.com or purchase the book RabbitMQ in Action by Alvaro Videla and Jason Williams.  I hope you enjoyed the series.

     

    Footnotes:

    1 – See http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3 and http://hg.rabbitmq.com/rabbitmq-dotnet-client/diff/4def852523e2/projects/client/RabbitMQ.Client/src/client/impl/WireFormatting.cs for supported field types.


  • Original article: https://www.cnblogs.com/puncha/p/3876955.html