zoukankan      html  css  js  c++  java
  • java多线程(7)实现一个线程池

    本文的线程池实现用到了生产者—消费者模式:提交任务的线程是生产者,线程池中的工作线程是消费者,二者通过任务队列解耦。

    一、测试类:

    package com.concurrent.chapter08;

    import java.util.concurrent.TimeUnit;

    /**
     * Demo driver for the thread pool: submits 200 tasks that each sleep for one
     * second, then prints the pool's statistics once per second, forever.
     */
    public class ThreadPoolTest {

        public static void main(String[] args) throws InterruptedException {

            // Constructor arguments are (initSize, maxSize, coreSize, queueSize).
            final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);

            // Every submission runs the same body: sleep one second, then report.
            final Runnable job = () -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " is running and done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };

            int submitted = 0;
            while (submitted < 200) {
                threadPool.execute(job);
                submitted++;
            }

            // Monitoring loop; never terminates on its own.
            while (true) {
                System.out.println("getActiveCount: " + threadPool.getActiveCount());
                System.out.println("getQueueSize: " + threadPool.getQueueSize());
                System.out.println("getCoreSize: " + threadPool.getCoreSize());
                System.out.println("getMaxSize: " + threadPool.getMaxSize());
                System.out.println("=========================================");
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }


    package com.concurrent.chapter08;

    import java.util.concurrent.TimeUnit;

    /**
     * Demo driver for shutting the thread pool down: submits 20 one-second tasks,
     * waits 12 seconds, calls {@code shutdown()} and then keeps querying the pool.
     */
    public class ThreadPoolTestShutdown {

        public static void main(String[] args) throws InterruptedException {

            // Build the pool with:
            //   initial thread count  = 2
            //   core thread count     = 4
            //   maximum thread count  = 6
            //   task queue capacity   = 1000
            // (constructor argument order is initSize, maxSize, coreSize, queueSize)
            final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);

            // Task body shared by all 20 submissions.
            final Runnable job = () -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " is runnning and done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };

            int submitted = 0;
            while (submitted < 20) {
                threadPool.execute(job);
                submitted++;
            }

            TimeUnit.SECONDS.sleep(12);
            threadPool.shutdown();
            //Thread.currentThread().join();

            // NOTE(review): with the BasicThreadPool shown in this article, getQueueSize()
            // throws IllegalArgumentException once the pool is shut down, so this loop is
            // expected to end with that exception right after printing getActiveCount.
            while (true) {
                System.out.println("getActiveCount: " + threadPool.getActiveCount());
                System.out.println("getQueueSize: " + threadPool.getQueueSize());
                System.out.println("getCoreSize: " + threadPool.getCoreSize());
                System.out.println("getMaxSize: " + threadPool.getMaxSize());
                System.out.println("=========================================");
                TimeUnit.SECONDS.sleep(1);
            }
        }
    }


    二、线程池实现类:
    package com.concurrent.chapter08;

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Basic thread pool implementation.
     *
     * <p>Submitted tasks are placed on a bounded {@link RunnableQueue}; worker threads
     * (each running an {@link InternalTask}) take tasks off that queue and run them.
     * The pool object is itself a {@link Thread}: its {@link #run()} method wakes up
     * every {@code keepAliveTime} and grows or shrinks the set of workers.
     *
     * <p>{@code activeCount} and {@code threadQueue} are only touched while holding
     * this object's monitor.
     */
    public class BasicThreadPool extends Thread implements ThreadPool {

        private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();

        private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

        // Number of worker threads created when the pool is constructed.
        private final int initSize;

        // Upper bound on the number of worker threads.
        private final int maxSize;

        // Number of worker threads the pool shrinks back to when the queue is empty.
        private final int coreSize;

        // Current number of worker threads; guarded by synchronized (this).
        private int activeCount;

        // Factory used to create worker threads.
        private final ThreadFactory threadFactory;

        // Set once the pool has been shut down.
        private volatile boolean isShutdown = false;

        // Queue of tasks waiting to be executed.
        private final RunnableQueue runnableQueue;

        // Worker threads currently owned by the pool; guarded by synchronized (this).
        private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();

        // Interval between two maintenance passes of run().
        private final long keepAliveTime;

        private final TimeUnit timeUnit;

        /**
         * Creates a pool with the default thread factory, the discarding deny policy
         * and a maintenance interval of 10 seconds.
         *
         * @param initSize  number of worker threads started immediately
         * @param maxSize   maximum number of worker threads
         * @param coreSize  core number of worker threads
         * @param queueSize maximum number of queued tasks
         */
        public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
            this(initSize, maxSize, coreSize,
                    DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);
        }

        /**
         * Creates and starts a pool.
         *
         * @param initSize      number of worker threads started immediately
         * @param maxSize       maximum number of worker threads
         * @param coreSize      core number of worker threads
         * @param threadFactory factory used to create worker threads
         * @param queueSize     maximum number of queued tasks
         * @param denyPolicy    policy applied when the task queue is full
         * @param keepAliveTime interval between two maintenance passes
         * @param timeUnit      unit of {@code keepAliveTime}
         */
        public BasicThreadPool(int initSize, int maxSize, int coreSize,
                               ThreadFactory threadFactory, int queueSize,
                               DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
            this.initSize = initSize;
            this.maxSize = maxSize;
            this.coreSize = coreSize;
            this.threadFactory = threadFactory;
            this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
            this.keepAliveTime = keepAliveTime;
            this.timeUnit = timeUnit;
            this.init();
        }

        /** Creates the initial workers, then starts the maintenance thread. */
        private void init() {
            // Workers are created under the lock and before start(), so the maintenance
            // thread never observes a half-built worker set.
            synchronized (this) {
                for (int i = 0; i < initSize; i++) {
                    newThread();
                }
            }
            start();
        }

        /** Creates, registers and starts one worker thread. Caller must hold the lock. */
        private void newThread() {
            InternalTask internalTask = new InternalTask(runnableQueue);
            Thread thread = this.threadFactory.createThread(internalTask);
            threadQueue.offer(new ThreadTask(thread, internalTask));
            this.activeCount++;
            thread.start();
        }

        /**
         * Submits a task to the task queue.
         *
         * @param runnable the task to run
         * @throws IllegalArgumentException if the pool has been shut down
         */
        @Override
        public void execute(Runnable runnable) {
            if (this.isShutdown) {
                throw new IllegalArgumentException("The thread pool is destory");
            }
            this.runnableQueue.offer(runnable);
        }

        /**
         * Unregisters the oldest worker and asks it to stop. Caller must hold the lock.
         *
         * <p>NOTE(review): the worker is only flagged, not interrupted, so a worker that
         * is blocked waiting for a task keeps waiting until a task arrives.
         */
        private void removeThread() {
            ThreadTask threadTask = threadQueue.remove();
            threadTask.internalTask.stop();
            this.activeCount--;
        }

        /**
         * Maintenance loop inherited from {@link Thread}: every {@code keepAliveTime}
         * it grows the pool towards {@code coreSize} / {@code maxSize} while tasks are
         * queued, and shrinks it back to {@code coreSize} when the queue is empty.
         */
        @Override
        public void run() {
            while (!isShutdown && !isInterrupted()) {

                try {
                    timeUnit.sleep(keepAliveTime);
                } catch (InterruptedException e) {
                    isShutdown = true;
                    break;
                }

                synchronized (this) {

                    if (isShutdown) {
                        break;
                    }

                    // Tasks are waiting and we are below core size: grow to core size
                    // and re-evaluate on the next pass.
                    if (runnableQueue.size() > 0 && activeCount < coreSize) {
                        while (activeCount < coreSize) {
                            newThread();
                        }
                        continue;
                    }

                    // Tasks are still waiting at core size or above: grow to max size.
                    // The loop is driven by activeCount so the pool never exceeds maxSize.
                    if (runnableQueue.size() > 0 && activeCount < maxSize) {
                        while (activeCount < maxSize) {
                            newThread();
                        }
                    }

                    // Nothing queued: release every worker above core size.
                    if (runnableQueue.size() == 0 && activeCount > coreSize) {
                        while (activeCount > coreSize) {
                            removeThread();
                        }
                    }
                }
            }
        }

        /**
         * Shuts the pool down: flags and interrupts every registered worker and
         * interrupts the maintenance thread. Calling it again has no effect.
         */
        @Override
        public void shutdown() {
            synchronized (this) {
                if (isShutdown) {
                    return;
                }
                isShutdown = true;
                threadQueue.forEach(threadTask -> {
                    threadTask.internalTask.stop();
                    threadTask.thread.interrupt();
                });
                this.interrupt();
            }
        }

        /**
         * @return the initial number of worker threads
         * @throws IllegalArgumentException if the pool has been shut down
         */
        @Override
        public int getInitSize() {
            if (isShutdown) {
                throw new IllegalArgumentException("The thread pool is destory.");
            }
            return this.initSize;
        }

        /**
         * @return the maximum number of worker threads
         * @throws IllegalArgumentException if the pool has been shut down
         */
        @Override
        public int getMaxSize() {
            if (isShutdown) {
                throw new IllegalArgumentException("The thread pool is destory.");
            }
            return this.maxSize;
        }

        /**
         * @return the core number of worker threads
         * @throws IllegalArgumentException if the pool has been shut down
         */
        @Override
        public int getCoreSize() {
            if (isShutdown) {
                throw new IllegalArgumentException("The thread pool is destory.");
            }
            return this.coreSize;
        }

        /**
         * @return the number of tasks currently waiting in the task queue
         * @throws IllegalArgumentException if the pool has been shut down
         */
        @Override
        public int getQueueSize() {
            if (isShutdown) {
                throw new IllegalArgumentException("The thread pool is destory.");
            }
            return runnableQueue.size();
        }

        /** @return the current number of worker threads */
        @Override
        public int getActiveCount() {
            synchronized (this) {
                return this.activeCount;
            }
        }

        /** @return {@code true} once the pool has been shut down */
        @Override
        public boolean isShutdown() {
            return this.isShutdown;
        }

        /** Default factory: creates threads named {@code thread-pool-N} in one shared group. */
        private static class DefaultThreadFactory implements ThreadFactory {

            private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);

            private static final ThreadGroup GROUP =
                    new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndIncrement());

            private static final AtomicInteger COUNTER = new AtomicInteger(0);

            @Override
            public Thread createThread(Runnable runnable) {
                // Increment (not decrement) so names count upwards: thread-pool-0, 1, 2, ...
                return new Thread(GROUP, runnable, "thread-pool-" + COUNTER.getAndIncrement());
            }
        }

        /** Pairs a worker thread with the InternalTask it runs. */
        private static final class ThreadTask {

            final Thread thread;
            final InternalTask internalTask;

            ThreadTask(Thread thread, InternalTask internalTask) {
                this.thread = thread;
                this.internalTask = internalTask;
            }
        }
    }
    三、线程池接口:
    package com.concurrent.chapter08;

    /**
     * The basic operations offered by a thread pool.
     */
    public interface ThreadPool {

        /**
         * Submits a task to the thread pool.
         *
         * @param runnable the task to execute
         */
        void execute(Runnable runnable);

        /** Shuts the thread pool down. */
        void shutdown();

        /** @return the initial size of the thread pool */
        int getInitSize();

        /** @return the maximum number of threads in the pool */
        int getMaxSize();

        /** @return the core number of threads in the pool */
        int getCoreSize();

        /** @return the size of the queue the pool uses to buffer tasks */
        int getQueueSize();

        /** @return the number of active threads in the pool */
        int getActiveCount();

        /** @return whether the pool has already been shut down */
        boolean isShutdown();
    }
    四、任务队列接口及实现类:
    package com.concurrent.chapter08;

    /**
     * Queue of tasks waiting to be run.
     */
    public interface RunnableQueue {

        /**
         * Adds a newly submitted task to the queue.
         *
         * @param runnable the task to enqueue
         */
        void offer(Runnable runnable);

        /**
         * Called by worker threads to obtain the next task.
         *
         * @return the next task
         * @throws InterruptedException declared so implementations can abort when the
         *                              calling thread is interrupted
         */
        Runnable take() throws InterruptedException;

        /** @return the number of tasks currently in the queue */
        int size();
    }

    package com.concurrent.chapter08;

    import java.util.LinkedList;

    /**
     * {@link RunnableQueue} backed by a {@link LinkedList} with a fixed capacity.
     *
     * <p>All access to the list happens while holding the list's own monitor;
     * consumers block in {@link #take()} until a producer calls {@link #offer(Runnable)}.
     */
    public class LinkedRunnableQueue implements RunnableQueue {

        // Maximum number of queued tasks, supplied at construction time.
        private final int limit;

        // Policy applied to a task that arrives while the queue is full.
        private final DenyPolicy denyPolicy;

        // The queued tasks; also used as the lock and condition object.
        private final LinkedList<Runnable> runnableList = new LinkedList<>();

        // Pool this queue belongs to; handed to the deny policy.
        private final ThreadPool threadPool;

        /**
         * @param limit      maximum number of queued tasks
         * @param denyPolicy policy applied when the queue is full
         * @param threadPool pool passed to {@code denyPolicy} on rejection
         */
        public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
            this.limit = limit;
            this.denyPolicy = denyPolicy;
            this.threadPool = threadPool;
        }

        /**
         * Appends the task and wakes waiting consumers, or hands the task to the
         * deny policy when the queue already holds {@code limit} tasks.
         *
         * @param runnable the task to enqueue
         */
        @Override
        public void offer(Runnable runnable) {
            synchronized (runnableList) {
                if (runnableList.size() < limit) {
                    runnableList.addLast(runnable);
                    runnableList.notifyAll();
                    return;
                }
            }
            // Rejection runs outside the lock: a policy may execute the task itself,
            // and that must not block every other producer and consumer.
            denyPolicy.reject(runnable, threadPool);
        }

        /**
         * Blocks until a task is available, then removes and returns the oldest one.
         *
         * @return the oldest queued task
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        @Override
        public Runnable take() throws InterruptedException {
            synchronized (runnableList) {
                while (runnableList.isEmpty()) {
                    runnableList.wait();
                }
                // Remove while still holding the lock; otherwise two consumers woken by
                // the same notifyAll() could both try to remove the single element.
                return runnableList.removeFirst();
            }
        }

        /** @return the number of tasks currently queued */
        @Override
        public int size() {
            synchronized (runnableList) {
                return runnableList.size();
            }
        }
    }
    五、任务队列满时的拒绝策略接口类:
    package com.concurrent.chapter08;

    /**
     * Strategy applied to a task that is offered while the task queue is full.
     */
    @FunctionalInterface
    public interface DenyPolicy {

        /**
         * Handles a task that could not be queued.
         *
         * @param runnable   the rejected task
         * @param threadPool the pool the task was submitted to
         */
        void reject(Runnable runnable, ThreadPool threadPool);

        /** Silently discards the rejected task. */
        class DiscardDenyPolicy implements DenyPolicy {
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                // do nothing
            }
        }

        /** Rejects the task by throwing a {@link RunnableDenyException}. */
        class AbortDenyPolicy implements DenyPolicy {
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                // Leading space before "will" keeps the runnable's text separate from the message.
                throw new RunnableDenyException("The runnable " + runnable + " will be abort.");
            }
        }

        /** Runs the rejected task in the submitting thread, unless the pool is shut down. */
        class RunnerDenyPolicy implements DenyPolicy {
            @Override
            public void reject(Runnable runnable, ThreadPool threadPool) {
                if (!threadPool.isShutdown()) {
                    runnable.run();
                }
            }
        }
    }
    六、线程池内部的创建线程工厂类:
    package com.concurrent.chapter08;

    /**
     * Factory for the threads used inside the thread pool.
     */
    @FunctionalInterface
    public interface ThreadFactory {

        /**
         * Creates a thread for the given task.
         *
         * @param runnable the task the thread is created for
         * @return the created thread
         */
        Thread createThread(Runnable runnable);
    }
    七、线程池内线程执行类:
    package com.concurrent.chapter08;

    /**
     * Body of a worker thread: repeatedly takes a runnable from the task queue
     * and invokes its {@code run} method.
     */
    public class InternalTask implements Runnable {

        private final RunnableQueue runnableQueue;

        // Cleared by stop() or when waiting for a task is interrupted.
        private volatile boolean running = true;

        /**
         * @param runnableQueue the queue this worker takes its tasks from
         */
        public InternalTask(RunnableQueue runnableQueue) {
            this.runnableQueue = runnableQueue;
        }

        /**
         * Keeps fetching and running tasks for as long as this worker is marked
         * running and the current thread has not been interrupted.
         */
        @Override
        public void run() {
            for (;;) {
                if (!running || Thread.currentThread().isInterrupted()) {
                    return;
                }
                try {
                    runnableQueue.take().run();
                } catch (InterruptedException e) {
                    running = false;
                    return;
                }
            }
        }

        /** Marks this worker as stopped; used mainly by the thread pool's shutdown method. */
        public void stop() {
            running = false;
        }
    }
    八、线程池的异常类(供拒绝策略使用):
    package com.concurrent.chapter08;

    /**
     * Tells the submitter of a task that the task queue cannot accept new tasks.
     */
    public class RunnableDenyException extends RuntimeException {

        /**
         * @param s the detail message
         */
        public RunnableDenyException(String s) {
            super(s);
        }
    }


  • 相关阅读:
    WinCE数据库开发时整出来的一个致命的BUG
    WinCE下读取注册表获得SD卡路径
    Cookie的创建、读写和删除
    Http Module 介绍[转]
    贴几个从Dnt论坛代码里边扣出来的函数
    SqlServer判断数据库、表、存储过程、函数是否存在
    WinCE中C#WinForm利用Web Service查询数据库
    关于WinCE和PC中同一字符串的GetHashCode()结果不同的理解
    DataTable中数据记录的统计
    C#读取XML时自动过滤掉注释部分
  • 原文地址:https://www.cnblogs.com/herosoft/p/10754815.html
Copyright © 2011-2022 走看看