zoukankan      html  css  js  c++  java
  • Thrift搭建分布式微服务(二)

    第二篇 连接池

        连接池配置,请前往Thrift搭建分布式微服务(一)

        下面要介绍的其实不是单一的连接池,应该说是连接池集合。因为它要管理多个Tcp Socket连接节点,每个服务节点都有设置了自己的最大激活连接数、最大空闲连接数、最小空闲连接数、等待连接时间。

     1     internal class ServiceTransportPool
     2     {
     3         public ServiceConfig ServiceConfig { get; set; }
     4 
     5         public ConcurrentStack<TTransport> TransportPool { get; set; }
     6 
     7         public AutoResetEvent ResetEvent { get; set; }
     8 
     9         public int ActivedTransportCount { get; set; }
         
         internal object sync_obeject = new object();
    10 }

      

            一个ServiceTransportPool类对应一个服务配置,一个服务配置对应一个服务节点。连接池集合应具有下列成员:

    1     internal class ThriftFactory
    2     {
    3         private static volatile List<ServiceTransportPool> transport_pools;
    4         
    5         private static object sync_obj = new object();
    6 
    7         private static IThriftFactoryMonitor monitor = new ThriftFactoryMonitor();
    8     }

            transport_pools实现了对服务节点的管理,monitor 用来监控连接池的状态,如上一篇所讲,等待连接超时怎么通知连接池管理者,就用monitor来实现。这里monitor有个默认的实现,后面再讲。

            初始化连接池集合:

     1         static ThriftFactory()
     2         {
     3             if (transport_pools == null || transport_pools.Count == 0)
     4             {
     5                 lock (sync_obj)
     6                 {
     7                     if (transport_pools == null || transport_pools.Count == 0)
     8                     {
     9                         var services = ConfigHelper.GetServiceConfigs();
    10                         transport_pools = new List<ServiceTransportPool>(services.Count);
    11                         foreach (var service in services)
    12                         {
    13                             ServiceTransportPool stp = new ServiceTransportPool()
    14                             {
    15                                 ServiceConfig = service,
    16                                 TransportPool = new ConcurrentStack<TTransport>(),
    17                                 ResetEvent = new AutoResetEvent(false),
    18                                 ActivedTransportCount = 0
    19                             };
    20                             transport_pools.Add(stp);
    21                         }
    22                     }
    23                 }
    24             }
    25        }

       如何向连接池借出一个Socket连接:

     1         public static TTransport BorrowInstance(string serviceName)
     2         {
     3             var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
     4             if (transpool == null)
     5             {
     6                 throw new ThriftException(string.Format("There Is No Service Named "{0}"", serviceName));
     7             }
     8 
     9             TTransport transport;
    10             lock (transpool.sync_obeject)
    11             {
    12                 if (transpool.TransportPool.Count() == 0)
    13                 {
    14                     if (transpool.ActivedTransportCount == transpool.ServiceConfig.MaxActive)
    15                     {
    16                         bool result = transpool.ResetEvent.WaitOne(transpool.ServiceConfig.WaitingTimeout);
    17                         if (!result)
    18                         {
    19                             monitor.TimeoutNotify(transpool.ServiceConfig.Name, transpool.ServiceConfig.WaitingTimeout);
    20                         }
    21                     }
    22                     else
    23                     {
    24                         transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
    25                     }
    26                 }
    27 
    28                 if (!transpool.TransportPool.TryPop(out transport))
    29                 {
    30                     throw new ThriftException("Connection Pool Exception");
    31                 }
    32 
    33                 transpool.ActivedTransportCount++;
    34 
    35                 if (transpool.TransportPool.Count() < transpool.ServiceConfig.MinIdle && transpool.ActivedTransportCount < transpool.ServiceConfig.MaxActive)
    36                 {
    37                     transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
    38                 }
    39             }
    40             if (!transport.IsOpen)
    41             {
    42                 transport.Open();
    43             }
    44             Monitor();
    45             return transport;
    46         }

            当实际激活的连接数达到服务节点配置的最大激活连接数,获取Socket连接的请求就将处于等待状态,超过等待时间设置,使用监视器方法monitor.TimeoutNotify()去通知管理者。连接池空闲的连接小于最小空闲连接数设置,每次请求连接都会建立一个新的连接放到池子里。

            归还连接:

     1         public static void ReturnInstance(string serviceName,TTransport transport)
     2         {
     3             var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
     4             if (transpool == null)
     5             {
     6                 throw new ThriftException("Connection Pool Exception");
     7             }
     8             if (transpool.TransportPool.Count() == transpool.ServiceConfig.MaxIdle)
     9             {
    10                 transport.Flush();
    11                 if (transport.IsOpen)
    12                 {
    13                     transport.Close();
    14                 }
    15                 transport.Dispose();
    16             }
    17             else
    18             {
    19                 lock (transpool.sync_obeject)
    20                 {
    21                     if (transport.IsOpen)
    22                     {
    23                         transport.Close();
    24                     }
    25                     transpool.TransportPool.Push(transport);
    26                     transpool.ActivedTransportCount--;
    27                     transpool.ResetEvent.Set();
    28                 }
    29             }
    30             Monitor();
    31         }

            当连接池最大空闲连接达到了服务节点设置的最大空闲连接数时,归还的连接将被销毁。借出连接和归还连接两段代码里都有一个Monitor()方法,此方法监控连接池连接的使用情况:

            /// <summary>
            /// 监控连接池状态
            /// </summary>
            private static void Monitor()
            {
                List<Tuple<string, int, int>> tuples = new List<Tuple<string, int, int>>(transport_pools.Count);
                foreach(var transpool in transport_pools)
                {
                    Tuple<string, int, int> tuple = new Tuple<string, int, int>(transpool.ServiceConfig.Name, transpool.TransportPool.Count(), transpool.ActivedTransportCount);
                    tuples.Add(tuple);
                }
                monitor.Monitor(tuples);
            }

           此方法将每个服务连接池的空闲连接数、和激活的连接数传给前面提到的监视器。在连接等待超时和拿到连接池的运行参数时,最终进行什么动作还是由开发者去实现的。继承下面的接口,开发者可以自定义监视器。

     1     public interface IThriftFactoryMonitor
     2     {
     3         /// <summary>
     4         /// 监控连接池运行状态
     5         /// </summary>
     6         /// <param name="tuple">元组集合,第一个元素表示服务名称、第二个元素表示空闲连接数量、第三个元素表示激活连接数量</param>
     7         void Monitor(List<Tuple<string,int,int>> tuples);
     8 
     9         /// <summary>
    10         /// 等待连接超时
    11         /// </summary>
    12         void TimeoutNotify(string serviceName,int timeOut);
    13     }

           默认的监视器只是将连接池的运行状态记录到控制台:

     1     /// <summary>
     2     /// 默认连接池状态监控类
     3     /// </summary>
     4     public class ThriftFactoryMonitor : IThriftFactoryMonitor
     5     {
     6         public virtual void Monitor(List<Tuple<string, int, int>> tuples)
     7         {
     8             foreach (var t in tuples)
     9             {
    10                 Console.WriteLine(string.Format("{0}连接池,空闲连接数量:{1},激活连接数量:{2}", t.Item1, t.Item2, t.Item3));
    11             }
    12         }
    13 
    14         public virtual void TimeoutNotify(string serviceName, int timeOut)
    15         {
    16             Console.WriteLine(string.Format("{0}连接池等待连接超时{1}", serviceName, timeOut));
    17         }
    18     }

        开发者自己实现的监视器,如何能被连接池使用,其实在一开始的连接池初始化里,还有一段使用反射来初始化开发者定义的监视器的代码。开发者只需在上一篇介绍的Thrift.config里配置MonitorType,其他的就交给框架处理:

     1         static ThriftFactory()
     2         {
     3             ......
     4             if(!string.IsNullOrWhiteSpace(ConfigHelper.ThriftConfig.MonitorType))
     5             {
     6                 monitor = Invoker.CreateInstance(Type.GetType(ConfigHelper.ThriftConfig.MonitorType)) as IThriftFactoryMonitor;
     7                 if (monitor == null)
     8                 {
     9                     throw new ThriftException(string.Format("There Is No Monitor Implement Which Type Is  "{0}"", ConfigHelper.ThriftConfig.MonitorType));
    10                 }
    11             }
    12         }

       通过连接池与服务节点建立了Socket连接,下一篇将介绍客户端如何使用建立的Socket连接与服务端通信。

    Thrift微服务代码下载Thrift.Utility

       

  • 相关阅读:
    11
    10
    09
    08
    201621044079韩烨软件工程作业三
    软工作业二 201621044079韩烨
    软工作业一 201621044079韩烨
    14
    201621044079 week13 网络
    week12 201621044079 流与文件
  • 原文地址:https://www.cnblogs.com/DKSL/p/Thrift_Utiliry2.html
Copyright © 2011-2022 走看看