zoukankan      html  css  js  c++  java
  • wait/notify模拟线程池

    线程的创建和销毁会消耗很多资源,频繁创建线程时会发现CPU利用率很高。为了节省资源,使用线程池是一个比较好的选择:当有任务需要执行时,由池中某一条空闲线程取出并执行;线程池还可以支持删除任务、获取任务数量等操作。下面在Spring Boot项目中基于wait/notify构建一个简单的线程池。

    自定义线程池

    package com.demo.bootdemo.threadpool;
    
    import java.util.LinkedList;
    import java.util.List;
    
    /**
     * A minimal fixed-size worker pool built on intrinsic locks and wait/notify.
     *
     * <p>Worker threads are started in the constructor. Each worker blocks on the
     * job queue's monitor while the queue is empty and removes jobs from the head
     * of the queue when woken. All access to the queue is guarded by that monitor.
     *
     * <p>A capacity of zero or less starts no workers, so submitted jobs would
     * stay queued and never run.
     */
    public class MyThreadPool {
        // Worker threads created by init(); kept so shutdown() can interrupt them
        private final LinkedList<Thread> threadPool = new LinkedList<Thread>();
        // FIFO queue of pending jobs; also the monitor guarding the queue and the object idle workers wait on
        private final LinkedList<Job> jobs = new LinkedList<Job>();
        // Number of worker threads started by this pool
        private final int capacity;
        // Set by shutdown(); workers stop taking new jobs once it is true
        private volatile boolean stopped;
    
        /**
         * Creates a pool with 10 worker threads.
         */
        public MyThreadPool() {
            this(10);
        }
    
        /**
         * Creates a pool with the given number of worker threads.
         *
         * @param capacity number of worker threads to start
         */
        public MyThreadPool(int capacity) {
            this.capacity = capacity;
            init();
        }
    
        /**
         * Creates and starts the worker threads, named th_0 .. th_(capacity-1).
         */
        private void init() {
            for (int i = 0; i < capacity; i++) {
                Thread th = new Thread(new MyRunnable(), "th_" + i);
                threadPool.add(th);
                th.start();
            }
        }
    
        /**
         * Queues a single job for execution.
         *
         * @param job the job to run
         */
        public void executeJob(Job job) {
            addJob(job);
        }
    
        /**
         * Queues a batch of jobs for execution. The elements are copied into the
         * pool's own queue, so the caller may reuse the list afterwards.
         *
         * @param jobList jobs to run; null or empty is a no-op
         */
        public void executeJobs(List<Job> jobList) {
            addJobs(jobList);
        }
    
        /**
         * Removes the first queued job whose key equals {@code jobKey}. A job that
         * a worker has already taken off the queue is not affected.
         *
         * @param jobKey key of the job to remove; null matches a job whose key is null
         */
        public void deleteJob(String jobKey) {
            synchronized (jobs) {
                Job delJob = null;
                for (Job j : jobs) {
                    // The queue accepts null entries, so guard before dereferencing
                    if (j != null && sameKey(jobKey, j.getJobKey())) {
                        delJob = j;
                        break;
                    }
                }
                // Only remove on a real match; remove(null) would drop an unrelated null entry
                if (delJob != null) {
                    jobs.remove(delJob);
                }
            }
        }
    
        /**
         * Null-safe string equality used for job keys.
         */
        private static boolean sameKey(String expected, String actual) {
            return expected == null ? actual == null : expected.equals(actual);
        }
    
        /**
         * Appends all given jobs to the queue and wakes every waiting worker.
         */
        private void addJobs(List<Job> jobList) {
            if (jobList == null || jobList.isEmpty()) {
                return;
            }
            synchronized (jobs) {
                jobs.addAll(jobList);
                jobs.notifyAll();
            }
        }
    
        /**
         * Appends one job to the queue and wakes a single waiting worker.
         */
        private void addJob(Job job) {
            synchronized (jobs) {
                jobs.add(job);
                jobs.notify();
            }
        }
    
        /**
         * Returns the number of jobs that are queued and not yet taken by a worker.
         *
         * @return current queue length
         */
        public int getJobSize() {
            // LinkedList is not thread-safe; read the size under the same lock as the writers
            synchronized (jobs) {
                return jobs.size();
            }
        }
    
        /**
         * Asks all workers to stop by setting the stop flag and interrupting them.
         * Idle workers exit immediately; a worker that is running a job exits once
         * that job returns. Jobs still in the queue are left there and not executed.
         */
        public void shutdown() {
            stopped = true;
            for (Thread worker : threadPool) {
                worker.interrupt();
            }
        }
    
        /**
         * Worker loop: waits for a job, takes it off the queue, and runs it
         * outside the lock.
         */
        private class MyRunnable implements Runnable {
            public void run() {
                while (!stopped && !Thread.currentThread().isInterrupted()) {
                    Job job;
                    synchronized (jobs) {
                        // Re-check the condition after every wake-up (spurious wake-ups, competing workers)
                        while (jobs.isEmpty()) {
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                // Interruption is the stop signal: keep the flag set and leave the loop
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        job = jobs.removeFirst();
                    }
                    if (job == null) {
                        continue;
                    }
                    try {
                        job.execute();
                    } catch (RuntimeException e) {
                        // A failing job must not kill the worker; report it the way an
                        // uncaught exception would be reported and keep serving the queue
                        Thread self = Thread.currentThread();
                        self.getUncaughtExceptionHandler().uncaughtException(self, e);
                    }
                }
            }
        }
    }
    View Code

    Job抽象类

    package com.demo.bootdemo.threadpool;
    
    /**
     * Base type for a unit of work. Every job carries a string key and
     * supplies its own {@link #execute()} implementation.
     */
    public abstract class Job {
    
        // Identifier of this job; null until setJobKey is called
        String jobKey;
    
        /**
         * Performs the job's work. Implemented by each concrete subclass.
         */
        public abstract void execute();
    
        /**
         * Assigns the key that identifies this job.
         *
         * @param jobKey the new key
         */
        public void setJobKey(String jobKey) {
            this.jobKey = jobKey;
        }
    
        /**
         * @return the key currently assigned to this job
         */
        public String getJobKey() {
            return this.jobKey;
        }
    }
    View Code

    Job实现类,这里模仿了三种不同类型的Job, PrintJob,SayHelloJob和WriteFileJob

    PrintJob

    package com.demo.bootdemo.threadpool.myjob;
    
    import java.util.LinkedList;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.bootdemo.threadpool.Job;
    
    /**
     * Job that pauses for one second and then logs the contents of a string
     * list followed by its job key.
     */
    public class PrintJob extends Job {
        private static final Logger logger = LoggerFactory.getLogger(PrintJob.class);
        // List to print; supplied by the caller and not copied
        private final LinkedList<String> ls;
    
        /**
         * @param jobKey key identifying this job
         * @param ls     list whose contents are logged when the job runs
         */
        public PrintJob(String jobKey, LinkedList<String> ls) {
            this.setJobKey(jobKey);
            this.ls = ls;
        }
    
        /**
         * Sleeps for one second, then logs the list and the job key. If the
         * sleep is interrupted the interrupt flag is restored for the calling
         * thread and the message is still logged.
         */
        @Override
        public void execute() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the thread that runs this job
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while sleeping in job {}", getJobKey(), e);
            }
            // String.valueOf tolerates a null list instead of throwing
            logger.info(String.valueOf(ls) + ", " + getJobKey());
        }
    
    }
    View Code

    SayHelloJob

    package com.demo.bootdemo.threadpool.myjob;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.bootdemo.threadpool.Job;
    
    /**
     * Job that logs a fixed greeting followed by its job key.
     */
    public class SayHelloJob extends Job {
        private Logger logger = LoggerFactory.getLogger(SayHelloJob.class);
    
        /**
         * @param jobKey key identifying this job
         */
        public SayHelloJob(String jobKey) {
            setJobKey(jobKey);
        }
    
        /**
         * Logs the greeting together with the job key at INFO level.
         */
        @Override
        public void execute() {
            String greeting = "Just say hello. " + getJobKey();
            logger.info(greeting);
        }
    
    }
    View Code

    WriteFileJob

    package com.demo.bootdemo.threadpool.myjob;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.demo.bootdemo.threadpool.Job;
    
    /**
     * Job that writes the current time in milliseconds to a new file in the
     * working directory, holds the file open for one second, and logs the
     * file name.
     */
    public class WriteFileJob extends Job {
    
        private static final Logger logger = LoggerFactory.getLogger(WriteFileJob.class);
    
        /**
         * @param jobKey key identifying this job
         */
        public WriteFileJob(String jobKey) {
            this.setJobKey(jobKey);
        }
    
        /**
         * Creates a file named after the current timestamp, writes a timestamp
         * into it and sleeps for one second. Failures are logged rather than
         * propagated, and the "write file" message is logged in every case.
         */
        @Override
        public void execute() {
            String fileName = "./" + System.currentTimeMillis();
            // try-with-resources closes the stream on every exit path
            try (FileOutputStream fos = new FileOutputStream(new File(fileName))) {
                // Explicit charset so the bytes do not depend on the platform default
                fos.write(String.valueOf(System.currentTimeMillis())
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the thread that runs this job
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while writing file {}", fileName, e);
            } catch (IOException | RuntimeException e) {
                logger.error("Failed to write file {}", fileName, e);
            } finally {
                logger.info(String.format("write file. fileName: %s", fileName) + ", " + getJobKey());
            }
        }
    
    }
    View Code

    配置类

    package com.demo.bootdemo.threadpool.properties;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Settings bound from configuration keys under the "pool.thread" prefix.
     */
    @Configuration
    @ConfigurationProperties(prefix = "pool.thread")
    public class ThreadPoolProperties {
    
        // Value of pool.thread.count; stays 0 if the setter is never called
        private int count;
    
        /**
         * @param count the thread count to store
         */
        public void setCount(final int count) {
            this.count = count;
        }
    
        /**
         * @return the stored thread count
         */
        public int getCount() {
            return this.count;
        }
    
    }
    View Code

    配置文件application.properties

    pool.thread.count=5

    测试类入口

    package com.demo.bootdemo.threadpool.listeners;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    
    import com.demo.bootdemo.threadpool.Job;
    import com.demo.bootdemo.threadpool.MyThreadPool;
    import com.demo.bootdemo.threadpool.myjob.PrintJob;
    import com.demo.bootdemo.threadpool.myjob.SayHelloJob;
    import com.demo.bootdemo.threadpool.myjob.WriteFileJob;
    import com.demo.bootdemo.threadpool.properties.ThreadPoolProperties;
    
    /**
     * Demo driver: when the application context is refreshed it builds a
     * {@link MyThreadPool} and submits a series of sample jobs to it.
     */
    public class ThreadPoolListeners implements ApplicationListener<ContextRefreshedEvent> {
    
        /**
         * Creates a pool sized from {@link ThreadPoolProperties}, then submits a
         * mixed batch, a batch of ten hello jobs, two delete requests and one
         * stand-alone file job, in that order.
         *
         * @param event the context-refreshed event; its application context is used to look up the properties bean
         */
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            // Size the pool from the bound configuration properties
            ThreadPoolProperties poolSettings = event.getApplicationContext().getBean(ThreadPoolProperties.class);
            MyThreadPool pool = new MyThreadPool(poolSettings.getCount());
    
            // Payload for the print job
            LinkedList<String> letters = new LinkedList<String>();
            letters.add("a");
            letters.add("b");
    
            // First batch: one job of each kind
            List<Job> batch = new ArrayList<>();
            batch.add(new PrintJob("PrintJobKey000", letters));
            batch.add(new SayHelloJob("SayHelloJobKey000"));
            batch.add(new WriteFileJob("WriteFileJobKey000"));
            pool.executeJobs(batch);
    
            // Second batch: ten hello jobs keyed sayhellojobkey0 .. sayhellojobkey9
            batch.clear();
            int index = 0;
            while (index < 10) {
                batch.add(new SayHelloJob("sayhellojobkey" + index));
                index++;
            }
            pool.executeJobs(batch);
    
            // Ask the pool to drop two of the hello jobs by key
            pool.deleteJob("sayhellojobkey7");
            pool.deleteJob("sayhellojobkey8");
    
            // Submit one more job on its own
            pool.executeJob(new WriteFileJob("writeJobkey_alone"));
        }
    
    }
    View Code

    springboot启动类

    package com.demo.bootdemo;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import com.demo.bootdemo.threadpool.listeners.ThreadPoolListeners;
    
    /**
     * Application entry point. Registers {@link ThreadPoolListeners} on the
     * SpringApplication before running it.
     */
    @SpringBootApplication
    public class MythreadpoolApplication {
    
        /**
         * Builds the SpringApplication, attaches the demo listener and starts it.
         *
         * @param args command-line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication app = new SpringApplication(MythreadpoolApplication.class);
            ThreadPoolListeners demoListener = new ThreadPoolListeners();
            app.addListeners(demoListener);
            app.run(args);
        }
    
    }
    View Code

    输出结果

      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::        (v2.1.1.RELEASE)
    
    20:39:18.382 [main] Starting MythreadpoolApplication on admin-PC with PID 27632 (D:\Programs\eclipse\workplace\springboot\mythreadpool\target\classes started by admin in D:\Programs\eclipse\workplace\springboot\mythreadpool)
    20:39:18.385 [main] No active profile set, falling back to default profiles: default
    20:39:18.855 [th_3] Just say hello. SayHelloJobKey000
    20:39:18.855 [th_3] Just say hello. sayhellojobkey1
    20:39:18.855 [th_3] Just say hello. sayhellojobkey2
    20:39:18.855 [th_0] Just say hello. sayhellojobkey0
    20:39:18.855 [th_3] Just say hello. sayhellojobkey3
    20:39:18.855 [th_0] Just say hello. sayhellojobkey5
    20:39:18.855 [th_0] Just say hello. sayhellojobkey6
    20:39:18.855 [th_0] Just say hello. sayhellojobkey9
    20:39:18.855 [th_1] Just say hello. sayhellojobkey4
    20:39:18.858 [main] Started MythreadpoolApplication in 0.722 seconds (JVM running for 1.432)
    20:39:19.854 [th_4] [a, b], PrintJobKey000
    20:39:19.857 [th_2] write file. fileName: ./1558355958854, WriteFileJobKey000
    20:39:19.858 [th_0] write file. fileName: ./1558355958855, writeJobkey_alone

    从上述打印日志看,sayhellojobkey7和sayhellojobkey8对应的jobkey未打印出来,说明删除任务生效,其余任务都正常执行。两个WriteFileJob分别生成了名称为1558355958854和1558355958855的文件,查看当前路径,也确实可以找到这两个文件。

    将本案例打包,放入linux执行,使用jstack查看所有线程状态

     

    这些线程基本上都处于WAITING状态,不持有锁;当Job任务被添加到集合中时,会唤醒这些线程来处理Job

  • 相关阅读:
    Google I/O 官方应用中的动效设计
    浪院长 | spark streaming的使用心得
    Kubernetes 1.12公布:Kubelet TLS Bootstrap与Azure虚拟机规模集(VMSS)迎来通用版本号
    安卓自己定义View进阶-Path基本操作
    2014编程之美初赛第二场
    2015年,即将结束
    查看JVM运行时参数
    使用ThreadPoolExecutor线程池实现并发操作并返回结果
    mysql数据库将查询的多条结果的某些字段合并为一个字段处理
    mysql数据库使用mybatis新增操作返回自增主键的值
  • 原文地址:https://www.cnblogs.com/qq931399960/p/10896339.html
Copyright © 2011-2022 走看看