2012-10-08更新内容
1,因为对象池采用的是Stack,并且没有“对象在使用中,但仍在池中”这种情况,都是直接pop出来的,所以就废弃掉了idle状态,但是加入了“工作对象数量”的概念。
2,顺便把Stack改成了线程安全的ConcurrentStack,但这不是很重要,因为lock还是保留了下来
3,一些不需要lock的地方都去掉了
4,归还对象时把连接关闭了(但不销毁,第一版连接没关闭,总是出问题),这就使得ValidateOnBorrow一定要置为true了
5,示例中的process方法把Service对象用using包了起来,以及时释放连接
原文:
项目决定采用Thrift,所有测试通过后,那就要解决连接池的问题了,网上一通海搜,难道是我用的关键词不对?居然没有C#对Thrift连接池的实现,没办法,只有根据对象池的模式自己写一个,经过多人多线程测试后,基本没有问题了,比较粗糙,欢迎拍砖
几个原则:
1,能控制对象池中对象的总数
2,能控制对象池中空闲对象的总数(改为控制工作对象的总数)
3,提供一个借出对象的接口
4,提供一个归还对象的接口
5,借出和归还的对象有检验机制
这样,大致就可以用了,未来可以加上超时自动释放的机制,目前未实现。
分如下几部分
1:配置文件/配置对象
配置文件肯定要有,配置对象仅仅是为了操作方便(比如别的地方要用),如果没有特殊的需求,直接把配置项写到私有属性里就可以了
public class ThriftConfig
{
/// <summary>
/// 服务器地址
/// </summary>
public string Host { get; set; }
/// <summary>
/// 服务端口
/// </summary>
public int Port { get; set; }
/// <summary>
/// 传输编码
/// </summary>
public Encoding Encode { get; set; }
/// <summary>
/// 是否启用压缩
/// </summary>
public bool Zipped { get; set; }
/// <summary>
/// 连接超时
/// </summary>
public int Timeout { get; set; }
/// <summary>
/// 可以从缓存池中分配对象的最大数量
/// </summary>
public int MaxActive { get; set; }
/// <summary>
/// 缓存池中最大空闲对象数量
/// </summary>
public int MaxIdle { get; set; }
/// <summary>
/// 缓存池中最小空闲对象数量
/// </summary>
public int MinIdle { get; set; }
/// <summary>
/// 阻塞的最大数量
/// </summary>
public int MaxWait { get; set; }
/// <summary>
/// 从缓存池中分配对象时是否验证对象
/// </summary>
public bool ValidateOnBorrow { get; set; }
/// <summary>
/// 从缓存池中归还对象时是否验证对象
/// </summary>
public bool ValidateOnReturn { get; set; }
/// <summary>
/// 从缓存池中挂起对象时是否验证对象
/// </summary>
public bool ValidateWhiledIdle { get; set; }配置文件不过就是上述内容的xml形式,不赘述
2,连接池类
public class ThriftPool
{
#region 属性
private ThriftConfig config;
/// <summary>
/// 对象缓存池
/// </summary>
//private static Stack<TTransport> objectPool { get; set; }
private static ConcurrentStack<TTransport> objectPool { get; set; }
/// <summary>
/// 同步对象
/// </summary>
private static AutoResetEvent resetEvent;
/// <summary>
/// 每取走一例,表示激活对象加1,此属性可控制对象池容量
/// </summary>
private static volatile int activedCount = 0;
/// <summary>
/// 同步对象锁
/// </summary>
private static object locker = new object();
#endregion
#region 构造函数
public ThriftPool()
{
config = GetConfig();
CreateResetEvent();
CreateThriftPool();
}
#endregion
#region 公有方法
/// <summary>
/// 从对象池取出一个对象
/// </summary>
/// <returns></returns>
public TTransport BorrowInstance()
{
lock (locker)
{
//Console.WriteLine("借前对象池个数:{0},工作对象个数:{1}", objectPool.Count(), activedCount);
TTransport transport;
//对象池无空闲对象
if (objectPool.Count() == 0)
{
//对象池已激活对象数达上限
if (activedCount == config.MaxActive)
{
resetEvent.WaitOne();
}
else
{
PushObject(CreateInstance());
}
}
if (!objectPool.TryPop(out transport)) throw new Exception("连接池异常");
//transport = objectPool.Pop();
activedCount++;
//检查对象池存量
//对象池存量小于最小空闲数,并且激活数小于最大激活数,添加一个对象到对象池
if (objectPool.Count() < config.MinIdle && activedCount < config.MaxActive)
{
PushObject(CreateInstance());
}
if (config.ValidateOnBorrow)
{
ValidateOnBorrow(transport);
}
return transport;
}
}
/// <summary>
/// 归还一个对象
/// </summary>
/// <param name="instance"></param>
public void ReturnInstance(TTransport instance)
{
//对象池容量达到上限,不再返回线程池,直接销毁
if (objectPool.Count() == config.MaxIdle)
{
DestoryInstance(instance);
}
else
{
if (config.ValidateOnReturn)
{
ValidateOnReturn(instance);
}
PushObject(instance);
activedCount--;
//发通知信号,有对象归还到对象池
resetEvent.Set();
}
//Console.WriteLine("归还后对象池个数:{0},归还后工作对象个数:{1}", objectPool.Count(), activedCount);
}
#endregion
#region 私有方法
/// <summary>
/// 创建线程同步对象
/// </summary>
private void CreateResetEvent()
{
if (resetEvent == null)
{
resetEvent = new AutoResetEvent(false);
}
}
/// <summary>
/// 创建对象池
/// </summary>
private void CreateThriftPool()
{
if (objectPool == null)
{
objectPool = new ConcurrentStack<TTransport>();// new Stack<TTransport>();
}
}
/// <summary>
/// 添加对象到对象池
/// </summary>
/// <param name="transport"></param>
private void PushObject(TTransport transport)
{
objectPool.Push(transport);
}
/// <summary>
/// 创建一个对象
/// </summary>
/// <returns></returns>
private TTransport CreateInstance()
{
TTransport transport = new TSocket(config.Host, config.Port);
transport.Open();
return transport;
}
/// <summary>
/// 取出对象时校验对象
/// </summary>
private void ValidateOnBorrow(TTransport instance)
{
if (!instance.IsOpen)
{
instance.Open();
}
}
/// <summary>
/// 归还对象时校验对象
/// </summary>
private void ValidateOnReturn(TTransport instance)
{
if (instance.IsOpen)
{
instance.Close();
}
}
/// <summary>
/// 销毁对象
/// </summary>
/// <param name="instance"></param>
private void DestoryInstance(TTransport instance)
{
instance.Flush();
if (instance.IsOpen)
{
instance.Close();
}
instance.Dispose();
}
/// <summary>
/// 得到配置参数
/// </summary>
/// <returns></returns>
private ThriftConfig GetConfig()
{
return Utility.GetConfig();
}
#endregion
}封装了一个服务类来取Thrift连接对象,把对象池模式隐藏起来
要注意的是,这个对象必须实现IDisposable接口,因为要手动在连接完毕后归还对象
internal class Service : IDisposable
{
ThriftPool pool;
TTransport transport;
MyThriftTest.Client client;//我写的测试服务器就叫MyThriftTest
bool disposed;
public ThriftConfig config { get; set; }
public Service()
{
disposed = false;
pool = new ThriftPool();
transport = pool.BorrowInstance();//从对象池取出一个对象
TProtocol protocol = new TBinaryProtocol(transport);
client = new MyThriftTest.Client(protocol);
}
public string Invoke(string arg1, string arg2)
{
//我写的测试方法就叫invoke
return client.invoke(arg1, arg2);
}
~Service()
{
Dispose(false);
}
protected void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
pool.ReturnInstance(transport);//归还当前对象到对象池
}
// Release unmanaged resources
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}测试:
static void Main(string[] args)
{
for (int i = 0; i < 1000; i++)
{
Thread t = new Thread(new ThreadStart(process));
t.Name = "thread:" + i;
t.Start();
}
}
private static void process()
{
try
{
using(Service svr=new Service())
{
string o = svr.Invoke("hello", "world");
log(o);
}
}
catch (Exception ex)
{
log(ex);
}
}
static object o = new object();
private static void log(string msg)
{
lock (o)
{
using (StreamWriter sm = new StreamWriter(@"D:\thrift.txt", true))
{
sm.WriteLine(msg);
}
}
}测试代码中写文件而不是输出到控制台,主要是因为控制台可显示的条目过少,一旦出了错就滚过去了
测试良好,有发现的问题再改吧,再次声明,欢迎拍砖
参考链接:
C# http://www.cnblogs.com/RushSykes/archive/2012/04/16/2451880.html
Javahttp://www.cnblogs.com/51cto/archive/2010/08/18/Thrift_Connection_pool.html