先直接上代码
线程池调度类
1 import org.slf4j.Logger; 2 import org.slf4j.LoggerFactory; 3 4 import java.util.concurrent.LinkedBlockingQueue; 5 import java.util.concurrent.RejectedExecutionException; 6 import java.util.concurrent.ThreadPoolExecutor; 7 import java.util.concurrent.TimeUnit; 8 9 /** 10 * 线程池调度类 11 * User: zhaop 12 * Date: 15-2-3 13 * Time: 下午4:19 14 */ 15 public class ThreadScheduler { 16 private static final Logger logger = LoggerFactory.getLogger(ThreadScheduler.class); 17 18 private ThreadPoolExecutor producerPool = null; 19 20 private static ThreadScheduler threadScheduler = null; 21 22 private static final int THREAD_POOL_KEEP_ALIVE_TIME = 30; 23 private static final int THREAD_POOL_MAX_SIZE = 100000; 24 private static final int THREAD_POOL_INIT_SIZE = 100; 25 26 public ThreadScheduler() { 27 // 构造一个线程池 28 producerPool = new ThreadPoolExecutor(THREAD_POOL_INIT_SIZE, 29 THREAD_POOL_MAX_SIZE, 30 THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, 31 new LinkedBlockingQueue<Runnable>(), 32 new ThreadPoolExecutor.AbortPolicy()); 33 // producerPool.allowCoreThreadTimeOut(true); 34 } 35 36 public static ThreadScheduler getPool() { 37 if (null == threadScheduler || threadScheduler.producerPool.isShutdown()) 38 synchronized (ThreadScheduler.class) { 39 if (null == threadScheduler || threadScheduler.producerPool.isShutdown()) { 40 threadScheduler = new ThreadScheduler(); 41 } 42 } 43 return threadScheduler; 44 } 45 46 47 public void exeThread(Runnable tm) { 48 // 向线程池派送一个任务。 49 try { 50 producerPool.execute(tm); 51 //logger.info("线程池当前线程数为:"+producerPool.getPoolSize()); 52 } catch (RejectedExecutionException e) { 53 logger.error("线程池派送任务发生错误!当前线程数为:" + producerPool.getPoolSize() + ",错误信息为:" 54 + e.getMessage(), e); 55 } 56 } 57 58 public void shutdown() { 59 //关闭线程池 60 try { 61 threadScheduler.producerPool.shutdownNow(); 62 } catch (RejectedExecutionException e) { 63 logger.error("关闭线程池发生错误!错误信息为:" 64 + e.getMessage(), e); 65 } 66 } 67 68 public int getCurPoolSize(){ 69 return 
producerPool.getPoolSize(); 70 } 71 72 public static void setMaximumPoolSize(int maxPoolSize) { 73 if(null != threadScheduler){ 74 threadScheduler.producerPool.setMaximumPoolSize(maxPoolSize); 75 } 76 } 77 78 public static void setTHREAD_POOL_INIT_SIZE(int thread_pool_init_size) { 79 if (null != threadScheduler){ 80 threadScheduler.producerPool.setCorePoolSize(thread_pool_init_size); 81 } 82 } 83 84 85 }
线程分发器
1 import cn.ac.ict.xcloud.constants.ProjectConstants; 2 import cn.ac.ict.xcloud.msgqueue.threadPool.ThreadScheduler; 3 import cn.ac.ict.xcloud.zookeeperclient.ZookeeperClient; 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 /** 8 * User: zhaop 9 * Date: 15-2-3 10 * Time: 下午8:46 11 */ 12 public class HandlerDispatch { 13 private static final Logger log = LoggerFactory.getLogger(HandlerDispatch.class); 14 private static HandlerDispatch handlerDispatch = null; 15 private static Object lock = new Object(); 16 private boolean waiting = false; //线程是否被挂起 17 18 private int totalThread = ZookeeperClient.getInstance().getIntegerProperty(ProjectConstants.XCLOUD_MQ_PROJECT, "TOTAL_THREAD", 50); // 总线程数 19 private int curThread = 0; //当前线程数 20 21 22 public static HandlerDispatch getHandlerDispatch() { 23 if (null == handlerDispatch) 24 synchronized (HandlerDispatch.class) { 25 if (null == handlerDispatch) { 26 handlerDispatch = new HandlerDispatch(); 27 } 28 } 29 return handlerDispatch; 30 } 31 32 33 public void addWorker(WorkerThread abstractHanler) { 34 if (curThread < totalThread) { 35 synchronized (lock) { 36 ThreadScheduler.getPool().exeThread(abstractHanler); 37 curThread++; 38 log.info("正在运行线程数[" + curThread + "/" + totalThread + "]"); 39 } 40 } else { 41 try { 42 synchronized (this) { 43 log.info("运行线程数达到最大值,接收消息线程被挂起"); 44 if (!waiting){ 45 waiting = true; 46 this.wait(); //挂起 47 } 48 } 49 } catch (InterruptedException e) { 50 log.error(e.getMessage(), e); 51 } 52 } 53 } 54 55 public void finishWorker(){ 56 synchronized (lock) { 57 curThread--; 58 log.info("正在运行线程数[" + curThread + "/" + totalThread + "]"); 59 if (curThread < totalThread && waiting){ 60 log.info("唤醒接收消息线程"); 61 synchronized(this){ 62 waiting = false; 63 this.notifyAll(); 64 } 65 } 66 } 67 } 68 69 70 public int getCurThread() { 71 return curThread; 72 } 73 74 public void setCurThread(int curThread) { 75 this.curThread = curThread; 76 } 77 78 public int getTotalThread() { 79 return 
totalThread; 80 } 81 82 public void setTotalThread(int totalThread) { 83 this.totalThread = totalThread; 84 } 85 86 87 }
线程类
1 import org.slf4j.Logger; 2 import org.slf4j.LoggerFactory; 3 import org.springframework.stereotype.Component; 4 5 /** 6 * Created with IntelliJ IDEA. 7 * User: zhaop 8 * Date: 15-2-3 9 * Time: 下午4:36 10 */ 11 12 @Component 13 public class WorkerThread implements Runnable{ 14 private static final Logger logger = LoggerFactory.getLogger(WorkerThread.class); 15 16 private IHandler handler; 17 private String message; 18 19 public WorkerThread(){ 20 } 21 22 public WorkerThread(IHandler handler, String message){ 23 this.handler = handler; 24 this.message = message; 25 }; 26 27 public void callBack(){ 28 logger.info("线程工作结束,回调..."); 29 HandlerDispatch.getHandlerDispatch().finishWorker(); 30 } 31 32 @Override 33 public void run() { 34 handler.handleMessage(message); 35 callBack(); 36 } 37 }
调用方式
// Usage: wrap the handler and the message to process in a WorkerThread, then hand the
// worker to the shared HandlerDispatch instance via addWorker.
workerThread = new WorkerThread(userMsgHandler,userMsg); HandlerDispatch.getHandlerDispatch().addWorker(workerThread);