zoukankan      html  css  js  c++  java
  • [java] 线程池

    简单线程池的设计

    一个典型的线程池,应该包括如下几个部分:
    1、线程池管理器(ThreadPool),用于启动、停用,管理线程池
    2、工作线程(WorkThread),线程池中的线程
    3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
    4、请求队列(RequestQueue),用于存放和提取请求
    5、结果队列(ResultQueue),用于存储请求执行后返回的结果

    线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。

    java代码:

    线程池基本代码:

    import com.google.common.collect.Queues;
    import com.google.common.util.concurrent.Monitor;
    import org.apache.commons.collections.CollectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class SimpleThreadPool extends Thread {
    
        private final Logger logger = LoggerFactory.getLogger(SimpleThreadPool.class);
    
        /**
         * 工作线程列表
         */
        private BlockingQueue<WorkerEntry> workers;
    
        /**
         * 工作线程的数量
         */
        private AtomicInteger workerCounter = new AtomicInteger(0);
    
        /**
         * 初始线程ID
         */
        private int initWorkID = 0;
    
        /**
         * 初始线程队列长度
         */
        private final static int initWorkQueueSize = 3;
    
        private Monitor monitor = new Monitor();
    
        private Monitor.Guard guard = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return CollectionUtils.isNotEmpty(workers);
            }
        };
    
        private Monitor.Guard other = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return !CollectionUtils.isFull(workers);
            }
        };
    
        private int queueSize = initWorkQueueSize;
    
        public SimpleThreadPool() {
            this(initWorkQueueSize);
        }
    
        public SimpleThreadPool(int initWorkQueueSize) {
            this.queueSize = initWorkQueueSize;
            workers = Queues.newArrayBlockingQueue(queueSize);
        }
    
        public void run() {
            try {
                for (; ; ) {
                    try {
                        monitor.enterWhen(guard);
                        WorkerEntry workerEntry = workers.remove();
                        // 可运行的任务
                        if (WorkerStatus.RUNNABLE == workerEntry.getWorkerStatus()
                                && -1 != workerEntry.getWorkerId()) {
                            logger.info("运行任务 ID:{}", workerEntry.getWorkerId());
                            workerEntry.setWorkerStatus(WorkerStatus.RUNNING);
                            workerEntry.setDaemon(true);
                            workerEntry.start();
                        }
    
                        // 当前任务已经运行结束或者被取消
                        if (workerEntry.isDone()) {
                            logger.info("任务ID:{}已运行完成,当前状态是:{}",
                                    workerEntry.getWorkerId(), workerEntry.getWorkerStatus());
                        }
                        workerCounter.decrementAndGet();
    
                        // 如果遇到workId为-1,则标识清空队列
                        if (-1 == workerEntry.getWorkerId()) {
                            destroy();
                            break;
                        }
                    } finally {
                        monitor.leave();
                    }
                }
            } catch (Exception ex) {
                logger.error("{}", ex);
                destroy();
            }
        }
    
        public void destroy() {
            while (CollectionUtils.isNotEmpty(workers)) {
                WorkerEntry workerEntry = workers.remove();
                workerCounter.decrementAndGet();
                logger.info("任务ID:{}已销毁,当前状态是:{}",
                        workerEntry.getWorkerId(), workerEntry.getWorkerStatus());
            }
        }
    
        public void addWorker(Runnable runnable) {
            WorkerEntry workerEntry = new WorkerEntry(runnable, WorkerStatus.RUNNABLE, initWorkID++);
            try {
                monitor.enterWhen(other);
                workers.offer(workerEntry);
                workerCounter.incrementAndGet();
            } catch (InterruptedException e) {
            } finally {
                monitor.leave();
            }
        }
    
        public void putDeathWorker() {
            workers.add(new WorkerEntry(null, WorkerStatus.RUNNABLE, -1));
        }
    
        public int getQueueSize() {
            return queueSize;
        }
    
        public static void main(String[] args) throws InterruptedException {
            SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
            simpleThreadPool.addWorker(new Runnable() {
                @Override
                public void run() {
                    int index = 0;
                    try {
                        while (index++ < 10) {
                            System.out.println("当前线程---Tom");
                            sleep(100L);
                        }
                    } catch (Exception ex) {
                    }
                }
            });
    
            simpleThreadPool.addWorker(new Runnable() {
                @Override
                public void run() {
                    int index = 0;
                    while (index++ < 10) {
                        try {
                            sleep(100L);
                            System.out.println("当前线程---Toms");
                        } catch (Exception ex) {
    
                        }
                    }
                }
            });
    
    
            simpleThreadPool.start();
    
            simpleThreadPool.addWorker(new Runnable() {
                @Override
                public void run() {
                    int index = 0;
                    while (index++ < 10) {
                        try {
                            sleep(100L);
                            System.out.println("当前线程---Death");
                        } catch (Exception ex) {
    
                        }
                    }
                }
            });
            Thread.currentThread().sleep(1000L);
    
            simpleThreadPool.putDeathWorker();
        }
    }
    
    import org.apache.commons.lang3.builder.ToStringBuilder;
    import org.apache.commons.lang3.builder.ToStringStyle;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class WorkerEntry extends Thread {
    
        private final Logger logger = LoggerFactory.getLogger(WorkerEntry.class);
    
        /**
         * 实际任务列表
         */
        private Runnable runnable;
    
        /**
         * 工作状态
         */
        private WorkerStatus workerStatus;
    
        /**
         * 线程ID
         */
        private int workerId;
    
        public WorkerEntry() {
        }
    
        public WorkerEntry(Runnable runnable, WorkerStatus workerStatus, int workerId) {
            this.runnable = runnable;
            this.workerStatus = workerStatus;
            this.workerId = workerId;
        }
    
        public Runnable getRunnable() {
            return runnable;
        }
    
        public void setRunnable(Runnable runnable) {
            this.runnable = runnable;
        }
    
        public WorkerStatus getWorkerStatus() {
            return workerStatus;
        }
    
        public void setWorkerStatus(WorkerStatus workerStatus) {
            this.workerStatus = workerStatus;
        }
    
        public int getWorkerId() {
            return workerId;
        }
    
        public void setWorkerId(int workerId) {
            this.workerId = workerId;
        }
    
        public boolean isDone() {
            return workerStatus == WorkerStatus.DONE
                    || workerStatus == WorkerStatus.SHUTDOWN
                    || workerStatus == WorkerStatus.EXCEPTION;
        }
    
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }
    
        @Override
        public void run() {
            try {
                runnable.run();
                workerStatus = WorkerStatus.DONE;
            } catch (Exception ex) {
                logger.error("ID:{}运行异常", workerId, ex);
                workerStatus = WorkerStatus.EXCEPTION;
            }
        }
    }
    

      线程状态:

    import org.apache.commons.lang3.builder.ToStringBuilder;
    import org.apache.commons.lang3.builder.ToStringStyle;
    
    public enum WorkerStatus {
    
        RUNNABLE(-1, "可运行"), RUNNING(0, "运行中"), DONE(1, "运行结束"), SHUTDOWN(2, "取消"),
        EXCEPTION(-2, "运行异常");
        private final int code;
    
        private final String desc;
    
        WorkerStatus(int code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    
        public int getCode() {
            return code;
        }
    
        public String getDesc() {
            return desc;
        }
    
        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
        }
    }
    

      

      

    参考文档:

    http://www.cnblogs.com/coser/archive/2012/03/10/2389264.html

  • 相关阅读:
    MFC 时钟 计算器 日期天数计算
    test10
    test9
    iOS 防止按钮快速点击造成多次响应的避免方法
    NSBundle读取图片 plist文件和txt文件
    按指定格式的子字符串,删除和分割字符串
    python批处理入门知识点
    命令行ffmpeg批量旋转视频
    NSData转化成十六进制字符串
    xcode里面使用Memory Leaks和Instruments检测内存泄漏
  • 原文地址:https://www.cnblogs.com/life91/p/4676619.html
Copyright © 2011-2022 走看看