  • RabbitMQ (Part 3)

    The official tutorials (test runs)

    1. "Hello World!" -- send and receive

    We're about to tell the server to deliver us the messages from the queue. Since it will push us messages asynchronously, we provide a callback. That is what the EventingBasicConsumer.Received event handler does.
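
    The callback idea can be tried without a broker. The following is only a minimal sketch: DemoConsumer and CallbackDemo are hypothetical stand-ins invented for this illustration (they are not part of RabbitMQ.Client), and Push simulates the server delivering one message.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for a consumer: it raises Received once per pushed message.
class DemoConsumer
{
    public event Action<object, byte[]> Received;

    // Simulates the broker pushing one message body to this consumer.
    public void Push(byte[] body)
    {
        var handler = Received;
        if (handler != null) handler(this, body);
    }
}

static class CallbackDemo
{
    // Subscribes a handler, pushes the given messages, and returns what the handler saw.
    public static List<string> Run(params string[] messages)
    {
        var seen = new List<string>();
        var consumer = new DemoConsumer();
        // The handler is the callback: it runs whenever a message arrives.
        consumer.Received += (sender, body) => seen.Add(Encoding.UTF8.GetString(body));
        foreach (var m in messages) consumer.Push(Encoding.UTF8.GetBytes(m));
        return seen;
    }

    static void Main()
    {
        foreach (var m in Run("Hello World!", "second message"))
            Console.WriteLine(" [x] Received {0}", m);
    }
}
```

    The handler never asks for a message; it only reacts when one is pushed, which is the same shape as the consumer.Received handler in Receive_Simple.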

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_Simple
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "hello",
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
    
                        string message = "Hello World!";
                        var body = Encoding.UTF8.GetBytes(message);
    
                        channel.BasicPublish(exchange: "",
                                             routingKey: "hello",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
    Send_Simple

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Receive_Simple
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queue: "hello",
                                             durable: false,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] Received {0}", message);
                        };
                        channel.BasicConsume(queue: "hello",
                                             noAck: true,
                                             consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    Receive_Simple

    Official test case:

    Receive_Simple.exe

    Send_Simple.exe

    Result: Receive gets the message

    2. Work Queues -- one sender, multiple receivers; each message goes to only one receiver

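    One of the advantages of using a Task Queue is the ability to easily parallelise work. By default, RabbitMQ sends each message to the next consumer in sequence (round-robin), so starting more workers spreads the tasks across them.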
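
    As a rough illustration of how tasks spread across workers, here is a small simulation of round-robin dispatch, which is RabbitMQ's default behaviour. RoundRobinDemo and Dispatch are hypothetical names for this sketch; no broker is involved. Note that the receiver below sets prefetchCount: 1, so with a real broker the split also depends on which worker is free.

```csharp
using System;
using System.Collections.Generic;

static class RoundRobinDemo
{
    // Assigns each message to a worker index in turn: 0, 1, ..., n-1, 0, 1, ...
    public static Dictionary<int, List<string>> Dispatch(IList<string> messages, int workerCount)
    {
        var assigned = new Dictionary<int, List<string>>();
        for (int w = 0; w < workerCount; w++) assigned[w] = new List<string>();
        for (int i = 0; i < messages.Count; i++) assigned[i % workerCount].Add(messages[i]);
        return assigned;
    }

    static void Main()
    {
        var tasks = new[] { "First message.", "Second message..", "Third message...", "Fourth message....", "Fifth message....." };
        var result = Dispatch(tasks, 2);
        for (int w = 0; w < 2; w++)
            Console.WriteLine("worker {0}: {1}", w, string.Join(" | ", result[w]));
    }
}
```

    With two workers, worker 0 ends up with the first, third and fifth tasks and worker 1 with the second and fourth.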

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_Work_queues
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        // Message durability: the key parameter is durable
                        channel.QueueDeclare(queue: "task_queue",
                                             durable: true /* the queue survives a broker restart */,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
    
                        var message = GetMessage(args);
                        var body = Encoding.UTF8.GetBytes(message);
    
                        // A durable queue does not by itself make messages durable: each message must also be marked persistent to survive a broker restart
                        var properties = channel.CreateBasicProperties();
                        properties.SetPersistent(true);
    
                        channel.BasicPublish(exchange: "",
                                             routingKey: "task_queue",
                                             basicProperties: properties,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
    
            private static string GetMessage(string[] args)
            {
                return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
            }
        }
    }
    Send_Work_queues

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Receive_Work_queues
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        // Message durability: the key parameter is durable
                        channel.QueueDeclare(queue: "task_queue",
                                             durable: true /* must match the producer's declaration; redeclaring an existing queue with different parameters fails */,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
    
                        // Fair dispatch: with prefetchCount: 1 a worker gets a new message only after it has acked the previous one
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                        Console.WriteLine(" [*] Waiting for messages.");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] Received {0}", message);
    
                            int dots = message.Split('.').Length - 1;
                            Thread.Sleep(dots * 1000);
    
                            Console.WriteLine(" [x] Done");
    
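                            // Message acknowledgment: because noAck is false in channel.BasicConsume below,
                            // each message must be acknowledged explicitly once it has been processed.
                            // If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost)
                            // without sending an ack, RabbitMQ re-queues the message and delivers it to another consumer.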
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
                        channel.BasicConsume(queue: "task_queue", 
                                             noAck: false /* manual message acknowledgment */,
                                             consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    Receive_Work_queues

    Official test case:

    Receive_Work_queues.exe

    Receive_Work_queues.exe

    Send_Work_queues.exe

    Result: exactly one of the two Receive instances gets the message

    3. Publish/Subscribe -- one sender, multiple receivers; every receiver gets the message

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_Publish_Subscribe
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                        var message = GetMessage(args);
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "logs",
                                             routingKey: "",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
    
            private static string GetMessage(string[] args)
            {
                return ((args.Length > 0)
                       ? string.Join(" ", args)
                       : "info: Hello World!");
            }
        }
    }
    Send_Publish_Subscribe

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Receive_Publish_Subscribe
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                        // Temporary queues
                        var queueName = channel.QueueDeclare().QueueName;
                        // Bindings
                        channel.QueueBind(queue: queueName,
                                          exchange: "logs",
                                          routingKey: "");
    
                        Console.WriteLine(" [*] Waiting for logs.");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [x] {0}", message);
                        };
                        channel.BasicConsume(queue: queueName,
                                             noAck: true,
                                             consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    Receive_Publish_Subscribe

    Official test case:

    Receive_Publish_Subscribe.exe

    Receive_Publish_Subscribe.exe

    Send_Publish_Subscribe.exe

    Result: both Receive instances get the message

    4. Routing -- one sender, multiple receivers; receivers whose binding key equals the routing key get the message

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_Routing
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
    
                        var severity = (args.Length > 0) ? args[0] : "info";
                        var message = (args.Length > 1)
                                      ? string.Join(" ", args.Skip(1).ToArray())
                                      : "Hello World!";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "direct_logs",
                                             routingKey: severity,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
                    }
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }
    Send_Routing

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Receive_Routing
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
    
                        var queueName = channel.QueueDeclare().QueueName;
    
                        if (args.Length < 1)
                        {
                            Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                                    Environment.GetCommandLineArgs()[0]);
                            Console.WriteLine(" Press [enter] to exit.");
                            Console.ReadLine();
                            Environment.ExitCode = 1;
                            return;
                        }
    
                        foreach (var severity in args)
                        {
                            channel.QueueBind(queue: queueName,
                                              exchange: "direct_logs",
                                              routingKey: severity);
                        }
    
                        Console.WriteLine(" [*] Waiting for messages.");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
                        };
                        channel.BasicConsume(queue: queueName,
                                             noAck: true,
                                             consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    Receive_Routing

    Official test case:

    Receive_Routing.exe info

    Receive_Routing.exe warning

    Receive_Routing.exe error

    Receive_Routing.exe info warning error

    Send_Routing.exe error "Run. Run. Or it will explode."

    Result: only Receive_Routing.exe error and Receive_Routing.exe info warning error get the message
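
    The outcome of this test case follows from the direct exchange rule: a queue gets the message when one of its binding keys is exactly equal to the routing key. The sketch below replays that rule in memory; DirectRoutingDemo and Route are hypothetical names for this illustration and nothing here talks to a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class DirectRoutingDemo
{
    // Returns the indices of the receivers that hold at least one binding key
    // exactly equal to the routing key (a direct exchange has no wildcards).
    public static List<int> Route(string[][] bindingKeysPerReceiver, string routingKey)
    {
        var matched = new List<int>();
        for (int i = 0; i < bindingKeysPerReceiver.Length; i++)
        {
            if (bindingKeysPerReceiver[i].Contains(routingKey)) matched.Add(i);
        }
        return matched;
    }

    static void Main()
    {
        // The four receivers from the test case, in the order they were started.
        var receivers = new[]
        {
            new[] { "info" },
            new[] { "warning" },
            new[] { "error" },
            new[] { "info", "warning", "error" }
        };
        foreach (var index in Route(receivers, "error"))
            Console.WriteLine("delivered to: Receive_Routing.exe {0}", string.Join(" ", receivers[index]));
    }
}
```

    For the routing key error, only the third and fourth receivers match, which agrees with the result above.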

    5. Topics -- one sender, multiple receivers; receivers whose binding pattern matches the routing key get the message

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_Topics
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
    
                        var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
                        var message = (args.Length > 1)
                                      ? string.Join(" ", args.Skip(1).ToArray())
                                      : "Hello World!";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchange: "topic_logs",
                                             routingKey: routingKey,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
                    }
                }
            }
        }
    }
    Send_Topics

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Receive_Topics
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
    
                        var queueName = channel.QueueDeclare().QueueName;
    
                        if (args.Length < 1)
                        {
                            Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]);
                            Console.WriteLine(" Press [enter] to exit.");
                            Console.ReadLine();
                            Environment.ExitCode = 1;
                            return;
                        }
    
                        foreach (var bindingKey in args)
                        {
                            channel.QueueBind(queue: queueName,
                                              exchange: "topic_logs",
                                              routingKey: bindingKey);
                        }
    
                        Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
                        };
                        channel.BasicConsume(queue: queueName,
                                             noAck: true,
                                             consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    Receive_Topics

    Official test case:

    Receive_Topics.exe "#"

    Receive_Topics.exe "kern.*"

    Receive_Topics.exe "*.critical"

    Receive_Topics.exe "kern.*" "*.critical"

    Send_Topics.exe "kern.critical" "A critical kernel error"

    Result: every Receive gets the message exactly once
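
    To see why all four receivers match, recall the topic exchange wildcards: * stands for exactly one word and # for zero or more words, with words separated by dots. The matcher below is a simplified sketch of that rule, not the broker's actual implementation; TopicMatcher is a hypothetical name, and edge cases such as an empty routing key are not handled the way a real broker might handle them.

```csharp
using System;

static class TopicMatcher
{
    // True when the binding pattern matches the routing key, word by word.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int i, string[] key, int j)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (i == pattern.Length) return j == key.Length;

        if (pattern[i] == "#")
        {
            // '#' may absorb zero or more words: try every possible length.
            for (int next = j; next <= key.Length; next++)
            {
                if (Match(pattern, i + 1, key, next)) return true;
            }
            return false;
        }

        if (j == key.Length) return false;

        // '*' matches any single word; otherwise the words must be equal.
        if (pattern[i] == "*" || pattern[i] == key[j]) return Match(pattern, i + 1, key, j + 1);
        return false;
    }

    static void Main()
    {
        foreach (var binding in new[] { "#", "kern.*", "*.critical" })
            Console.WriteLine("{0} matches kern.critical: {1}", binding, Matches(binding, "kern.critical"));
    }
}
```

    The fourth receiver binds both kern.* and *.critical to the same queue. Both patterns match, but the message is placed in that queue only once, which is why it is printed a single time.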

    6. RPC

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Send_RPC
    {
        class RPCClient
        {
            private IConnection connection;
            private IModel channel;
            private string replyQueueName;
            private QueueingBasicConsumer consumer;
    
            public RPCClient()
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: replyQueueName,
                                     noAck: true,
                                     consumer: consumer);
            }
    
            public string Call(string message)
            {
                var corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                props.ReplyTo = replyQueueName;
                props.CorrelationId = corrId;
    
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "",
                                     routingKey: "rpc_queue",
                                     basicProperties: props,
                                     body: messageBytes);
    
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == corrId)
                    {
                        return Encoding.UTF8.GetString(ea.Body);
                    }
                }
            }
    
            public void Close()
            {
                connection.Close();
            }
        }
    
        class Program
        {
            static void Main(string[] args)
            {
                var rpcClient = new RPCClient();
    
                Console.WriteLine(" [x] Requesting fib(30)");
                var response = rpcClient.Call("30");
                Console.WriteLine(" [.] Got '{0}'", response);
    
                rpcClient.Close();
            }
        }
    }
    Send_RPC

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Receive_RPC
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "", UserName = "tangqun", Password = "123456" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    
                    // Fair dispatch: with prefetchCount: 1 the server gets a new request only after it has acked the previous one
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                    var consumer = new QueueingBasicConsumer(channel);
    
                    channel.BasicConsume(queue: "rpc_queue",
                                         noAck: false,
                                         consumer: consumer);
    
                    Console.WriteLine(" [x] Awaiting RPC requests");
    
                    while (true)
                    {
                        string response = null;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    
                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;
    
                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = fib(n).ToString();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "",
                                                 routingKey: props.ReplyTo,
                                                 basicProperties: replyProps,
                                                 body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                             multiple: false);
                        }
                    }
                }
            }
    
            /// <summary>
            /// Assumes only valid positive integer input.
            /// Don't expect this one to work for big numbers,
            /// and it's probably the slowest recursive implementation possible.
            /// </summary>
            private static int fib(int n)
            {
                if (n == 0 || n == 1)
                {
                    return n;
                }
    
                return fib(n - 1) + fib(n - 2);
            }
        }
    }
    Receive_RPC

    Official test case:

    Receive_RPC.exe

    Send_RPC.exe

    Send_RPC.exe

    Result: both Send instances (each with its own correlationId) send a request, and each Send prints only the response that carries its own correlationId
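
    The matching step inside RPCClient.Call can be isolated as follows. This is a sketch under stated assumptions: Reply and CorrelationDemo are hypothetical types for this illustration, an in-memory Queue stands in for the reply queue, and the method returns null when the queue is empty, whereas the real client blocks on Dequeue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reply message: only the two fields the matching logic needs.
class Reply
{
    public string CorrelationId;
    public string Body;
}

static class CorrelationDemo
{
    // Dequeues replies until one carries the expected correlation id;
    // replies with any other id are discarded. Returns null if none matches.
    public static string WaitFor(Queue<Reply> replies, string corrId)
    {
        while (replies.Count > 0)
        {
            var reply = replies.Dequeue();
            if (reply.CorrelationId == corrId) return reply.Body;
        }
        return null;
    }

    static void Main()
    {
        var mine = Guid.NewGuid().ToString();
        var replies = new Queue<Reply>();
        replies.Enqueue(new Reply { CorrelationId = "stale-id", Body = "ignored" });
        replies.Enqueue(new Reply { CorrelationId = mine, Body = "832040" });
        Console.WriteLine(" [.] Got '{0}'", WaitFor(replies, mine));
    }
}
```

    In the test case above each Send also declares its own reply queue, so the two clients would not see each other's replies in any case. The correlationId check matters most when one client has several requests outstanding on the same reply queue.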

  • Original article: https://www.cnblogs.com/tq1226112215/p/5885742.html