zoukankan      html  css  js  c++  java
  • 简易的集群通讯组件

    简易的集群通讯组件

    上一篇演示了泛型Hub的实现,微软于6月17日更新了SignalR 2.1.0,然后自带了泛型Hub,于是就不需要自己去实现了…(微软你为啥不早一个月自带啊…)。不过没关系,SignalR出彩之处不在泛型Hub,本篇为各位观众带来了基于SignalR的简易集群通讯组件Demo,可用于分布式定时任务。

    说到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制于成员数量,Cluster用数量堆高可用性,HPC太复杂。本着SignalR的双向异步通讯的特点,其实是可以用来玩弹性计算的。初始状态由一台计算任务分发节点,一台监控以及一台计算节点构成。随着任务分发队列中的任务数越来越多,一台执行节点无法及时消耗待执行任务,达到某个阈值的时候,动态的加入一个计算节点来增加计算吞吐量。同样的,当队列中的任务基本处于很低的数量的时候,自动移除一个计算节点来减少资源消耗。当然,如果是大型的计算量之下,分发节点,队列都应该是集群的,还要考虑各种计算节点故障之类的问题,这不在本篇考虑的范畴内,本篇以初始状态模型来一步步实现简易集群通讯组件。

    好,废话不说了,正篇开始。

    任务分发节点

    image

    任务分发节点只有一个公开的行为,就是接受计算节点任务执行完成的消息。

    下面是实现。

    复制代码
    /// <summary>
        /// 集群交换器
        /// </summary>
        public class ClusterHub : Hub<IClusterClient>
        {
            /// <summary>
            /// 
            /// </summary>
            static ClusterHub()
            {
                aliveDictionary = new ConcurrentDictionary<string, Guid>();
            }
            
            /// <summary>
            /// 
            /// </summary>
            /// <param name="dispatcher"></param>
            public ClusterHub(IDispatcher dispatcher)
            {
                this.dispatcher = dispatcher;
                db = OdbFactory.Open(localDbFileName);
            }
    
            /// <summary>
            /// 本地数据库文件名
            /// </summary>
            const string localDbFileName = "ClusterStorage.dll";
    
            /// <summary>
            /// 监视器连接Id
            /// </summary>
            static string monitorConnectionId;
    
            /// <summary>
            /// 调度器
            /// </summary>
            IDispatcher dispatcher;
    
            /// <summary>
            /// 在线词典
            /// </summary>
            static ConcurrentDictionary<string, Guid> aliveDictionary;
    
            /// <summary>
            /// 
            /// </summary>
            static IOdb db;
    
            /// <summary>
            /// 完成任务
            /// </summary>
            /// <param name="jobResult"></param>
            public void Finished(Contracts.Messages.JobResultDto jobResult)
            {
                lock (db)
                {
                    var members = db.AsQueryable<MemberDo>();
                    var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));
                    if (member != null)
                    {
                        member.UpdateStatisticsInfo(jobResult.ProcessedTime);
                        db.Store(member);
                        if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                        {
                            Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });
                        }
                    }
                }
                Clients.Caller.RunJob(dispatcher.GetJobId());
            }
    
            /// <summary>
            /// 加入
            /// </summary>
            void Join()
            {
                object ip = string.Empty;
                var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";
                Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);
                lock (db)
                {
                    var members = db.AsQueryable<MemberDo>();
                    var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);
                    if (member != null)
                    {
                        member.MemberStatusType = MemberStatusTypeEnum.Connectioned;
                    }
                    else
                    {
                        member = new MemberDo(ip.ToString(), isMonitor);
                        if (isMonitor)
                        {
                            monitorConnectionId = Context.ConnectionId;
                        }
                    }
                    db.Store(member);
    
                    aliveDictionary.TryAdd(Context.ConnectionId, member.Id);
                    if (!isMonitor)
                    {
                        if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                        {
                            Clients.Client(monitorConnectionId).MemberJoin(member.Id);
                        }
                        Clients.Caller.GetId(member.Id.ToString());
                        Clients.Caller.RunJob(dispatcher.GetJobId());
                    }
                }
            }
    
            /// <summary>
            /// 离开
            /// </summary>
            void Leave()
            {
                var id = Guid.Empty;
                aliveDictionary.TryRemove(Context.ConnectionId, out id);
                lock (db)
                {
                    var members = db.AsQueryable<MemberDo>();
                    var member = members.SingleOrDefault(m => m.Id == id);
                    if (member != null)
                    {
                        member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;
                        db.Store(member);
                        if (member.IsMonitor)
                        {
                            monitorConnectionId = string.Empty;
                        }
                        else if (!string.IsNullOrWhiteSpace(monitorConnectionId))
                        {
                            Clients.Client(monitorConnectionId).MemberLeave(id);
                        }
                    }
                }
            }
    
            public override Task OnConnected()
            {
                Console.WriteLine(Context.ConnectionId+":Connected");
                Join();
                return base.OnConnected();
            }
    
            public override Task OnDisconnected()
            {
                Console.WriteLine(Context.ConnectionId + ":Disconnected");
                Leave();
                return base.OnDisconnected();
            }
    
            public override Task OnReconnected()
            {
                Console.WriteLine(Context.ConnectionId + ":Reconnected");
                return base.OnReconnected();
            }
        }
    复制代码

    ClusterHub承载着2种客户端角色的交互,计算节点和监控。

    这边采用了一个轻量级的基于C#开发的无引擎对象数据库来存储客户端信息。

    先说重载的部分:

    OnConnected - 当有客户端连接的时候,执行Join方法。

    OnDisconnected - 当有客户端离线的时候,执行Leave方法。

    然后是私有方法:

    Join - 根据QueryString来区分客户端类型是计算节点还是监视器,如果是计算节点,就直接通知监视器有成员加入,然后通过IDispatcher来获取任务Id,通知计算节点开始执行任务。

    Leave -  计算节点离线的时候通知监视器。

    公开方法:

    Finished - 计算节点完成任务后就调用该方法,Hub将计算的一些统计信息更新到本地存储,同时通知监视器更新计算结果。

    私有变量:

    IDispatcher– 任务调度器接口,由外部组件来负责具体的实现。

    计算节点

    image

    计算节点有两个行为:

    GetId - 获取节点身份。

    RunJob - 执行任务。

    复制代码
    /// <summary>
        /// 集群客户端
        /// </summary>
        public class ClusterClient
        {
            /// <summary>
            /// 
            /// </summary>
            /// <param name="jobProvider"></param>
            public ClusterClient(IJobProvider jobProvider)
            {
                this.jobProvider = jobProvider;
                url = ConfigurationManager.AppSettings["HubAddress"];
                var queryStrings = new Dictionary<string, string>();
                queryStrings.Add("ClientRole", "Normal");
                connection = new HubConnection(url, queryStrings);
                hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description);
                InitClientEvents();
                connection.Start().Wait();
            }
    
            string url;
    
            HubConnection connection;
    
            IHubProxy hubProxy;
    
            IJobProvider jobProvider;
    
            string id;
    
            /// <summary>
            /// 
            /// </summary>
            void InitClientEvents()
            {
                hubProxy.On("GetId", (id) => GetId(id));
                hubProxy.On("RunJob", (jobId) => RunJob(jobId));
            }
    
            /// <summary>
            /// 执行任务
            /// </summary>
            /// <param name="id"></param>
            void GetId(string id)
            {
                this.id = id;
            }
    
            /// <summary>
            /// 执行任务
            /// </summary>
            /// <param name="jobId"></param>
            void RunJob(string jobId)
            {
                var startTime = DateTime.Now;
                jobProvider.Invoke(jobId);
                var stopTime = DateTime.Now;
                hubProxy.Invoke("Finished", new JobResultDto() { Id = id, JobId = jobId, ProcessedTime = (stopTime - startTime).TotalMilliseconds });
            }
        }
    复制代码

    客户端的实现很简单,核心就是通过构造函数注入任务提供接口,由接口通过任务Id来执行任务。

     

    监视器

    image

    监视器具有三个公开行为:

    MemberJoin - 计算节点加入

    MemberLeave - 计算节点离线

    UpdateMemberStatisticsInfo - 更新节点统计信息

    复制代码
    /// <reference path="jquery-2.1.1.js" />
    /// <reference path="jquery.signalR-2.1.0.js" />
    (function ($) {
    
        var members = [];
    
        var methods = {
            reloadList: function () {
                var list = "";
                $.each(members, function (i, n) {
                    list += "<li id='member_" + n.Id + "'>[" + n.Id + "]:AverageProcessedTime " + n.AverageProcessedTime + " Milliseconds</li>";
                });
                $('#members').html(list);
            }
        }
    
        var hubs = {
            clusterHub: $.connection.clusterHub,
            init: function () {
                $.connection.hub.logging = true;
                $.connection.hub.url = 'http://192.168.1.124:10086/signalr';
                $.connection.hub.qs = { "ClientRole": "Monitor" }
                $.connection.hub.start().done(function () { });
            }
        }
    
        var cluster = {
            on: {
                updateMemberStatisticsInfo: function (data) {
                    $.each(members, function (i, n) {
                        if (n.Id == data.Id) {
                            n.AverageProcessedTime = data.AverageProcessedTime;
                            return;
                        }
                    });
                    methods.reloadList();
                },
                memberJoin: function (id) {
                    members.push({ "Id": id, "AverageProcessedTime": 0 });
                    methods.reloadList();
                },
                memberLeave: function (id) {
                    members = $.grep(members, function (n) { return n.Id != id });
                    methods.reloadList();
                }
            }
        }
    
        $(function () {
            hubs.clusterHub.client.UpdateMemberStatisticsInfo = cluster.on.updateMemberStatisticsInfo;
            hubs.clusterHub.client.MemberJoin = cluster.on.memberJoin;
            hubs.clusterHub.client.MemberLeave = cluster.on.memberLeave;
            hubs.init();
        });
    })(jQuery);
    复制代码
    复制代码
    <!DOCTYPE html>
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head>
        <title>集群监视器</title>
    </head>
    <body>
        <div>
            <ul id="members"></ul>
        </div>
        <script src="scripts/jquery-2.1.1.min.js"></script>
        <script src="scripts/jquery.signalR-2.1.0.min.js"></script>
        <script src="http://192.168.1.124:10086/signalr/hubs"></script>
        <script src="scripts/core.js"></script>
    </body>
    </html>
    复制代码

    监视器用real-time的Web平台实现,一共注册三个方法的实现。

    最终效果

    image

     

    Hub端启动后,先启动监视器,然后在不同的机器上启动计算端,图上是2个计算节点,监视器上也显示着2个节点,每个节点执行一个JobId后,监视器上就会刷新结果。

    进一步思考和扩展

    简易集群组件就到这儿了,本篇演示的是一个思路,可以在这个基础上深度扩展成文章开头所描述的那样,高性能高可用的基于SignalR的集群组件。欢迎各位有兴趣的同学进行讨论和拍砖。

    转载请注明出处:http://www.cnblogs.com/royding/p/3811169.html

     
     
     
    标签: SignalR
  • 相关阅读:
    如何确定系统上的CPU插槽数量
    如何查找物理cpu,cpu核心和逻辑cpu的数量
    libssl.so.10: cannot open shared object file: No such file or directory
    yum安装的时候报错,关于python的函数库
    LVM分区无损增减
    LVM管理之减少LV的大小
    Windows 系统判断MD5 值的办法
    CentOS6系列系统启动常见故障排查与解决方法
    解决centos中vsftpd中文乱码
    我的博客即将入驻“云栖社区”,诚邀技术同仁一同入驻。
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3812449.html
Copyright © 2011-2022 走看看