zoukankan      html  css  js  c++  java
  • RabbitMQ介绍4

    C#终端的说明文档: http://www.rabbitmq.com/dotnet-api-guide.html

    这里介绍使用RabbitMQ的几种典型场景。

    1. 简单direct模式( http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html)。生产者发送消息到一个队列,消费者从队列读取消息。这是最简单的使用场景,下面的代码使用默认exchange,消息自动确认。注意后台接收消息的线程完成前不要关闭连接,这里消费者是通过Console.ReadLine();保证连接不会Dispose。

    (P) -> [|||] -> (C)

    生产者:

    public static void test1()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    消费者

    public static void test1()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);
            };
            channel.BasicConsume(queue: "hello",
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    2. 多个消费者连接队列(http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html)。使用场景:生产者发送比较耗时的任务,多个消费者从队列获取任务,完成计算。优点:方便系统扩展,添加消费者即可增加系统的负载能力,通过显式消息确认、prefetch消息量的控制,可以实现多个消费者之间的负载均衡。prefetch N的意思是在消费者确认前,只发送N个消息,也就是等待确认的消息最多N个(默认不设,队列的消息会全部发给消费者,然后等待确认),只要客户端的连接保持,便不会重发,如果连接中断,消息还是没有确认,则会重新发送。

    生产者代码和前一个例子类似,这里只给出消费者代码

    public static void test2()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);


            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 10000);

                Console.WriteLine(" [x] Done");

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "task_queue",
                                 noAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    3. 订阅发布模式(http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html)。exchange使用fanout模式。注意下面代码创建了命名队列(通过队列名可以方便标识消费者),如果队列不需要持久化,也可以使用临时队列(queueName = channel.QueueDeclare().QueueName;),RabbitMQ为队列分配一个唯一标识,消费者断开后会自动删除队列。

    生产者:

    public static void test3(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("logs", "fanout");

            for (int i = 0; i < args.Length; i++)
            {
                var message = "task_queue_t3";
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);

                channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }

        }
    }

    消费者:

    public static void test3()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare("logs", ExchangeType.Fanout);

            string queueName = "task_queue_t3";

            Console.WriteLine(Encoding.UTF8.GetBytes(queueName).Length);

            channel.QueueDeclare(queue: queueName,
                 durable: true,
                 exclusive: true,
                 autoDelete: true,
                 arguments: null);
            //queueName = channel.QueueDeclare().QueueName;

            channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    4. 路由(http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html)。exchange使用Direct模式,通过binding key 精确匹配routing key选择消息路由。可以有多个binding key。

    生产者:

    channel.ExchangeDeclare("direct_logs", "direct");

    channel.BasicPublish(exchange: "direct_logs", routingKey: "info", basicProperties: properties, body: body);

    消费者:

    channel.ExchangeDeclare("direct_logs", "direct");

    string queueName = channel.QueueDeclare().QueueName;

    channel.QueueBind(queueName, "direct_logs", "info");
    channel.QueueBind(queueName, "direct_logs", "error");

    5. Topic模式(http://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html)。exchange使用Topic模式,可以实现模糊匹配。

    生产者:

    channel.ExchangeDeclare("topic_logs", ExchangeType.Topic);

    channel.BasicPublish(exchange: "topic_logs", routingKey: "lazy.green.cat", basicProperties: properties, body: body);

    消费者:

    channel.ExchangeDeclare("topic_logs", "topic");

    string queueName = "Q2";
    channel.QueueDeclare(queue: queueName,
         durable: true,
         exclusive: true,
         autoDelete: true,
         arguments: null);

    channel.QueueBind(queueName, "topic_logs", "*.*.rabbit");
    channel.QueueBind(queueName, "topic_logs", "lazy.#");

    6. 远程过程调用RPC(http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html)。实现客户和服务之间的通信。一次通信过程如下:

    • 客户端发送请求消息。客户端发送请求时通过reply_to指定回复的地址(回复的queue,由于RPC的特殊性,我们使用默认的exchange来做路由,这时候queue名字便是路由键)和correlation_id(标记发送的消息,回复的时候通过这个ID来确认回复的是哪个请求)。在reply_to的queue上等待回复。
    • 服务器收到请求,处理后发送回复到默认exchange,用请求消息的reply_to做路由键,这样回复便发到了reply_to指定的queue。设置的回复消息的correlation_id=请求消息的correction_id。
    • 客户端在reply_to的队列上收到回复。
    • 通过ack和prefetch N控制同步。

    RPCServer端代码:

    public static void RPCServer()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue: "rpc_queue",
                                         noAck: false,
                                         consumer: consumer);
                    Console.WriteLine(" [x] Awaiting RPC requests");

                    while (true)
                    {
                        string response = null;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = "abc"; //do time consuming work here
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "",
                                                 routingKey: props.ReplyTo,
                                                 basicProperties: replyProps,
                                                 body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                             multiple: false);
                        }
                    }
                }
            }

    客户端代码:

    class RPCClient
        {
            private IConnection connection;
            private IModel channel;
            private string replyQueueName;
            private QueueingBasicConsumer consumer;

            public RPCClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: replyQueueName,
                                     noAck: true,
                                     consumer: consumer);
            }

            public string Call(string message)
            {
                var corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                props.ReplyTo = replyQueueName;
                props.CorrelationId = corrId;

                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "",
                                     routingKey: "rpc_queue",
                                     basicProperties: props,
                                     body: messageBytes);

                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == corrId)
                    {
                        return Encoding.UTF8.GetString(ea.Body);
                    }
                }
            }

            public void Close()
            {
                connection.Close();
            }
        }

    //测试

    public static void RPCTest()
    {
        var rpcClient = new RPCClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");
        Console.WriteLine(" [.] Got '{0}'", response);

        rpcClient.Close();
    }

  • 相关阅读:
    如何保证消息不被重复消费?
    接口幂等性实现
    JVM 线上故障排查基本操作
    对于Arraylist 的一些疑问
    递归思想与递归编程
    linux配置javaJDK
    python数据分析-pandas常用方法
    python 数据分析-pandas数据结构
    python数据分析-numpy 矩阵操作
    python数据分析-numpy数组操作
  • 原文地址:https://www.cnblogs.com/lingshf/p/5193502.html
Copyright © 2011-2022 走看看