zoukankan      html  css  js  c++  java
  • C#实现Thrift连接池[新]

    2012-10-08更新内容

    1,因为对象池采用的是Stack,并且没有“对象在使用中,但仍在池中”这种情况,都是直接pop出来的,所以就废弃掉了idle状态,但是加入了“工作对象数量”的概念。

    2,顺便把Stack改成了线程安全的ConcurrentStack,但这不是很重要,因为lock还是保留了下来

    3,一些不需要lock的地方都去掉了

    4,归还对象时把连接关闭了(但不销毁,第一版连接没关闭,总是出问题),这就使得ValidateOnBorrow一定要置为true了

    5,示例中的process方法把Service对象用using包了起来,以及时释放连接

    原文:

    项目决定采用Thrift,所有测试通过后,那就要解决连接池的问题了,网上一通海搜,难道是我用的关键词不对?居然没有C#对Thrift连接池的实现,没办法,只有根据对象池的模式自己写一个,经过多人多线程测试后,基本没有问题了,比较粗糙,欢迎拍砖

    几个原则:

    1,能控制对象池中对象的总数

    2,能控制对象池中空闲对象的总数(改为控制工作对象的总数)

    3,提供一个借出对象的接口

    4,提供一个归还对象的接口

    5,借出和归还的对象有检验机制

    这样,大致就可以用了,未来可以加上超时自动释放的机制,目前未实现。

    分如下几部分

    1:配置文件/配置对象

    配置文件肯定要有,配置对象仅仅是为了操作方便(比如别的地方要用),如果没有特殊的需求,直接把配置项写到私有属性里就可以了

    public class ThriftConfig
    {
    	/// <summary>
    	/// 服务器地址
    	/// </summary>
    	public string Host { get; set; }
    	/// <summary>
    	/// 服务端口
    	/// </summary>
    	public int Port { get; set; }
    	/// <summary>
    	/// 传输编码
    	/// </summary>
    	public Encoding Encode { get; set; }
    	/// <summary>
    	/// 是否启用压缩
    	/// </summary>
    	public bool Zipped { get; set; }
    	/// <summary>
    	/// 连接超时
    	/// </summary>
    	public int Timeout { get; set; }
    	/// <summary>
    	/// 可以从缓存池中分配对象的最大数量
    	/// </summary>
    	public int MaxActive { get; set; }
    	/// <summary>
    	/// 缓存池中最大空闲对象数量
    	/// </summary>
    	public int MaxIdle { get; set; }
    	/// <summary>
    	/// 缓存池中最小空闲对象数量
    	/// </summary>
    	public int MinIdle { get; set; }
    	/// <summary>
    	/// 阻塞的最大数量
    	/// </summary>
    	public int MaxWait { get; set; }
    	/// <summary>
    	/// 从缓存池中分配对象时是否验证对象
    	/// </summary>
    	public bool ValidateOnBorrow { get; set; }
    	/// <summary>
    	/// 从缓存池中归还对象时是否验证对象
    	/// </summary>
    	public bool ValidateOnReturn { get; set; }
    	/// <summary>
    	/// 从缓存池中挂起对象时是否验证对象
    	/// </summary>
    	public bool ValidateWhiledIdle { get; set; }

    配置文件不过就是上述内容的xml形式,不赘述

    2,连接池类

    public class ThriftPool
    {
    	#region 属性
    	private ThriftConfig config;
    
    	/// <summary>
    	/// 对象缓存池
    	/// </summary>
    	//private static Stack<TTransport> objectPool { get; set; }
    	private static ConcurrentStack<TTransport> objectPool { get; set; }
    	/// <summary>
    	/// 同步对象
    	/// </summary>
    	private static AutoResetEvent resetEvent;
    	/// <summary>
    	/// 每取走一例,表示激活对象加1,此属性可控制对象池容量
    	/// </summary>
    	private static volatile int activedCount = 0;
    	/// <summary>
    	/// 同步对象锁
    	/// </summary>
    	private static object locker = new object();
    
    	#endregion
    
    	#region 构造函数
    
    	public ThriftPool()
    	{
    		config = GetConfig();
    		CreateResetEvent();
    		CreateThriftPool();
    	}
    
    	#endregion
    
    	#region 公有方法
    
    	/// <summary>
    	/// 从对象池取出一个对象
    	/// </summary>
    	/// <returns></returns>
    	public TTransport BorrowInstance()
    	{
    		lock (locker)
    		{
    			//Console.WriteLine("借前对象池个数:{0},工作对象个数:{1}", objectPool.Count(), activedCount);
    			TTransport transport;
    			//对象池无空闲对象
    			if (objectPool.Count() == 0)
    			{
    				//对象池已激活对象数达上限
    				if (activedCount == config.MaxActive)
    				{
    					resetEvent.WaitOne();
    				}
    				else
    				{
    					PushObject(CreateInstance());
    				}
    			}
    			if (!objectPool.TryPop(out transport)) throw new Exception("连接池异常");
    			//transport = objectPool.Pop();
    			activedCount++;
    			//检查对象池存量
    			//对象池存量小于最小空闲数,并且激活数小于最大激活数,添加一个对象到对象池
    			if (objectPool.Count() < config.MinIdle && activedCount < config.MaxActive)
    			{
    				PushObject(CreateInstance());
    			}
    			if (config.ValidateOnBorrow)
    			{
    				ValidateOnBorrow(transport);
    			}
    			return transport;
    		}
    	}
    
    	/// <summary>
    	/// 归还一个对象
    	/// </summary>
    	/// <param name="instance"></param>
    	public void ReturnInstance(TTransport instance)
    	{
    		//对象池容量达到上限,不再返回线程池,直接销毁
    		if (objectPool.Count() == config.MaxIdle)
    		{
    			DestoryInstance(instance);
    		}
    		else
    		{
    			if (config.ValidateOnReturn)
    			{
    				ValidateOnReturn(instance);
    			}
    			PushObject(instance);
    			activedCount--;
    			//发通知信号,有对象归还到对象池
    			resetEvent.Set();
    		}
    		//Console.WriteLine("归还后对象池个数:{0},归还后工作对象个数:{1}", objectPool.Count(), activedCount);
    	}
    
    	#endregion
    
    	#region 私有方法
    
    	/// <summary>
    	/// 创建线程同步对象
    	/// </summary>
    	private void CreateResetEvent()
    	{
    		if (resetEvent == null)
    		{
    			resetEvent = new AutoResetEvent(false);
    		}
    	}
    
    	/// <summary>
    	/// 创建对象池
    	/// </summary>
    	private void CreateThriftPool()
    	{
    		if (objectPool == null)
    		{
    			objectPool = new ConcurrentStack<TTransport>();// new Stack<TTransport>();
    		}
    	}
    
    	/// <summary>
    	/// 添加对象到对象池
    	/// </summary>
    	/// <param name="transport"></param>
    	private void PushObject(TTransport transport)
    	{
    		objectPool.Push(transport);
    	}
    
    	/// <summary>
    	/// 创建一个对象
    	/// </summary>
    	/// <returns></returns>
    	private TTransport CreateInstance()
    	{
    		TTransport transport = new TSocket(config.Host, config.Port);
    		transport.Open();
    		return transport;
    	}
    
    	/// <summary>
    	/// 取出对象时校验对象
    	/// </summary>
    	private void ValidateOnBorrow(TTransport instance)
    	{
    		if (!instance.IsOpen)
    		{
    			instance.Open();
    		}
    	}
    
    	/// <summary>
    	/// 归还对象时校验对象
    	/// </summary>
    	private void ValidateOnReturn(TTransport instance)
    	{
    		if (instance.IsOpen)
    		{
    			instance.Close();
    		}
    	}
    
    	/// <summary>
    	/// 销毁对象
    	/// </summary>
    	/// <param name="instance"></param>
    	private void DestoryInstance(TTransport instance)
    	{
    		instance.Flush();
    		if (instance.IsOpen)
    		{
    			instance.Close();
    		}
    		instance.Dispose();
    	}
    
    	/// <summary>
    	/// 得到配置参数
    	/// </summary>
    	/// <returns></returns>
    	private ThriftConfig GetConfig()
    	{
    		return Utility.GetConfig();
    	}
    
    	#endregion
    
    }

    封装了一个服务类来取Thrift连接对象,把对象池模式隐藏起来

    要注意的是,这个对象必须实现IDisposable接口,因为要手动在连接完毕后归还对象

        internal class Service : IDisposable
        {
            ThriftPool pool;
            TTransport transport;
            MyThriftTest.Client client;//我写的测试服务器就叫MyThriftTest
            bool disposed;
            public ThriftConfig config { get; set; }
            public Service()
            {
                disposed = false;
                pool = new ThriftPool();
                transport = pool.BorrowInstance();//从对象池取出一个对象
                TProtocol protocol = new TBinaryProtocol(transport);
                client = new MyThriftTest.Client(protocol);
            }
            public string Invoke(string arg1, string arg2)
            {
    			//我写的测试方法就叫invoke
                return client.invoke(arg1, arg2);
            }
            ~Service()
            {
                Dispose(false);
            }
            protected void Dispose(bool disposing)
            {
                if (!disposed)
                {
                    if (disposing)
                    {
                        pool.ReturnInstance(transport);//归还当前对象到对象池
                    }
                    // Release unmanaged resources
                    disposed = true;
                }
            }
            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
        }

    测试:

    static void Main(string[] args)
    {
    	for (int i = 0; i < 1000; i++)
    	{
    		Thread t = new Thread(new ThreadStart(process));
    		t.Name = "thread:" + i;
    		t.Start();
    	}
    
    }
    	
    private static void process()
    {
    	try
    	{
    		using(Service svr=new Service())
    		{
    			string o = svr.Invoke("hello", "world");
    			log(o);
    		}
    	}
    	catch (Exception ex)
    	{
    		log(ex);
    	}
    }
    
    static object o = new object();
    private static void log(string msg)
    {
    	lock (o)
    	{
    		using (StreamWriter sm = new StreamWriter(@"D:\thrift.txt", true))
    		{
    			sm.WriteLine(msg);
    		}
    	}
    }

    测试代码中写文件而不是输出到控制台,主要是因为控制台可显示的条目过少,一旦出了错就滚过去了

    测试良好,有发现的问题再改吧,再次声明,欢迎拍砖

    参考链接:

    C# http://www.cnblogs.com/RushSykes/archive/2012/04/16/2451880.html 

    Javahttp://www.cnblogs.com/51cto/archive/2010/08/18/Thrift_Connection_pool.html

  • 相关阅读:
    [软件工程 2018西北师范大学]实验一 软件工程准备 评分
    【集美大学1411_助教博客】助教总结
    【集美大学1411_助教博客】团队作业10——项目复审与事后分析(Beta版本)
    【集美大学1411_助教博客】团队作业9——测试与发布(Beta版本)
    【集美大学1411_助教博客】团队作业8——第二次项目冲刺(Beta阶段)
    【集美大学1411_助教博客】alpha阶段后 成绩
    Javascript 异步加载详解
    复选框,全选或者全不选
    Java六大必须理解的问题
    写了一个Java的简单缓存模型
  • 原文地址:https://www.cnblogs.com/walkerwang/p/2715735.html
Copyright © 2011-2022 走看看