zoukankan      html  css  js  c++  java
  • Wcf通讯基础框架方案(五)——更新通知

    对于负载均衡环境,多服务器内存中缓存数据的话,需要解决的一个很重要的问题就是一旦数据库中数据有更新,怎么让缓存的数据立即更新?

    如果可以容忍延迟或是差异性的话,可以考虑缓存的数据有一个过期时间。但是,最好的方式还是采用通知方式,或者说发布订阅方式。

    所有的客户端会订阅客户端配置修改的消息,所有的服务端会订阅服务端配置修改的消息,配置后台在修改后复杂发布这个消息。

    在这里采用redis作为发布订阅的服务端,利用TCP双工特性与所有客户端和服务端保持长连接,进行消息的推送。

            string contract = "";
                using (WcfConfigDataContext data = new WcfConfigDataContext())
                {
                    var s = data.ServerFarms.First();
                    s.ServerFarmAddress = TextBox1.Text;
                    data.SubmitChanges();
                    contract = data.ServiceEndpoints.First().ServiceContractType;
                }
    
                using (WcfConfigDataContext data = new WcfConfigDataContext())
                {
                    TextBox1.Text = data.ServerFarms.First().ServerFarmAddress;
                }
                using (var redisClient = cm.GetClient())
                {
                    redisClient.PublishMessage("WcfConfigClientChange", contract);
                }

    假设后台更新了服务集群的地址,势必要通知客户端来重新更新缓存的信道工厂。可以看到,在后台修改了配置之后立即向WcfConfigClientChange通道发布一个消息,消息内容就是契约的类型名。

    在客户端的WcfServiceClientFactory中:

        [MethodImpl(MethodImplOptions.Synchronized)]
            private static void CreateRedisSubThread()
            {
                StartSub();
                redisCheckTimer = new Timer(obj =>
                {
                    if (((TimeSpan)(DateTime.Now - lastMsg)).Seconds > 10)
                    {
    #if DEBUG
                        LocalLogService.Log("recreate redis sub thread");
    #endif
                        try
                        {
                            redisSubThread.Abort();
                        }
                        catch
                        {
    
                        }
                        finally
                        {
                            StartSub();
                        }
                    }
                }, null, 0, 5000);
            }
    
            private static void StartSub()
            {
                redisSubThread = new Thread(() =>
                {
                    try
                    {
                        using (var subscription = cm.GetClient().CreateSubscription())
                        {
                            subscription.OnUnSubscribe = channel =>
                            {
    #if DEBUG
                                LocalLogService.Log("OnUnSubscribe");
    #endif
                            };
                            subscription.OnMessage = (channel, msg) =>
                            {
                                try
                                {
                                    if (msg == "heart")
                                    {
                                        lastMsg = DateTime.Now;
                                    }
                                    ChannelFactory cf;
                                    if (channelFactoryCache.TryGetValue(msg, out cf))
                                    {
                                        lock (cacheLocker)
                                        {
                                            channelFactoryCache.Remove(msg);
    #if DEBUG
                                            LocalLogService.Log("Remove channel factory cache");
    #endif
                                        }
                                    }
                                }
                                catch (Exception ex)
                                {
                                    LocalLogService.Log(ex.ToString());
                                }
                            };
    
                            subscription.SubscribeToChannels(REDIS_MESSAGECHANNEL);
                        }
                    }
                    catch (Exception ex)
                    {
                        LocalLogService.Log(ex.ToString());
                    }
                });
    
                redisSubThread.IsBackground = true;
                redisSubThread.Start();
            }

    可以看到,在初始化信道工程缓存的时候会调用CreateRedisSubThread方法,完成两个工作:

    1) 启动心跳检测定时器,如果过长时间都没有收到订阅心跳消息的话,重新尝试建立订阅通道。

    2) 后台线程接受订阅的消息,如果契约类型在当前的信道工厂缓存中的话,删除缓存等待重新建立。

    客户端的缓存更新比较简单,服务端的话可能就涉及到关闭ServiceHost重新启动等问题了,这里不给出详细实现。

    除了redis之外,当然也可以使用wcf框架做一个发布订阅功能:

    1) 使用双工信道

    2) 使用回调特性

    简单起见,这里也没有使用实现wcf的发布订阅。

  • 相关阅读:
    「日常报错」Response to preflight request doesn't pass access control check: It does not have HTTP ok status.
    LeetCode1026. 节点与其祖先之间的最大差值
    Erlang TCP 实例
    「笔记」Systemd 的基础
    折腾日记「乱七八糟的过程」
    LeetCode15.三数之和
    Sql Paging
    行变列
    SQL JOINS
    DBML存储过程返回值
  • 原文地址:https://www.cnblogs.com/lovecindywang/p/2032004.html
Copyright © 2011-2022 走看看