zoukankan      html  css  js  c++  java
  • C#队列学习笔记:RabbitMQ实现客户端相互通讯

        一、引言

        fanout类型的Exchange,路由规则非常简单:它会把所有发送到该Exchange的消息,路由到所有与它绑定的Queue中。假设有一个聊天室,各个客户端都订阅在同一fanout exchange type,那每个客户端发送出来的消息,所有的客户端都能收到,因为大家都订阅了。此时,只需要简单地限制一下,只有是与我有关的消息,才在聊天界面上显示。这样,即可达到相互通讯的效果。

        二、示例

        2.1、环境准备

        本示例使用EasyNetQ来实现,请先在NuGet上安装。

        2.2、实体类

        新建一个实体类MessageBody:

        public class MessageBody
        {
            public string FromUserId { get; set; }
            public string Message { get; set; }
            public string ToUserId { get; set; }
        }

        2.3、主窗体

        新建一个ChatMain窗体:

        代码如下:

        public partial class ChatMain : Form
        {
            public ChatMain()
            {
                InitializeComponent();
            }
    
            /// <summary>
            /// 客户端 A
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void Button1_Click(object sender, EventArgs e)
            {
                ChatWith chatWith = new ChatWith(currentUserId: "UserA")
                {
                    StartPosition = FormStartPosition.CenterScreen
                };
                chatWith.Show();
            }
    
            /// <summary>
            /// 客户端 B
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void Button2_Click(object sender, EventArgs e)
            {
                ChatWith chatWith = new ChatWith(currentUserId: "UserB")
                {
                    StartPosition = FormStartPosition.CenterScreen
                };
                chatWith.Show();
            }
    
            /// <summary>
            /// 客户端 C
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void Button3_Click(object sender, EventArgs e)
            {
    
                ChatWith chatWith = new ChatWith(currentUserId: "UserC")
                {
                    StartPosition = FormStartPosition.CenterScreen
                };
                chatWith.Show();
    
            }
    
            /// <summary>
            /// 客户端 D
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void Button4_Click(object sender, EventArgs e)
            {
                ChatWith chatWith = new ChatWith(currentUserId: "UserD")
                {
                    StartPosition = FormStartPosition.CenterScreen
                };
                chatWith.Show();
            }
        }
    View Code

        2.4、客户端窗体

        新建一个ChatWith窗体:

        代码如下:

        public partial class ChatWith : Form
        {
            public delegate void ChatWithDelegate();
            public delegate void ChatWithDelegate<T1>(T1 obj1);
            public delegate void ChatWithDelegate<T1, T2>(T1 obj1, T2 obj2);
    
            public string CurrentUserId { get; }
    
            private IBus bus;
            public const string ConnStringMQ = "host=192.168.2.242:5672,192.168.2.165:5672;virtualHost=/;username=hello;password=world";
            public const string FanoutExchange = "fanoutEC";
    
            /// <summary>
            /// 有参构造函数
            /// </summary>
            /// <param name="currentUserId"></param>
            public ChatWith(string currentUserId)
            {
                InitializeComponent();
    
                //在多线程程序中,新创建的线程不能访问UI线程创建的窗口控件。
                //此时若想访问窗体的控件,可将窗体构造函数中的CheckForIllegalCrossThreadCalls设置为false。
                //这时线程就能安全地访问窗体控件了。
                CheckForIllegalCrossThreadCalls = false;
    
                CurrentUserId = currentUserId;
            }
    
            /// <summary>
            /// ShowMessage重载
            /// </summary>
            /// <param name="msg"></param>
            private void ShowMessage(string msg)
            {
                if (InvokeRequired)//InvokeRequired:当前线程不是创建控件的线程时为true
                {
                    BeginInvoke(new ChatWithDelegate<string>(ShowMessage), msg);
                }
                else
                {
                    ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg });
                    lvwReceiveMsg.Items.Insert(0, item);
                }
            }
    
            /// <summary>
            /// ShowMessage重载
            /// </summary>
            /// <param name="toUserId"></param>
            /// <param name="msg"></param>
            private void ShowMessage(string toUserId, string msg)
            {
                if (InvokeRequired)
                {
                    BeginInvoke(new ChatWithDelegate<string, string>(ShowMessage), toUserId, msg);
                }
                else
                {
                    ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), toUserId, msg });
                    lvwReceiveMsg.Items.Insert(0, item);
                }
            }
    
            /// <summary>
            /// 绑定队列并订阅
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void ChatWith_Load(object sender, EventArgs e)
            {
                cmbOnLine.SelectedIndex = 0;
                Text = Text + $"[{CurrentUserId}]";
    
                //这里不能使用using,否则订阅者立即就释放了,订阅不到消息。
                bus = RabbitHutch.CreateBus(ConnStringMQ);
                {
                    if (bus.IsConnected)
                    {
                        var exchange = bus.Advanced.ExchangeDeclare(name: FanoutExchange, type: ExchangeType.Fanout);
                        var queue = bus.Advanced.QueueDeclare(name: $"{FanoutExchange}_queue_{CurrentUserId}");
                        bus.Advanced.Bind(exchange: exchange, queue: queue, routingKey: "");
    
                        bus.Advanced.Consume(queue, registration =>
                        {
                            registration.Add<MessageBody>((message, info) =>
                            {
                                if (message.Body.ToUserId == CurrentUserId)
                                {
                                    ShowMessage(message.Body.FromUserId, message.Body.Message);
                                }
                            });
                        });
                    }
                    else
                    {
                        ShowMessage("服务器连接失败。");
                    }
                }
            }
    
            /// <summary>
            /// 发送
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void btnSend_Click(object sender, EventArgs e)
            {
                try
                {
                    using (var bus = RabbitHutch.CreateBus(ConnStringMQ))
                    {
                        if (bus.IsConnected)
                        {
                            if (cmbOnLine.Text == "*")//群发
                            {
                                foreach (var item in cmbOnLine.Items.Cast<string>().Where(s => s != "*" && s != CurrentUserId))
                                {
                                    var exchange = bus.Advanced.ExchangeDeclare(name: FanoutExchange, type: ExchangeType.Fanout);
                                    var messageBody = new MessageBody
                                    {
                                        FromUserId = CurrentUserId,
                                        Message = txtSendMsg.Text,
                                        ToUserId = item
                                    };
                                    bus.Advanced.Publish(exchange: exchange,
                                        routingKey: "",
                                        mandatory: false,
                                        message: new Message<MessageBody>(messageBody));
                                }
                            }
                            else//私聊
                            {
                                var exchange = bus.Advanced.ExchangeDeclare(name: FanoutExchange, type: ExchangeType.Fanout);
                                var messageBody = new MessageBody
                                {
                                    FromUserId = CurrentUserId,
                                    Message = txtSendMsg.Text,
                                    ToUserId = cmbOnLine.Text
                                };
                                bus.Advanced.Publish(exchange: exchange,
                                    routingKey: "",
                                    mandatory: false,
                                    message: new Message<MessageBody>(messageBody));
                            }
                        }
                        else
                        {
                            ShowMessage("发送消息失败。");
                        }
                    }
                }
                catch (Exception ex)
                {
                    ShowMessage(ex.Message);
                }
            }
    
            /// <summary>
            /// 关闭
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void btnClose_Click(object sender, EventArgs e)
            {
                Close();
            }
    
            /// <summary>
            /// 窗体关闭事件
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void ChatWith_FormClosed(object sender, FormClosedEventArgs e)
            {
                bus?.Dispose();
            }
        }
    View Code

        2.5、运行结果

  • 相关阅读:
    spring4之依赖注入的三种方式
    Hibernate之总结
    Hibernate之dynamic-update
    ThinkPhp调用webservice
    Robot Framework:Web自动化之-元素处理
    Robot Framework:Web自动化之-元素定位
    RobotFramework:python+robotframework+selenium2library测试环境部署说明文档
    Robot Framework:Httplibrary库
    URL备忘
    Windows:CMD命令备忘
  • 原文地址:https://www.cnblogs.com/atomy/p/12665075.html
Copyright © 2011-2022 走看看