zoukankan      html  css  js  c++  java
  • 多线程与高并发8之线程池

    (1)Executor接口   执行器 执行Runnable 任务的定义和运行可以分开可。 不像new Thread(Runnable).start()

    public interface Executor {
    void execute(Runnable command);
    }

      (2) ExecutorService 继承Executor 除了执行一个任务外,还完善了整个执行任务的生命周期,线程池是在ExecutorService基础上实现的。

    有submit方法。  

    Future<T> submit(Callable<T> task);

    Future<T> submit(Runnable task, T result);      

    Future<?> submit(Runnable task);

    (3)Callable   与Runnable相比有返回值

    public interface Callable<V> {
    V call() throws Exception;
    }

    (4) Future 代表执行结果 将Callable任务交给线程池执行,执行完后返回的结果。
    public class T03_Callable {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Callable<String> c = new Callable() {
                @Override
                public String call() throws Exception {
                    return "Hello Callable";
                }
            };
    
            ExecutorService service = Executors.newCachedThreadPool();
            Future<String> future = service.submit(c); //异步 主线程继续往下执行。
    
            System.out.println(future.get());// get方法 是阻塞的 
      service.shutdown(); } }

    (5)FutureTask 既是一个future 又是一个task  实现了这两个接口

    public FutureTask(Callable<V> callable) 
    public FutureTask(Runnable runnable, V result) 


    FutureTask<Integer> task = new FutureTask<>(()->{
                TimeUnit.MILLISECONDS.sleep(500);
                return 1000;
            }); //new Callable () { Integer call();}   创建一个任务
            
            new Thread(task).start();
            
            System.out.println(task.get()); //阻塞  任务返回结果

    (6)CompletableFuture 并行执行多个任务 并拿到所有结果后产生一个结果  是一个管理

    public class T06_00_sync_wait_notify {
        public static void main(String[] args) {
            final Object o = new Object();
    
            char[] aI = "1234567".toCharArray();
            char[] aC = "ABCDEFG".toCharArray();
    
            new Thread(()->{
                synchronized (o) {
                    for(char c : aI) {
                        System.out.print(c);
                        try {
                            o.notify();
                            o.wait(); //让出锁
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    o.notify(); //必须,否则无法停止程序
                }
    
            }, "t1").start();
    
            new Thread(()->{
                synchronized (o) {
                    for(char c : aC) {
                        System.out.print(c);
                        try {
                            o.notify();
                            o.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    o.notify();
                }
            }, "t2").start();
        }
    }
    
    //如果我想保证t2在t1之前打印,也就是说保证首先输出的是A而不是1,这个时候该如何做?

    线程池:为什么要引入线程池
    当我们需要的并发执行线程数量很多时,且每个线程执行很短的时间就结束了,这样,我们频繁的创建、销毁线程就大大降低了工作效率(创建和销毁线程需要时间、资源)。
    java中的线程池可以达到这样的效果:一个线程执行完任务之后,继续去执行下一个任务,不被销毁,这样线程利用率提高了。

    ThreadPoolExecutor类是java线程池中的核心类。他的实现方式有四种:

    public class ThreadPoolExecutor extends AbstractExecutorService {
      public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
      }
     
       public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory, defaultHandler);
      }
     
      public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), handler);
      }
     
      public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
          maximumPoolSize <= 0 ||
          maximumPoolSize < corePoolSize ||
          keepAliveTime < 0)
          throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
          throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    corePoolSize:核心线程数的大小。永远不会归还给操作系统。;线程池创建之后不会立即去创建线程,而是等待线程的到来。当前执行的线程数大于该值是,线程会加入到缓冲队列;aximumPoolSize:线程池中创建的最大线程数;如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务  
    keepAliveTime:空闲的线程多久时间后被销毁。默认情况下,该值在线程数大于corePoolSize时,对超出corePoolSize值得这些线程起作用
    unit:TimeUnit枚举类型的值,代表keepAliveTime时间单位,可以取下列值:
    TimeUnit.DAYS; //天
      TimeUnit.HOURS; //小时
      TimeUnit.MINUTES; //分钟
      TimeUnit.SECONDS; //秒
      TimeUnit.MILLISECONDS; //毫秒
      TimeUnit.MICROSECONDS; //微妙
      TimeUnit.NANOSECONDS; //纳秒
    workQueue:阻塞队列,用来存储等待执行的任务,决定了线程池的排队策略,有以下取值:
      ArrayBlockingQueue;  任务数有限
      LinkedBlockingQueue;  无界队列,最大值是Integer.MAXVALUE
      SynchronousQueue;  0 有任务必须被处理,其他任务只能等待
      threadFactory:线程工厂,是用来创建线程的。默认new Executors.DefaultThreadFactory();-------生产线程的  

     DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }

    handler:线程拒绝策略。当创建的线程超出maximumPoolSize,且缓冲队列已满时,新任务会拒绝,有以下取值:
      ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
      ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
      ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
      ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务--main线程

    import java.io.IOException;
    import java.util.concurrent.*;
    
    public class T05_00_HelloThreadPool {
    
        static class Task implements Runnable {
            private int i;
    
            public Task(int i) {
                this.i = i;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " Task " + i);
                try {
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public String toString() {
                return "Task{" +
                        "i=" + i +
                        '}';
            }
        }
    
        public static void main(String[] args) {
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
                    60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(4),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());//new ThreadPoolExecutor.DiscardOldestPolicy  --当第7个线程来时 扔掉第三个线程
    
            for (int i = 0; i < 8; i++) {
                tpe.execute(new Task(i));  //两个任务时 核心线程数满了  第三个往后加入任务队列  任务队列也满了 第7、8个启动新线程处理 又满了。执行拒绝策略(第七个参数)
            }
    
            System.out.println(tpe.getQueue());
    
            tpe.execute(new Task(100));
    
            System.out.println(tpe.getQueue());
    
            tpe.shutdown();
        }
    }

    Excutors 线程池的工厂:

    newSingleThreadExecutor: 线程池里只有一个线程。 为啥有单线程的线程池呢?1 线程池是有任务队列的。 2 线程的生命周期管理。

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,   //0L 核心线程只有一个 所以keepalivrtime=0
                                        new LinkedBlockingQueue<Runnable>())); //任务队列(无界)。。
        }
    public class T07_SingleThreadPool {
        public static void main(String[] args) {
            ExecutorService service = Executors.newSingleThreadExecutor();
            for(int i=0; i<5; i++) {
                final int j = i;
                service.execute(()->{
                    
                    System.out.println(j + " " + Thread.currentThread().getName());
                });
            }
                
        }
    }

    newCachedThreadPool 来一个任务启动一个线程  前提是线程池里不存在还没有达到60s回收时间未被回收的线程,否则用现有的线程。

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());  //用的是SynchronousQueue 所以不存在任务队列 必须有线程去执行,否则提交任务的线程就阻塞了。 
        }
    public class T08_CachedPool {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newCachedThreadPool();
            System.out.println(service);
            for (int i = 0; i < 2; i++) {
                service.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                });
            }
            System.out.println(service);
            
            TimeUnit.SECONDS.sleep(80);
            
            System.out.println(service);
            
            
        }
    }

    new FixedThreadPool  与CachedThreadPool选择: 任务来的忽高忽低,不能确定任务量的稳定,用CachedThreadPool。如果任务量很平稳,可以预算出线程数的多少,用FixedThreadPool 。

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,   //核心数和最大线程数一致
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class T09_FixedThreadPool {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            long start = System.currentTimeMillis();
            getPrime(1, 200000); 
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            
            final int cpuCoreNum = 4;
            
            ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
            
            MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
            MyTask t2 = new MyTask(80001, 130000);
            MyTask t3 = new MyTask(130001, 170000);
            MyTask t4 = new MyTask(170001, 200000);
            
            Future<List<Integer>> f1 = service.submit(t1);
            Future<List<Integer>> f2 = service.submit(t2);
            Future<List<Integer>> f3 = service.submit(t3);
            Future<List<Integer>> f4 = service.submit(t4);
            
            start = System.currentTimeMillis();
            f1.get();
            f2.get();
            f3.get();
            f4.get();
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
        
        static class MyTask implements Callable<List<Integer>> {
            int startPos, endPos;
            
            MyTask(int s, int e) {
                this.startPos = s;
                this.endPos = e;
            }
            
            @Override
            public List<Integer> call() throws Exception {
                List<Integer> r = getPrime(startPos, endPos);
                return r;
            }
            
        }
        
        static boolean isPrime(int num) {
            for(int i=2; i<=num/2; i++) {
                if(num % i == 0) return false;
            }
            return true;
        }
        
        static List<Integer> getPrime(int start, int end) {
            List<Integer> results = new ArrayList<>();
            for(int i=start; i<=end; i++) {
                if(isPrime(i)) results.add(i);
            }
            
            return results;
        }
    }

    ScheduledThreadPoolExecutor--执行定时任务时,一般用定时任务框架 quartz---https://www.cnblogs.com/kyleinjava/p/10432168.html

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());  //用delayedworkqueue 
        }
    public class T10_ScheduledPool {
        public static void main(String[] args) {
            ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
            service.scheduleAtFixedRate(()->{
                try {
                    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }, 0, 500, TimeUnit.MILLISECONDS);
            
        }
    }

     

  • 相关阅读:
    CodeForcesGym 100517B Bubble Sort
    CodeForcesGym 100517H Hentium Scheduling
    BZOJ 1208: [HNOI2004]宠物收养所
    BZOJ 1503: [NOI2004]郁闷的出纳员
    BZOJ 1588: [HNOI2002]营业额统计
    sublime 3 user Settings
    sublime 3 注册码
    Why does this json4s code work in the scala repl but fail to compile?
    cat 显示指定行
    Spark Kill Application
  • 原文地址:https://www.cnblogs.com/zdcsmart/p/12570802.html
Copyright © 2011-2022 走看看