zoukankan      html  css  js  c++  java
  • 自己动手写线程池

    一、线程池源码如下

    1、阻塞任务队列 BlockingQueue

    /**
     * Minimal task-queue contract used by the hand-written thread pool.
     *
     * @param <E> type of the elements held by the queue
     */
    public interface BlockingQueue<E> {

      /**
       * Tries to append an element to the queue.
       *
       * @param e the element to add
       * @return {@code true} if the element was accepted; the LinkedBlockingQueue
       *         implementation in this file returns {@code false} once it is at capacity
       */
      boolean offer(E e);

      /**
       * Removes and returns the next element.
       *
       * @return the next element; the LinkedBlockingQueue implementation in this file
       *         returns {@code null} when nothing is queued, and ThreadPoolExecutor's
       *         workers stop looping when they receive {@code null}
       */
      E take();
    }

    阻塞任务队列实现类  LinkedBlockingQueue

    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Bounded FIFO queue backed by a singly linked list with a dummy head node.
     *
     * <p>Neither operation blocks: {@link #offer} returns {@code false} when the queue is
     * full and {@link #take} returns {@code null} when it is empty. Both methods are
     * synchronized on the queue instance, so one thread may enqueue while other threads
     * dequeue without corrupting the list or handing the same element out twice.
     *
     * <p>Note: a {@code null} element cannot be told apart from "empty" by callers of
     * {@link #take}, so callers should not enqueue {@code null}.
     *
     * @param <E> type of the queued elements
     */
    public class LinkedBlockingQueue<E> implements BlockingQueue<E> {
      /** Number of elements currently in the queue. */
      private final AtomicInteger count = new AtomicInteger();
      /** Element count at which {@link #offer} starts refusing new elements. */
      private final int capacity;

      /** Dummy node; the first real element, if any, is {@code head.next}. */
      transient Node<E> head;
      /** Last node of the list; identical to {@code head} while the queue is empty. */
      private transient Node<E> last;

      /**
       * Creates an empty queue.
       *
       * @param capacity number of elements after which {@link #offer} returns
       *                 {@code false}; the size never equals a negative value, so a
       *                 negative capacity is never reached
       */
      public LinkedBlockingQueue(int capacity) {
        this.capacity = capacity;
        last = head = new Node<E>(null);
      }

      /**
       * Appends an element at the tail of the queue.
       *
       * @param e the element to add
       * @return {@code true} if the element was added, {@code false} if the queue
       *         already holds {@code capacity} elements
       */
      @Override
      public synchronized boolean offer(E e) {
        // Refuse the element once the queue has reached its capacity.
        if (count.get() == capacity) {
          return false;
        }
        Node<E> node = new Node<E>(e);
        last.next = node;
        last = node;
        count.getAndIncrement();
        return true;
      }

      /**
       * Removes and returns the element at the head of the queue.
       *
       * @return the oldest queued element, or {@code null} if the queue is empty
       */
      @Override
      public synchronized E take() {
        Node<E> first = head.next;
        if (first == null) {
          return null;
        }
        E x = first.item;
        // "first" becomes the new dummy head, so it must not keep the element alive.
        first.item = null;
        head = first;
        // Free the slot; without this the queue would report "full" forever after
        // capacity elements had passed through it.
        count.getAndDecrement();
        return x;
      }

      /** Singly linked list node. */
      static class Node<E>{
        E item;
        Node<E> next;

        Node(E x){
          item = x;
        }
      }
    }

    2、创建线程的工厂  ThreadFactory

    /**
     * Creates the threads on which a pool runs its workers.
     */
    public interface ThreadFactory {
      /**
       * Creates a thread that will execute the given runnable.
       *
       * @param r the runnable the new thread should run
       * @return the new thread; ThreadPoolExecutor in this file calls {@code start()}
       *         on the returned thread itself
       */
      Thread newThread(Runnable r);
    }

    创建线程的工厂实现类    MyThreadFactory

    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * {@link ThreadFactory} whose threads are named {@code demo1}, {@code demo2}, ...
     * in creation order.
     */
    public class MyThreadFactory implements ThreadFactory {
      /** Numeric suffix for the next thread name; the first thread gets 1. */
      private final AtomicInteger threadNumber = new AtomicInteger(1);

      /**
       * Wraps the runnable in a new, not yet started thread.
       *
       * @param r the runnable the thread will execute
       * @return a thread named {@code "demo" + sequenceNumber}
       */
      @Override
      public Thread newThread(Runnable r) {
        final String threadName = "demo" + threadNumber.getAndIncrement();
        return new Thread(r, threadName);
      }
    }

    3、线程池接口   ExecutorService

    /**
     * Entry point for handing tasks to a thread pool.
     */
    public interface ExecutorService {
      /**
       * Submits a task for execution.
       *
       * <p>The ThreadPoolExecutor implementation in this file starts a worker thread
       * for the task, queues it, or prints a rejection message.
       *
       * @param command the task to run
       */
      void execute(Runnable command);
    }

    线程池实现类    ThreadPoolExecutor

    import java.util.HashSet;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ThreadPoolExecutor implements ExecutorService{
      private final HashSet<Worker> workers = new HashSet<Worker>();
      private static final AtomicInteger ctl = new AtomicInteger(0);

      private final BlockingQueue<Runnable> workQueue;
      private volatile ThreadFactory threadFactory;
      private volatile int corePoolSize;
      private volatile int maximumPoolSize;

      public ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   BlockingQueue<Runnable> workQueue,
                  ThreadFactory threadFactory) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
      }

      @Override
      public void execute(Runnable command) {
        //如果线程池当前线程数小于核心线程数,则调用addWorker方法增加一个工作线程
        if(ctl.get() < corePoolSize) {
          if(addWorker(command, true)) {
            return;
          }
        }
        //否则尝试将该任务添加到任务队列
        if(workQueue.offer(command)) {
          workQueue.offer(command);
        }else{//如果添加该任务到任务队列失败则说明任务队列已满,新开启线程执行该任务
          //如果新开启线程失败
          if(!addWorker(command, false)) {
            System.out.println("任务" + command + "被线程池拒绝");
          }
        }
      }

      private boolean addWorker(Runnable firstTask, boolean core) {
        int c = ctl.get();
        if(c >= (core ? corePoolSize : maximumPoolSize)) {
          return false;
        }
        ctl.compareAndSet(c, c+1);

        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        workers.add(w);
        t.start();

        return true;
      }

      private final class Worker implements Runnable{
        final Thread thread;
        Runnable firstTask;

        Worker(Runnable firstTask) {
          this.firstTask = firstTask;
          this.thread = threadFactory.newThread(this);
        }

        @Override
        public void run() {
          Runnable task = this.firstTask;
          while (task != null || (task = workQueue.take()) != null) {
            task.run();
            task = null;
          }
        }
      }
    }

    4、创建线程池的工具类   Executors

    /**
     * Factory methods for ready-made thread pools.
     */
    public class Executors {
      /**
       * Builds a pool whose core size and maximum size are both {@code nThreads}.
       * The pool uses a {@link LinkedBlockingQueue} of capacity 2 and a
       * {@link MyThreadFactory}.
       *
       * @param nThreads value used for both the core and the maximum pool size
       * @return the new pool
       */
      public static ExecutorService newFixedThreadPool(int nThreads) {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(2);
        ThreadFactory factory = new MyThreadFactory();
        return new ThreadPoolExecutor(nThreads, nThreads, taskQueue, factory);
      }
    }

    二、测试线程池的功能

    /**
     * Demo driver: submits eight slow tasks to a pool with core size 2, maximum size 4
     * and a queue of capacity 3, so that core workers, queueing, extra workers and
     * rejection can all be observed in the output.
     */
    public class ThreadTest {
      /** Labels printed by the demo tasks, in submission order. */
      private static final String[] LABELS = {
        "111", "222", "333", "444", "555", "666", "777", "888"
      };

      public static void main(String[] args) {
        // Option 1: create the pool through the Executors helper.
        // ExecutorService executors = Executors.newFixedThreadPool(2);

        // Option 2: construct ThreadPoolExecutor directly so that corePoolSize and
        // maximumPoolSize differ.
        ExecutorService executors = new ThreadPoolExecutor(2, 4,
              new LinkedBlockingQueue<Runnable>(3), new MyThreadFactory());

        for (String label : LABELS) {
          executors.execute(printingTask(label));
        }

        System.out.println("主函数结束");
      }

      /**
       * Creates a task that prints the current thread's name followed by the label
       * five times, sleeping five seconds after each line.
       *
       * @param label text appended to every printed line
       * @return the task
       */
      private static Runnable printingTask(final String label) {
        return new Runnable() {
          @Override
          public void run() {
            for (int i = 0; i < 5; i++) {
              System.out.println(Thread.currentThread().getName() + "-- " + label);
              try {
                Thread.sleep(5000);
              } catch (InterruptedException e) {
                // Keep the interrupt visible to the owning thread and stop the task
                // instead of swallowing the request and carrying on.
                Thread.currentThread().interrupt();
                return;
              }
            }
          }
        };
      }
    }

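    从输出结果可以看出,启动了4个线程,并且有线程执行了多个任务,有任务被拒绝。原因是:线程池的核心线程数为2、最大线程数为4、任务队列容量为3——前2个任务各启动一个核心线程,随后的任务进入队列;队列满后再新建线程,直到线程数达到最大值4,此后提交的任务被拒绝;先执行完任务的线程会继续从队列中取出任务执行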

  • 相关阅读:
    SQL——UPDATE(改)
    SQL——INSERT INTO(增)
    SQL——SELECT(查)
    Python——raise引发异常
    Python——异常处理
    Python——多态、检查类型
    Python——继承
    Python——封装
    popitem()方法
    pop(D)方法
  • 原文地址:https://www.cnblogs.com/jiangwangxiang/p/9246770.html
Copyright © 2011-2022 走看看