zoukankan      html  css  js  c++  java
  • RabbitMQRPC 官方demo

    public class RPCServer
        {
            public static void Test()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using(var conn = factory.CreateConnection())
                using(var channel = conn.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                        );
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue: "rpc_queue",
                        noAck: false,
                        consumer: consumer
                        );
                    Console.WriteLine(" [x] Awaiting RPC requests");
                    while(true)
                    {
                        string response = null;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyPros = channel.CreateBasicProperties();
                        replyPros.CorrelationId = props.CorrelationId;
    
                        try
                        {
                            var msg = Encoding.UTF8.GetString(body);
                            int n = Int32.Parse(msg);
                            Console.WriteLine(" [.] fib({0})", msg);
                            response = fib(n).ToString();
                        }catch(Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo
                                , basicProperties: replyPros, body: responseBytes
                                );
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        }
                    }
                }
            }
            private static int fib(int n)
            {
                if (n == 0 || n == 1)
                {
                    return n;
                }
    
                return fib(n - 1) + fib(n - 2);
            }
        }

        public class RabbitMQRPCClient
        {
            private IConnection conn;
            private IModel channel;
            private string replyQueueName;
            private QueueingBasicConsumer consumer;
            public RabbitMQRPCClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                conn = factory.CreateConnection();
                channel = conn.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: replyQueueName,
                    noAck: true, consumer: consumer);
            }
            public string Call(string msg)
            {
                var corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                props.ReplyTo = replyQueueName;
                props.CorrelationId = corrId;
                var msgBytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchange: ""
                    , routingKey: "rpc_queue",
                    basicProperties: props, body: msgBytes);
                while(true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    if(ea.BasicProperties.CorrelationId == corrId)
                    {
                        return Encoding.UTF8.GetString(ea.Body);
                    }
                }
            }
            public void Close()
            {
                conn.Close();
            }
        }
        public class RPCClientTest
        {
            public static void Test()
            {
                var rpcClient = new RabbitMQRPCClient();
                Console.WriteLine(" [x] Requesting fib(30)");
                var response = rpcClient.Call("30");
                Console.WriteLine(" [.] Got '{0}'", response);
                rpcClient.Close();
            }
        }
  • 相关阅读:
    CSS 浮动
    函数defined
    SVN提交冲突
    抽象方法和抽象类
    Teco测试总结
    面向对象三大特性(封装,继承,多态)之多态
    PHP工厂模式的简单实现
    DOM 访问节点
    c程序的执行过程
    PHP实现菱形与杨辉三角形【php趣味案例】
  • 原文地址:https://www.cnblogs.com/ly7454/p/5367022.html
Copyright © 2011-2022 走看看