zoukankan      html  css  js  c++  java
  • JAVA实现一个低性能的WEB服务器(一)——线程池

     参考资料

    1. 《Java并发编程的艺术》
    2. 《深入理解JAVA虚拟机》
    3. GitHub:https://github.com/libinkai/eagle

     目标

      自己实现一个线程池用于执行多线程任务,并且可以平滑关闭(当准备关闭线程池时,不接受新的任务,等待已提交的任务执行完毕之后再关闭线程池)

     线程池V1.0

      有问题的一个线程池,当关闭线程池时,程序并没有按预期退出。原因是线程死锁,借此记录简单的线程死锁排查方法

      线程池接口

       

    public interface EagleThreadPool<Job extends Runnable> {
        // 执行一个Job
        void execute(Job job);
    
        // 获取线程池当前待执行任务数目
        int getJobSize();
    
        // 添加工作线程
        int addWorkers(int num);
    
        // 减少工作线程
        int removeWorkers(int num);
    
        // 关闭线程池
        void shutdown();
    }
    线程池接口定义

      线程池接口实现

    @Slf4j
    public class DefaultThreadPoolLocked<Job extends Runnable> implements EagleThreadPool<Job> {
        // 线程池工作者数目
        private static final int maxWorkerNumber = 16;
        private static final int defaultWorkerNumber = 4;
        private static final int minWorkerNumber = 1;
        // 工作者编号(线程名)
        private AtomicInteger workerId = new AtomicInteger();
        // 工作者队列
        private final List<Worker> workerList = new LinkedList<>();
        // 工作任务队列
        private final List<Job> jobList = new LinkedList<>();
        // 是否接受新的任务
        private volatile boolean isWorking = true;
    
        // 工作者内部类
        class Worker implements Runnable {
            private volatile boolean isRunning = true;
            private volatile boolean isHandling = false;
    
            @Override
            public void run() {
                while (isRunning) {
                    Job job = null;
                    synchronized (jobList) {
                        while (isRunning && jobList.isEmpty()) {
                            try {
                                // 当前线程在jobList上等待,问题的根源
                                jobList.wait();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        // 取一个任务
                        job = jobList.remove(0);
                    }
                    if (job != null) {
                        try {
                            isHandling = true;
                            job.run();
                        } catch (Exception e) {
                        } finally {
                            isHandling = false;
                        }
                    }
                }
            }
    
            public void close() {
                this.isRunning = false;
            }
        }
    
        public DefaultThreadPoolLocked() {
            initWorkers(defaultWorkerNumber);
        }
    
        public DefaultThreadPoolLocked(int initialWorkerNumber) {
            initWorkers(initialWorkerNumber);
        }
    
        // 初始化工作者线程
        public int initWorkers(int num) {
            if (num < minWorkerNumber) {
                num = minWorkerNumber;
            }
            int freeCapacity = maxWorkerNumber - workerList.size();
            if (num >= freeCapacity) {
                num = freeCapacity;
            }
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workerList.add(worker);
                Thread thread = new Thread(worker, "Worker-" + workerId.incrementAndGet());
                thread.start();
            }
            return num;
        }
    
        @Override
        public void execute(Job job) {
            if (isWorking && job != null) {
                synchronized (jobList) {
                    jobList.add(job);
                    jobList.notify();
                }
            } else {
                log.debug("thread pool is waiting to close or job is null");
            }
        }
    
        @Override
        public int getJobSize() {
            return jobList.size();
        }
    
        @Override
        public int addWorkers(int num) {
            synchronized (jobList) {
                return initWorkers(num);
            }
        }
    
        @Override
        public int removeWorkers(int num) {
            int count = 0;
            synchronized (jobList) {
                for (int i = 0; i < num; i++) {
                    Worker worker = workerList.get(i);
                    if (!worker.isHandling) {
                        worker.close();
                        count++;
                    }
                }
            }
            return count;
        }
    
        @Override
        public void shutdown() {
            isWorking = false;
            while (!jobList.isEmpty()) {
                log.debug("sorry, jobList is not null, jobList size :{}, waiting to close", jobList.size());
            }
            for (Worker worker : workerList) {
                worker.close();
            }
        }
    
        // 测试线程池
        public static void main(String[] args) {
            DefaultThreadPoolLocked defaultThreadPool = new DefaultThreadPoolLocked();
            int count = 1000;
            while (count > 0) {
                int finalCount = count;
                defaultThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        int flag = 10;
                        while (flag > 0) {
                            log.debug("job{} say {}", finalCount, flag);
                            flag--;
                        }
                        log.debug("job{} done", finalCount);
                    }
                });
                if (count == 500) {
                    defaultThreadPool.shutdown();
                }
                count--;
            }
        }
    }
    线程池实现

      发现&解决问题

      程序关闭线程池,已提交任务执行完毕之后,JVM并没有退出。而JVM规定当JVM中不存在非守护线程时,JVM退出,故程序可能出现了死锁

      

      执行jps,查看所有的HotSpot进程,注意是“进程”

      

       执行jstack 4936 (4936为jps结果中DefaultThreadPoolLocked的进程号),查看进程堆栈信息

      

       发现存在工作者线程一直在任务队列上等待,故使用超时等待方案或许可以解决问题

      线程池接口实现改进

    package com.equator.eagle.threadpool;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Author: Equator
     * @Date: 2019/12/4 20:39
     **/
    
    @Slf4j
    public class DefaultThreadPool<Job extends Runnable> implements EagleThreadPool<Job> {
        // 线程池工作者数目
        private static final int maxWorkerNumber = 16;
        private static final int defaultWorkerNumber = 4;
        private static final int minWorkerNumber = 1;
        // 工作者编号(线程名)
        private AtomicInteger workerId = new AtomicInteger();
        // 工作者队列
        private final List<Worker> workerList = new LinkedList<>();
        // 工作任务队列
        private final List<Job> jobList = new LinkedList<>();
        // 是否接受新的任务
        private volatile boolean isWorking = true;
    
        // 工作者内部类
        class Worker implements Runnable {
            private volatile boolean isRunning = true;
            private volatile boolean isHandling = false;
    
            @Override
            public void run() {
                while (isRunning) {
                    Job job = null;
                    synchronized (jobList) {
                        while (isRunning && jobList.isEmpty()) {
                            try {
                                // 超时等待
                                jobList.wait(1000);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        // 取一个任务,这里需要判断一下任务队列是否为空,否则会发生NPE(空指针异常)
                        if (jobList.size() > 0) {
                            job = jobList.remove(0);
                        }
                    }
                    if (job != null) {
                        try {
                            isHandling = true;
                            job.run();
                        } catch (Exception e) {
                        } finally {
                            isHandling = false;
                        }
                    }
                }
            }
    
            public void close() {
                this.isRunning = false;
            }
        }
    
        public DefaultThreadPool() {
            initWorkers(defaultWorkerNumber);
        }
    
        public DefaultThreadPool(int initialWorkerNumber) {
            initWorkers(initialWorkerNumber);
        }
    
        public int initWorkers(int num) {
            int freeCapacity = maxWorkerNumber - workerList.size();
            if (num >= freeCapacity) {
                num = freeCapacity;
            }
            if (num < minWorkerNumber) {
                num = minWorkerNumber;
            }
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workerList.add(worker);
                Thread thread = new Thread(worker, "Worker-" + workerId.incrementAndGet());
                thread.start();
            }
            return num;
        }
    
        @Override
        public void execute(Job job) {
            if (isWorking && job != null) {
                synchronized (jobList) {
                    jobList.add(job);
                    jobList.notify();
                }
            } else {
                log.debug("thread pool is waiting to close or job is null");
            }
        }
    
        @Override
        public int getJobSize() {
            return jobList.size();
        }
    
        @Override
        public int addWorkers(int num) {
            synchronized (jobList) {
                return initWorkers(num);
            }
        }
    
        @Override
        public int removeWorkers(int num) {
            int count = 0;
            synchronized (jobList) {
                for (int i = 0; i < num; i++) {
                    Worker worker = workerList.get(i);
                    if (!worker.isHandling) {
                        worker.close();
                        count++;
                    }
                }
            }
            return count;
        }
    
        @Override
        public void shutdown() {
            isWorking = false;
            while (!jobList.isEmpty()) {
                log.debug("sorry, jobList is not null, jobList size :{}, waiting to close", jobList.size());
            }
            for (Worker worker : workerList) {
                worker.close();
            }
        }
    
        public static void main(String[] args) {
            DefaultThreadPool defaultThreadPool = new DefaultThreadPool();
            int count = 1000;
            while (count > 0) {
                int finalCount = count;
                defaultThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        int flag = 10;
                        while (flag > 0) {
                            log.debug("job{} say {}", finalCount, flag);
                            flag--;
                        }
                        log.debug("job{} done", finalCount);
                    }
                });
                if (count == 500) {
                    defaultThreadPool.shutdown();
                }
                count--;
            }
        }
    }
    线程池接口实现改进版

      

       至此,线程池就可以正常工作啦!

      

  • 相关阅读:
    Gogh-位图编纂次序
    Skype 1.4 for Linux 掉掉更新
    Jokosher 0.9
    [推荐]实用网址大全
    创建异形窗口[2]
    创建异形窗口[3]
    动态列表
    给系统菜单添加菜单项
    演示控件的 Anchors 属性
    使用 WM_NCHITTEST 消息判断鼠标所在窗口的部位
  • 原文地址:https://www.cnblogs.com/Libinkai/p/11987533.html
Copyright © 2011-2022 走看看