zoukankan      html  css  js  c++  java
  • 高并发编程基础(线程池基础)

    线程池简单基础介绍:

    Executor:

      Executor是Java工具类,执行提交给它的Runnable任务。该接口提供了一种基于任务运行机制的任务提交方法,包括线程使用详细信息,时序等等。Executor通常用于替代创建多线程。

      提供一个execute(Runnable command)方法;我们一般用它的继承接口ExecutorService。里面就只有一个执行任务的接口,源码如下:

    public interface Executor {
    
        void execute(Runnable command);
    }

    ExecutorService:

      它是线程池定义的一个接口,继承自Executor。有两个实现类,分别为ThreadPoolExecutor,ScheduledThreadPoolExecutor。除了继承自父类的 execute 执行方法 ,自己还定义了一系列方法,其中有两个重载提交任务的方法 submit 方法,参数可以是 Runnable 或者是 Callable类型的。

    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);

      这里既然提到了Callable 那就先来看一下这了接口跟Runnable:

    Callable

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    Runnable:

    public interface Runnable {
        public abstract void run();
    }

    相同点:

    1. 两者都是接口;
    2. 两者都可用来编写多线程程序;
    3. 两者都需要调用Thread.start()启动线程;

    不同点:

    1. 两者最大的不同点是:实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;
    2. Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

    注意点:

      Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞主线程直到获取‘将来’结果;当不调用此方法时,主线程不会阻塞!

    Executors:

      是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池的工具类,有点类似与Arrays。Executors一共可以创建下面这四类线程池:

    1. newCacheThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    3. newScheduledThreadPool 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
    4. newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    ThreadPool:

      合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

      线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。用线程池来管理的好处是,可以保证系统稳定运行,适用与有大量线程,高工作量的情景下使用,假如要展示1000张图片如果创建1000个线程去加载,系统肯定会死掉。用线程池就可以避免这个问题,可以用5个线程轮流执行,5个一组,执行完的线程不直接回收而是等待下次执行,这样对系统的开销就可以减小不少。

    public class T05_ThreadPool {
    	public static void main(String[] args) {
    		//创建一个5个线程的线程池
    		ExecutorService service = Executors.newFixedThreadPool(5);
    		for(int i=0;i<6;i++) {//往池子里仍了6个任务
    			service.execute(()->{//睡500毫秒后打印线程名
    			try {
    				TimeUnit.MILLISECONDS.sleep(500);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println(Thread.currentThread().getName());
    			});
    		}
    		//java.util.concurrent.ThreadPoolExecutor@119d7047[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    		System.out.println(service);
    		service.shutdown();//关闭线程池 等待任务都执行完再关闭
    		System.out.println(service.isTerminated());//false 任务是否都执行完
    		System.out.println(service.isShutdown());//true 是不是关闭? 关闭了不代表任务执行完。
    		//java.util.concurrent.ThreadPoolExecutor@119d7047[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    		System.out.println(service);
    		try {
    			TimeUnit.SECONDS.sleep(5);//睡5秒
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println(service.isTerminated());//true
    		System.out.println(service.isShutdown());//true
    		//java.util.concurrent.ThreadPoolExecutor@119d7047[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    		System.out.println(service);
    		
    	}
    }
    

    Futrue: 

      在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。
            Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
            举个例子:比如去吃早点时,点了包子和凉菜,包子需要等3分钟,凉菜只需1分钟,如果是串行的一个执行,在吃上早点的时候需要等待4分钟,但是因为你在等包子的时候,可以同时准备凉菜,所以在准备凉菜的过程中,可以同时准备包子,这样只需要等待3分钟。那Future这种模式就是后面这种执行模式。
      
    public class T06_Future {
    
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		//FutureTask 区分 RunnableTask  也是实现了Runnable接口的
    		FutureTask<Integer> task = new FutureTask<>(()-> {
    			try {// 该任务将来会有个返回值是Integer类型  
    				TimeUnit.MILLISECONDS.sleep(500);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			return 1000;
    		}); // new Callable()
    		
    		new Thread(task).start();
    		
    		System.out.println(task.get());//阻塞
    		
    		//************************************
    		ExecutorService service = Executors.newFixedThreadPool(5);
    		Future<Integer> f = service.submit(()->{// callable
    			try {
    				TimeUnit.MILLISECONDS.sleep(500);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			return 1;
    		});
    		System.out.println(f.isDone());// 任务执行完没有啊?
    		System.out.println(f.get()); // 阻塞  1
    		System.out.println(f.isDone());
    	}
    }
    

    ParallerComputing(并行计算)

    newFixedThreadPool:(固定线程的线程池)

      测试例子:计算1-20W之间的质数数量,用一个线程的话计算时间会很长。我们可以用线程池来解决:

    public class T07_ParallerComputing {
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    		long start = System.currentTimeMillis();
    		// 获取 1-200000的质数,只能被1跟自身整除
    		List<Integer> results = getPrime(1,200000);
    		long end = System.currentTimeMillis();
    		System.out.println(end - start);// 2000 左右
    		
    		final int cpuCoreNum =4;
    		
    		ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
    		MyTask task1 = new MyTask(1, 80000);
    		MyTask task2 = new MyTask(80001, 130000);
    		MyTask task3 = new MyTask(130001, 170000);
    		MyTask task4 = new MyTask(170001, 200000);
    		
    		start = System.currentTimeMillis();
    		Future<List<Integer>> submit1 = service.submit(task1);
    		Future<List<Integer>> submit2 = service.submit(task2);
    		Future<List<Integer>> submit3 = service.submit(task3);
    		Future<List<Integer>> submit4 = service.submit(task4);
    		
    		submit1.get();
    		submit2.get();
    		submit3.get();
    		submit4.get();
    		
    		end = System.currentTimeMillis();
    		System.out.println(end - start); // 800左右
    	}
    
    	static class MyTask implements Callable<List<Integer>> {
    		int startPos, endPos;
    
    		private MyTask(int startPos, int endPos) {
    			this.startPos = startPos;
    			this.endPos = endPos;
    		}
    
    		@Override
    		public List<Integer> call() throws Exception {
    			List<Integer> results = getPrime(startPos, endPos);
    			return results;
    		}
    	}
    
    	static boolean isPrime(int num) {// 是否是质数
    		for (int i = 2; i < num / 2; i++) {
    			if (num % i == 0)
    				return false;
    		}
    		return true;
    	}
    
    	static List<Integer> getPrime(int start, int end) {
    		List<Integer> results = new ArrayList<Integer>();
    		for (int i = start; i < end; i++) {
    			if (isPrime(i))
    				results.add(i);
    		}
    		return results;
    	}
    }
    

      两次执行结果大致为 2067  ,698 .这说明用线程池会大大提高计算效率。

    newCachedThreadPool:

      刚刚开始线程池里面没有线程,来一个任务启动一个线程,如果有空闲的线程就直接执行任务,没有空闲就另起一个线程,每个线程超过 60 秒的空闲时间,线程消失,可以自己指定生存时间。

    public class T08_CachedPool {
    
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService service = Executors.newCachedThreadPool();
    		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    		System.out.println(service);
    		for(int i=0;i<2;i++) {
    			service.execute(()->{//睡500毫秒后打印线程名
    				try {
    					TimeUnit.MILLISECONDS.sleep(500);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				System.out.println(Thread.currentThread().getName());
    				});
    		}
    		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
    		System.out.println(service);
    		
    		TimeUnit.SECONDS.sleep(80);
    		
    		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    		System.out.println(service);
    	}
    }
    

    newSingleThreadExecutor:

      线程池里面只有一个线程,代码如下:

    public class T09_SingleThreadPool {
    
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService service = Executors.newSingleThreadExecutor();
    		for (int i = 0; i < 5; i++) {
    			final int j = i;
    			service.execute(() -> { 
    				System.out.println(j + " " + Thread.currentThread().getName());
    			});
    //			0 pool-1-thread-1
    //			1 pool-1-thread-1
    //			2 pool-1-thread-1
    //			3 pool-1-thread-1
    //			4 pool-1-thread-1
    		}
    	}
    }
    

      这个可以保证任务的先后执行顺序,打印出来的结果是按顺序的,且只有一个线程去执行。

    newScheduledThreadPool:定时器线程池

      以下小程序是启动后0秒开始执行,每隔500毫秒执行一次任务

    public class T09_SingleThreadPool {
    
    	public static void main(String[] args) throws InterruptedException {
    		  ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
    		  service.scheduleAtFixedRate(()->{//以固定的频率来执行任务
    			  try {
    				TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			  System.out.println(Thread.currentThread().getName());
    		  }, 0, 500, TimeUnit.MILLISECONDS);// 起始延迟多久后执行 ,每隔500毫秒执行一次,时间单位
    	}
    }
    

    newWorkStealingPool:精灵线程(守护线程,后台线程)

      任务窃取:线程池中每个线程都维护着自己的任务队列,当某一个线程队列执行空了,他会去另外的线程中去拿一个任务来执行,不用去分配:

    public class T11_WorkStealingPool {
    
    	public static void main(String[] args) throws InterruptedException, IOException {
    		ExecutorService service = Executors.newWorkStealingPool();
    		//查看CPU是几核 我这里是8  默认启动8个线程java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
    		System.out.println(Runtime.getRuntime().availableProcessors());
    		//java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
    		System.out.println(service);
    		
    		service.execute(new R(1000));
    		service.execute(new R(1000));
    		service.execute(new R(1000));
    		service.execute(new R(1000));//daemon 精灵线程Debug 可以查看
    		service.execute(new R(1000));
    		
    		//由于产生的是精灵线程(守护线程,后台线程),主线程不阻塞的话看不到输出
    		System.in.read();
    	}
    	static class R implements Runnable{
    		
    		int time;
    		
    		public R(int time) {
    			this.time =time;
    		}
    
    		@Override
    		public void run() {
    			try {
    				TimeUnit.MILLISECONDS.sleep(time);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			  System.out.println(time +" "+ Thread.currentThread().getName());
    		}
    	}
    }
    

       newWorkStealingPool的实现是ForkJoinPool;

    ForkJoinPool :

      ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

      下面例子是从 0 加到 100W .:

    public class T12_ForkJoinPool {
        
        static int[] nums= new int[1000000];
        static final int MAX_NUM =50000;
        static Random r=new Random();
        
        static {
            for(int i=0;i<nums.length;i++) {
                nums[i] = r.nextInt(100);
            }
            System.out.println(Arrays.stream(nums).sum());
        }
        //无返回值
        static class AddTask extends RecursiveAction{
            int start, end;
    
            private AddTask(int start, int end) {
                this.start = start;
                this.end = end;
            }
            @Override
            protected void compute() {
                if(end -start <MAX_NUM) {
                    long sum =0L;
                    for(int i=start;i<end ;i++) sum += nums[i];
                    System.out.println("from "+start +" to "+end+" = "+sum);
                }else {
                    int middle =start +(end-start)/2;
                    AddTask task1 = new AddTask(start, middle);
                    AddTask task2 = new AddTask(middle, end);
                    task1.fork();
                    task2.fork();
                }
                
            }
            
        }
            //有返回值
        static class AddTask2 extends RecursiveTask<Long>{
            int start, end;
            
            private AddTask2(int start, int end) {
                this.start = start;
                this.end = end;
            }
            @Override
            protected Long compute() {
                if(end -start <MAX_NUM) {
                    long sum =0L;
                    for(int i=start;i<end ;i++) sum += nums[i];
                    return sum;
                } 
                    int middle =start +(end-start)/2;
                    AddTask2 task1 = new AddTask2(start, middle);
                    AddTask2 task2 = new AddTask2(middle, end);
                    task1.fork();//启动新线程
                    task2.fork();
                    return task1.join() + task2.join();
                
            }
            
        }
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            
    //        AddTask task = new AddTask(0, nums.length);
    //        fjp.execute(task);
            AddTask2 task2 = new AddTask2(0, nums.length);
            fjp.execute(task2);
            long result= task2.join();//阻塞的
            System.out.println(result);
            System.in.read();
        }
    
    }

    ThreadPoolExecutor:

      在上诉讲到的线程池中 :newFixedThreadPool ,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool 的底层实现中都是用到了 ThreadPoolExecutor 来创建线程池。而newWorkStealingPool 的底层用的是 ForkJoinPool ,ForkJoinPool是1.8以后才加入的。

    简单看一下几种线程池构造函数的简单实现:

    newFixedThreadPool :new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 指定的线程数,最大线程数,多长时间空闲消失,时间单位,队列。 0L代表永远不会消失。

    newCachedThreadPool:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

    newSingleThreadExecutor:new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

    newScheduledThreadPool: super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());

    newWorkStealingPool:new ForkJoinPool (Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)第一个是CPU核数

     

  • 相关阅读:
    EcShop二次开发学习方法
    [ 产品经理 ] 互联网产品经理常用软件及工作平台
    Tengine – Nginx衍生版
    把PHP大牛记下来,方便以后关注
    看了极光推送技术原理的几点思考
    centos磁盘满了,查找大文件并清理
    LNMP一键安装包 PHP自动升级脚本
    微信红包系统设计 & 优化
    程序员每天每周每月每年该做的事
    php中$_REQUEST、$_POST、$_GET的区别和联系小结
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/9928639.html
Copyright © 2011-2022 走看看