背景
最近做了一个需求,为了控制数据库连接数,自己写了个简易的数据库连接池。跟F哥聊天的时候提到了,说可以搞个通用的对象连接池,So……
目的
- 让对象池相关的需求开发起来更便捷,不用依赖apache的common包;
- 讨论了简易数据库连接池的缺点,想弥补并通用;
- 造个轮子感受一下。
线程池的作用
- 减少对象创建销毁的性能消耗;
- 控制资源使用数量
代码实现
1.对象生成工厂统一接口:
/**
* 对象生成工厂统一接口
* @param <T>
*
* @Author 刘世杰
* @Date 2015-09-09 23:49:37
*/
public interface IObjectFactory<T> {
public boolean validate(T t);
public T newInstance();
public void destroy(T t);
}
2.对象工厂的抽象实现,方便开发用的而已:
/**
* 对象工厂的抽象实现:
* 主要作用是为提供默认实现,以避免强制子类实现不需要修改的方法
* @param <T>
*
* @Author 刘世杰
* @Date 2015-09-09 23:47:59
*
*/
public abstract class AbstractObjectFactory<T> implements IObjectFactory<T> {
@Override
public boolean validate(T t) {
return t != null;
}
}
3.数据库连接对象工厂实现:
/**
* 数据库连接池
*
* @Author 刘世杰
* @Date 2015-09-09 23:48:28
*/
public class ConnectionFactory extends AbstractObjectFactory<Connection> implements IObjectFactory<Connection> {
private static ConnectionFactory factory;
private ConnectionFactory() {
// lol :]
}
public static ConnectionFactory getInstance() {
if (factory == null) {
synchronized (ConnectionFactory.class) {
if (factory == null) {
factory = new ConnectionFactory();
}
}
}
return factory;
}
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
@Override
public boolean validate(Connection connection) {
try {
if (connection == null) {
return false;
}
if (connection.isClosed()) {
return false;
}
return true;
} catch (Exception e) {
// e.printStackTrace();
System.out.println(e.getMessage());
return false;
}
}
@Override
public Connection newInstance() {
try {
String url = "jdbc://~~~";
String user = "root";
String password = "root";
return DriverManager.getConnection("abc","123","123");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void destroy(Connection connection) {
try {
if (connection != null && !connection.isClosed()) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
4.具体字符串连接池工厂实现
/**
* 字符串连接池工厂
* @Author 刘世杰
* @Date 2015-09-09 23:46:31
*/
public class StringFactory extends AbstractObjectFactory<String> implements IObjectFactory<String> {
private static StringFactory factory;
private StringFactory() {
// lol :]
}
public static StringFactory getInstance() {
if (factory == null) {
synchronized (StringFactory.class) {
if (factory == null) {
factory = new StringFactory();
}
}
}
return factory;
}
@Override
public boolean validate(String string) {
return (string != null && string.trim().length() > 0);
}
@Override
public String newInstance() {
return "" + System.nanoTime();
}
@Override
public void destroy(String string) {
string = null;
}
}
5.对象池:
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 对象池
* @param <T>
*
* @Author 刘世杰
* @Date 2015-09-09 23:48:19
*/
public class ObjectPoll<T> {
// 锁
private final Byte lock = new Byte((byte)0);
//
private final Set<T> locker;
private final Queue<T> queue;
private static final int DEFAULT_INITIAL_CAPACITY = 5;
private static final int DEFAULT_MAXIMUM_CAPACITY = 10;
private final int initialCapacity;
private final int maximumCapacity;
// 多余对象空闲时间
private final int keepAliveTime = 60000;
// 对象实例工厂
private IObjectFactory factory;
/**
* 构造器
* @param factory 对象实例工厂
*/
public ObjectPoll(IObjectFactory factory) throws Exception {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAXIMUM_CAPACITY, factory);
}
/**
* 构造器
* @param initialCapacity 初始对象数
* @param factory 对象实例工厂
*/
public ObjectPoll(int initialCapacity, IObjectFactory factory) throws Exception {
this(initialCapacity, initialCapacity > DEFAULT_MAXIMUM_CAPACITY ? DEFAULT_MAXIMUM_CAPACITY : initialCapacity, factory);
}
/**
* 构造器
* @param initialCapacity 初始对象数
* @param maximumCapacity 最大对象数
* @param factory 对象实例工厂
*/
public ObjectPoll(int initialCapacity, int maximumCapacity, IObjectFactory factory) {
if (factory == null) {
throw new IllegalArgumentException("IObjectFactory is null...");
}
if (initialCapacity <= 0) {
throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity);
}
if (maximumCapacity <= 0) {
throw new IllegalArgumentException("Illegal maximum capacity: " + maximumCapacity);
}
if (initialCapacity > maximumCapacity) {
throw new IllegalArgumentException("initial capacity bigger than maximum capacity...");
}
this.initialCapacity = initialCapacity;
this.maximumCapacity = maximumCapacity;
this.factory = factory;
locker = Collections.synchronizedSet(new HashSet<T>());
queue = new ConcurrentLinkedQueue<T>();
for (int i=0; i < initialCapacity; i++) {
queue.offer((T) factory.newInstance());
}
}
/**
* 获取一个对象
* 如果有对象直接返回
* 如果池中没有空闲对象,如果对象总数达到maximumCapacity,当前请求阻塞等待,否则实例化并记录对象返回
*
* @return
*/
public T get() {
T t = queue.poll();
if (!factory.validate(t)) {
locker.remove(t);
factory.destroy(t);
synchronized (lock) {
if (queue.size() + locker.size() >= maximumCapacity) {
while (true) {
// try {
// System.out.println("wating...");
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
t = queue.poll();
if (factory.validate(t)) {
break;
}
}
} else {
t = (T) factory.newInstance();
if (t == null) {
throw new NullPointerException(factory.getClass().getName() + ".newInstance is null!");
}
}
}
}
locker.add(t);
return t;
}
/**
* 可用对象数
* @return
*/
public int size() {
return queue.size();
}
/**
* 还回对象
* @param t
*/
public void back(T t) {
if (locker.contains(t)) {
if (factory.validate(t)) {
// TODO:需要实现多余对象的存活期,防止重复频繁创建和销毁
if (queue.size() + locker.size() < initialCapacity) {
queue.offer(t);
}
}
locker.remove(t);
}
}
/**
* 关闭对象池
*/
public void close() {
while (true) {
T t = queue.poll();
factory.destroy(t);
if (t == null) {
break;
}
}
// TODO:也许定制一个策略
locker.clear();
}
}
6.来测试一下,简单起见我用了StringFactory:
/**
* TestDriver
*
* @Author 刘世杰
* @Date 2015-09-09 23:47:40
*/
public class Run {
public static void main(String[] args) throws InterruptedException {
int thread_num = 100;
ObjectPoll<String> poll = new ObjectPoll<String>(20, 40, StringFactory.getInstance());
CountDownLatch signal = new CountDownLatch(thread_num);
try {
for (int i = 0; i < thread_num; i++) {
new Thread(new StringRunnable(poll, signal)).start();
}
} finally {
signal.await();
if (poll != null) {
poll.close();
}
}
}
private static class StringRunnable implements Runnable {
private ObjectPoll<String> poll;
private CountDownLatch signal;
StringRunnable(ObjectPoll<String> poll, CountDownLatch singal) {
this.poll = poll;
this.signal = singal;
}
@Override
public void run() {
String str = poll.get();
print(str + "======" + poll.size());
try {
// 慢一点归还
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
poll.back(str);
signal.countDown();
}
}
private static void print(Object object) {
System.out.println(object);
}
}
总结
一个基本的对象连接池已经实现了,还有两个点可以优化一下:
- 为了降低繁忙期间的频繁创建和销毁需要有一个空闲对象的存活时间,类似线程池里的keepAliveTime;
- 关闭线程池时,没有回来的对象如何处理?不接受新请求,等待归来?还是开发一个回调接口?