zoukankan      html  css  js  c++  java
  • SignalR与ActiveMQ结合构建实时通信

    一、概述

    本教程主要阐释了如何利用SignalR与消息队列的结合,实现不同客户端的交互

    • SignalR如何和消息队列交互(暂使用ActiveMQ消息队列)
    • SignalR寄宿在web中和其他SignalR、控制台客户端交互。
    • SignalR单独寄宿在控制台中和其他SignalR、控制台客户端交互。

    下面屏幕截图展示了各个客户端通过ActiveMQ相互通信

      1、SignalR寄宿在web:

      2、SignalR寄宿在控制台中,web客户端调用SignalR,读者自行测试。

     工程目录:

    一、创建项目

      1、创建生产者项目,该项目要是通过控制台输入消息,发送到消息队列

        创建控制台应用程序命名为ActiveMQNetProcucer,然后用包管理器安装ActiveMQ的.Net客户端

        Install-Package Apache.NMS.ActiveMQ

        主要代码如下:

     1 using Apache.NMS;
     2 using Apache.NMS.ActiveMQ;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Linq;
     6 using System.Text;
     7 using System.Threading.Tasks;
     8 namespace ActiveMQNet
     9 {
    10     class Program
    11     {
    12         static IConnectionFactory _factory = null;
    13         static IConnection _connection = null;
    14         static ITextMessage _message = null;
    15 
    16         static void Main(string[] args)
    17         {
    18             //创建工厂
    19             _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
    20 
    21             try
    22             {
    23                 //创建连接
    24                 using (_connection = _factory.CreateConnection())
    25                 {
    26                     //创建会话
    27                     using (ISession session = _connection.CreateSession())
    28                     {
    29                         //创建一个主题
    30                         IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
    31 
    32                         //创建生产者
    33                         IMessageProducer producer = session.CreateProducer(destination);
    34 
    35                         Console.WriteLine("Please enter any key to continue! ");
    36                         Console.ReadKey();
    37                         Console.WriteLine("Sending: ");
    38 
    39                         //创建一个文本消息
    40                         _message = producer.CreateTextMessage("Hello AcitveMQ....");
    41 
    42                         //发送消息
    43                         producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
    44                         while (true)
    45                         {
    46                             var msg = Console.ReadLine();
    47                             _message = producer.CreateTextMessage(msg);
    48                             producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
    49                         }
    50                        
    51                     }
    52                 }
    53 
    54             }
    55             catch (Exception ex)
    56             {
    57                 Console.WriteLine(ex.ToString());
    58             }
    59 
    60             Console.ReadLine();
    61 
    62         }
    63     }
    64 }
    View Code

       2、创建消费者项目,该项目主要是订阅消息队列中的消息  

        创建控制台应用程序命名为ActiveMQNetCustomer,然后用包管理器安装ActiveMQ的.Net客户端

        Install-Package Apache.NMS.ActiveMQ

        主要代码:

     1 using Apache.NMS;
     2 using Apache.NMS.ActiveMQ;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Linq;
     6 using System.Text;
     7 using System.Threading.Tasks;
     8 
     9 namespace ActiveMQNetCustomer
    10 {
    11     class Program
    12     {
    13         static IConnectionFactory _factory = null;
    14 
    15         static void Main(string[] args)
    16         {
    17             try
    18             {
    19                 //创建连接工厂
    20                 _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
    21                 //创建连接
    22                 using (IConnection conn = _factory.CreateConnection())
    23                 {
    24                     //设置客户端ID
    25                    // conn.ClientId = "Customer";
    26                     conn.Start();
    27                     //创建会话
    28                     using (ISession session = conn.CreateSession())
    29                     {
    30                         //创建主题
    31                         var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
    32 
    33                         //创建消费者
    34                         IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);
    35 
    36                         //注册监听事件
    37                         consumer.Listener += new MessageListener(consumer_Listener);
    38 
    39                         //这句代码非常重要,
    40                         //这里没有read方法,Session会话会被关闭,那么消费者将监听不到生产者的消息
    41                         Console.Read();
    42                     }
    43 
    44                     //关闭连接
    45                     conn.Stop();
    46                     conn.Close();
    47                 }
    48 
    49             }
    50             catch (Exception ex)
    51             {
    52                 Console.Write(ex.ToString());
    53             }
    54 
    55         }
    56 
    57         /// <summary>
    58         /// 消费监听事件
    59         /// </summary>
    60         /// <param name="message"></param>
    61         static void consumer_Listener(IMessage message)
    62         {
    63             ITextMessage msg = (ITextMessage)message;
    64             Console.WriteLine("Receive: " + msg.Text);
    65         }
    66     }
    67 }
    View Code

       3、创建包装ActiveMQ生产者和消费者项目,供SignalR.ActiveMQ.WebHost项目使用,来发布消息和订阅消息

        创建类库项目Signalr.ActiveMQ,然后用包管理器安装ActiveMQ的.Net客户端

        Install-Package Apache.NMS.ActiveMQ

        主要代码;

        生产者类:创建单实例生产者对象调用Send发放,发送消息到ActiveMQ消息队列    

    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Signalr.ActiveMQ
    {
      public  class Procucer
        {
            private IMessageProducer producer;
            private static Procucer instance=null;
            private Procucer(string customerId,string address)
            {
                instance = this;
                //创建工厂
                IConnectionFactory _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
    
                try
                {
                    //创建连接
                    IConnection _connection = _factory.CreateConnection();
                    {
                        //创建会话
                        ISession session = _connection.CreateSession();
                        {
                            //创建一个主题
                            IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
    
                            //创建生产者
                            producer = session.CreateProducer(destination);
    
                            Console.WriteLine("Please enter any key to continue! ");
                          //  Console.ReadKey();
                            Console.WriteLine("Sending: ");                      
    
                        }
                    }
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
    
                //Console.ReadLine();
            }
    
            public static Procucer GetInstance(string customerId="",string address= "tcp://127.0.0.1:61616/")
            {
                if (instance == null)
                    instance = new Procucer(customerId, address);
                return instance;
            }
    
            public void Send(string msg)
            {
                //创建一个文本消息
                ITextMessage _message = producer.CreateTextMessage(msg);
                //发送消息
                producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
            }
        }
    }
    View Code

         消费者类:启用单独的线程监听消息队列中的消息,当监听到消息后 广播给所有的 SinglaR客户端,其中静态属性Clients保存了所有的SinglaR客户端,当SinglaR客户端连接或者断开的时候会更新Clients属性详细代码在SignalR.ActiveMQ.WebHost中的 MyHub文件中。为了阻止当前线程退出调用了 System.Threading.Thread.CurrentThread.Join();阻塞当前线程,避免当web中方法执行完毕后对象被回收,起不到监听消息队列的作用。

    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using Microsoft.AspNet.SignalR.Hubs;
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Web;
    
    namespace SignalR.ActiveMQ
    {
        public class Customer
        {
            private static object lockObj = new object();
            private static IHubCallerConnectionContext<dynamic> _clients;
            public static IHubCallerConnectionContext<dynamic> Clients
            {
                get { return _clients; }
                set
                {
                    lock (lockObj)
                    {
                        _clients = value;
                    }
                }
            }
            public static void Run(string cutomerId="",string address= "tcp://127.0.0.1:61616/")
            {
                System.Threading.Thread t = new System.Threading.Thread(() =>
                {
                    try
                    {
                        //创建连接工厂
                        IConnectionFactory _factory = new ConnectionFactory(address);
                        //创建连接
                        using (IConnection conn = _factory.CreateConnection())
                        {
                            //设置客户端ID
                            conn.ClientId = cutomerId;
                            conn.Start();
                            //创建会话
                            using (ISession session = conn.CreateSession())
                            {
                                //创建主题
                                var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
    
                                //创建消费者
                                IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);
    
                                //注册监听事件
                                consumer.Listener += new MessageListener(consumer_Listener);
    
                                //阻塞当前线程,监听消息
                                System.Threading.Thread.CurrentThread.Join();
                            }
                            //关闭连接
                            conn.Stop();
                            conn.Close();
                        }
    
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine(ex.ToString());
                        Console.WriteLine(ex.ToString());
                    }
                   
                });
    
                t.Start();
            }
            static void consumer_Listener(IMessage message)
            {
                ITextMessage msg = (ITextMessage)message;
                if (Clients != null)
                {
                    Clients.All.broadcastMessage(msg.Text);
                }
                Debug.WriteLine("Receive: " + msg.Text);
                Console.WriteLine("Receive: " + msg.Text);
            }
        }
    }
    View Code

       4、创建web自宿主的SignalR项目,该项目既发布消息,也订阅消息

        创建MVC项目SignalR.ActiveMQ.WebHost,然后用包管理器安装ActiveMQ的.Net客户端

        Install-Package Apache.NMS.ActiveMQ

        创建SignalR的hub:当有客户端连接或者断开的时候更新Customer.Clients 静态属性,保存所有的SignalR客户端。

        web端通过调用代理的Send方法发送消息到消息队列。

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Web;
    using Microsoft.AspNet.SignalR;
    using Signalr.ActiveMQ;
    using System.Threading.Tasks;
    
    namespace SignalR.ActiveMQ.Sample.Signal.Class
    {   
        public class chatHub : Hub
        {
            public void Send(string clientName, string message)
            {
                Procucer.GetInstance().Send(message);            
            }
            public override Task OnConnected()
            {
                Customer.Clients = this.Clients;
                return base.OnConnected();
            }
    
            public override Task OnDisconnected(bool stopCalled)
            {
                Customer.Clients = this.Clients;
                return base.OnDisconnected(stopCalled);
            }
        }
    }
    View Code

        Startup类中启动消费者监听线程,调用的项目Signalr.ActiveMQ中的Customer.Run()方法:

    using Microsoft.AspNet.SignalR;
    using Microsoft.Owin;
    using Owin;
    using SignalR.ActiveMQ;
    
    [assembly: OwinStartupAttribute(typeof(SignalR.ActiveMQ.Sample.Startup))]
    namespace SignalR.ActiveMQ.Sample
    {
        public partial class Startup
        {
            public void Configuration(IAppBuilder app)
            {          
                app.MapSignalR();
             
                Customer.Run();//启动消费者监听线程
            }
        }
    }
    View Code

     二、启动顺序:

    1、启动ActiveMQ程序 可参考  http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

    2、启动ActiveMQNetProcucer项目

    3、ActiveMQNetCustomer项目

    4、启动SignalR.ActiveMQ.WebHost,开多个浏览器窗口,模拟多个SignalR客户端 

     三、SignalR宿主和web客户端分离两个项目 

    Signalr.ActiveMQ.SelfHost 用控制台寄宿SignalR提供的服务供Signalr.ActiveMQ.Web使用

    Signalr.ActiveMQ.Web 通过chart.html调用Signalr.ActiveMQ.SelfHost的服务 

    Signalr.ActiveMQ.SelfHost 和SignalR.ActiveMQ.WebHost不能同时启动,现在两个项目绑定到了同一个端口。

    四、测试

      在生产者窗口中输入消息回车,观察其他客户端的变化

         在Singlar的web客户端发送消息,观察其他客户端的变化

    源代码:https://github.com/zhaoyingju/SignalrActiveMQ.git

  • 相关阅读:
    BZOJ 1609: [Usaco2008 Feb]Eating Together麻烦的聚餐( LIS )
    BZOJ 1660: [Usaco2006 Nov]Bad Hair Day 乱发节( 单调栈 )
    BZOJ 1620: [Usaco2008 Nov]Time Management 时间管理( 二分答案 )
    BZOJ 1639: [Usaco2007 Mar]Monthly Expense 月度开支( 二分答案 )
    JAVA
    CodeForces-327A-Flipping Game
    Python学习笔记(九)- 变量进阶、函数进阶
    HDU6480-A Count Task-字符串+公式
    JQuery学习笔记(一)
    JAVA
  • 原文地址:https://www.cnblogs.com/zhyj/p/5071447.html
Copyright © 2011-2022 走看看