zoukankan      html  css  js  c++  java
  • WinForm实现Rabbitmq官网6个案例-RPC

    获取源码

    客户端代码:

    namespace RabbitMQDemo
    {
        public partial class RPC : Form
        {
            private readonly static RPC _RPC;
            Action<string, TextBox> SetText;
            static RPC()
            {
                _RPC = new RPC();
            }
            /// <summary>
            /// 单例模式
            /// </summary>
            public static RPC SingleForm { get { return _RPC; } }
            private RPC()
            {
                CheckForIllegalCrossThreadCalls = false;
                InitializeComponent();
            }
    
            private void btnSendMsg_Click(object sender, EventArgs e)
            {//RPC客户端发出请求
                string message = txtPublisher.Text;
                if (message.Trim().Length <= 0)
                {
                    MessageBox.Show("请输入要发送的消息");
                }
                RpcClient client = new RpcClient();
                var response = client.Call(message);
                txtRpcClient.Text += string.Format("{0}
    ", response);
                client.Close();
            }
    
            /// <summary>
            /// 客户端类
            /// </summary>
            private class RpcClient
            {
                #region 参数
                /// <summary>
                /// rabbitmq连接
                /// </summary>
                private readonly IConnection connection;
                /// <summary>
                /// 通道
                /// </summary>
                private readonly IModel channel;
                /// <summary>
                /// 客户端关联的队列
                /// </summary>
                private readonly string replyQueueName;
                /// <summary>
                /// 消费者
                /// </summary>
                private readonly EventingBasicConsumer consumer;
                //private readonly BlockingCollection<string> resQueue = new BlockingCollection<string>();
    
                private readonly BlockingCollection<string> resQueue = new BlockingCollection<string>();
                /// <summary>
                /// 消息属性
                /// </summary>
                private readonly IBasicProperties props;
                #endregion
                /// <summary>
                /// 构造函数
                /// </summary>
                public RpcClient()
                {
                    var factory = new ConnectionFactory() { HostName = "localhost" };
                    connection = factory.CreateConnection();
                    channel = connection.CreateModel();
                    replyQueueName = channel.QueueDeclare().QueueName;
                    consumer = new EventingBasicConsumer(channel);
                    props = channel.CreateBasicProperties();
                    //关联response,request和replyQueueName
                    var correlationID = Guid.NewGuid().ToString();
                    props.CorrelationId = correlationID;
                    props.ReplyTo = replyQueueName;
    
                    consumer.Received += (model, ea) =>
                      {
                          var response = Encoding.UTF8.GetString(ea.Body);
                          //确定返回的响应是这个请求发出的
                          if (ea.BasicProperties.CorrelationId == correlationID)
                              resQueue.Add(response);
                      };
                }
    
    
                public string Call(string msg)
                {
                    var msgBytes = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish(exchange: "",
                        routingKey: "rpc_queue",
                        basicProperties: props,
                        body: msgBytes);
    
                    channel.BasicConsume(
                        consumer: consumer,
                        queue: replyQueueName,
                        noAck: true);
    
                    return resQueue.Take();
                }
    
                public void Close()
                {
                    connection.Close();
                }
            }//class
        }
    }
    View Code

    服务端代码:

    namespace RpcServer
    {
        public partial class RpcServer : Form
        {
            private readonly static RpcServer _RpcServer;
            Action<string, TextBox> SetText;
            static RpcServer()
            {
                _RpcServer = new RpcServer();
            }
            /// <summary>
            /// 单例模式
            /// </summary>
            public static RpcServer SingleForm { get { return _RpcServer; } }
            private RpcServer()
            {
                CheckForIllegalCrossThreadCalls = false;
                InitializeComponent();
                ReceiveMsg(txtRpcServer);//服务端
                SetText += OnSetText;
            }
    
            /// <summary>
            /// 服务端接收消息
            /// </summary>
            private void ReceiveMsg(TextBox box)
            {
                try
                {
                    var factory = new ConnectionFactory() { HostName = "localhost" };
                    var connection = factory.CreateConnection();
                    var channel = connection.CreateModel();
    
                    //声明队列
                    channel.QueueDeclare(queue: "rpc_queue",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
    
                    //每个消费者最多消费一条消息,没返回消息确认之前不再接收消息
                    channel.BasicQos(0, 1, false);
    
                    var consumer = new EventingBasicConsumer(channel);
    
                    consumer.Received += (model, ea) =>
                    {
                        string response = null;
                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;
                        var msg = Encoding.UTF8.GetString(body);
                        //服务端显示内容
                        box.Invoke(SetText, msg, box);
                        response = "我将给你回复:已收到消息-" + msg;
    
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                            routingKey: props.ReplyTo,
                            basicProperties: replyProps,
                            body: responseBytes);
                        //手动向rabbitmq发送消息确认
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                            multiple: false);
                    };
                    channel.BasicConsume(queue: "rpc_queue",
                        noAck: false,//手动确认消息
                        consumer: consumer);
                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.ToString());
                }
            }
    
            private void OnSetText(string text, TextBox box)
            {
                box.Text += string.Format("{0}
    ", text);
            }
        }
    }
    View Code

    界面:

    大概流程:

    客户端模拟发送一个请求到队列,服务端从队列消费消息并模拟发送一个响应到队列,客户端消费该消息(新建2个winform程序测试,一个客户端,一个服务端)

    vs同时启动两个winform程序:鼠标点击解决方案-右键属性-多个启动项目-操作改为启动-确定-即可

    测试结果:

     

  • 相关阅读:
    Windows下 Mysql忘记密码的修改方法
    java 多文件压缩下载
    MySQL配置文件中jdbc.url
    java实现excle文件上传,解析
    MySQL事务
    重拾MySQL
    Linux中修改MySql配置文件
    Freemarker模板引擎
    过滤器、监听器、拦截器
    XML基础
  • 原文地址:https://www.cnblogs.com/zhyue93/p/rabbitmq-rpc.html
Copyright © 2011-2022 走看看