zoukankan      html  css  js  c++  java
  • 线程池

    线程池的作用:
    1、减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;
    2、可以根据系统的承受能力,调整线程池中工作线程的数据,防止因为消耗过多的内存导致服务器奔溃。
    使用线程池,要根据系统的环境情况,手动或自动设置线程数目。少了系统运行效率不高,多了系统拥挤、占用内存多。用线程池控制数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若任务重没有等待任务,线程池浙一资源处于等待。当一个新任务需要运行,如果线程池中有等待的工作线程,就可以开始运行,否则进入等待队列。
    ThreadPoolExecutor

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    

    ThreadPoolExecutor类的另外三个构造器都是调用上面这个构造器进行初始化工作的。
    int corePoolSize:核心池的大小。创建线程池后,默认情况下,线程池中没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建线程池后,线程池钟的线程数为0,当有任务到来后就会创建一个线程去执行任务。
    maximumPoolSize:池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量,当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务,maxmumPoolSize表示的就是workQueue满了,线程池中最多可以创建的线程数量。
    keepAliveTime:表示线程没有任务执行时最多保持多久时间终止。默认情况下,只有当线程池中的数量大于corePoolSize时,才会起作用,直到线程池中的线程数不大于corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
    unit:参数keepAliveTime的时间单位,有七种取值:

    TimeUnit.DAYS; //天
    TimeUnit.HOURS; //小时
    TimeUnit.MINUTES; //分钟
    TimeUnit.SECONDS; //秒
    TimeUnit.MILLSECONDS; //毫秒
    TimeOut.MICROSECONDS; //微秒
    TimeOut.NANOSECONDS; //纳秒
    

    workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有一下几个选择:

    ArrayBlockingQueue; //基于数组的先进先出队列,此队列创建时必须指定大小
    LinkedBlockingQueue; //基于链表的先进先出队列,如果没有指定队列大小,默认Integer.MAX_VALUE
    SynchronousQueue; //不会保存提交的任务,直接创建一个线程来执行新来的任务
    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关
    

    ThreadFactory:线程工厂,主要用来创建线程

    class SimpleThreadFactory implements ThreadFactory {
       public Thread newThread(Runnable r) {
         return new Thread(r);
       }
    
    

    handler:当拒绝处理任务时的策略,有四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认这种拒绝策略)。 
    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
    

    Executor
    线程池的重点不是类怎么用,而是在合适的场景下使用合适的线程池,所谓合适的线程池就是ThreadPoolExecutor的构造方法传入不同的参数,构造出不同的线程池,以满足实际需求。
    单线程线程池:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    示例:

    //单线程线程池
    /*	核心池大小--1 线程池容量--1 workqueue--LinkedBlockingQueue
     *     public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
     */
    public class ThreadPoolExectorJava {
    	public static void main(String[] args) {
    		//注意:这里返回的是ExecutorService接口,不是ThreadPoolExecutor
    		//从Executors得到的是ExecutorService或ScheduledExecutorService接口,而不是这两个接口的实现类
    		ExecutorService tpe = Executors.newSingleThreadExecutor();
    		tpe.execute(new ThreadPoolRunnable());
    	}
    }
    

    单线程线程池运行的线程数是1,选择无界的LinkedBlockingQueue。不管传入多少任务都排序,前面一个任务执行完毕,再执行队列中的线程。这里第二个参数maximumPoolSize是没有意义的,因为maximumPoolSize描述的排队的任务多过workQueue的容量,线程池中对多只能容纳maximumPoolSize个任务,现在workQueue是无界的,也就是说排队的任务永远不会多过workQueue的容量,maximumPoolSize设置成多少都没有意义。
    固定大小线程池:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    程序示例:

    //固定大小线程池
    public class ThreadPoolExectorJava {
    	public static void main(String[] args) {
    		ExecutorService tpe = Executors.newFixedThreadPool(3);
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    	}
    }
    

    固定大小的线程池和单线程的线程池差不多,无非是让线程池中能运行的线程手动制定了nThread。选择的LinkedBlockingQueue,所以maximumPoolSize是无意义的
    无界线程池:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    程序示例:

    //无界线程池
    public class ThreadPoolExectorJava {
    	public static void main(String[] args) {
    		ExecutorService tpe = Executors.newCachedThreadPool();
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    	}
    }
    

    无界线程池,就是不管多少任务提交进行,都直接运行。无界线程池采用SynchronousQueue,采用这个线程池就没有workQueue容量一说了,只要添加进去的线程就会被拿去用。无界线程池的线程数是没有上限的,所以以maximumPoolSize为主了,设置为一个近似无限大的Integer.MAX_VALUE。另,单线程线程池和固定大小线程池线程是不会自动回收的,也即是说保证提交进来的任务最后会被处理,但不保证什么时候处理。无界线程池是设置了回收时间的,由于corePoolSize为0,所以只要60秒没有被用到的线程都会被直接移除。
    workQueue:
      workQueue就是排队策略。排队策略描述的是当前线程大于corePoolSize时,线程以什么样的方式等待被运行。
      排队有三种策略:直接提交、有界队列、无界队列。JDK使用了无界队列LinkedBlockingQueue作为workQueue而不是有界队列ArrayBlockingQueue,尽管后者可以对资源进行控制,但是有界队列相比无界队列有三个缺点:
      1、使用有界队列,corePoolSize、maximumPoolSize两个参数势必要根据实际场景不断调整以求达到一个最佳,这会给开发带来极大的麻烦,必须经过大量的性能测试。所以干脆用无界,永远添加到队列中,不会溢出,自然maximumPoolSize就没啥用了,只需要根据系统处理能出调整corePoolSize就可以;
      2、放置业务突刺,尤其是在Web应用中,某些时候突然大量请求的到来。使用无界队列,不管什么时候,至少保证所有任务能够被处理,有界的超过maximumPoolSize的任务就可能被丢弃掉。
      3、有界队列大小和maximumPoolSize也需要相互折中,这又是一个难搞的地方。
    线程池容量的动态调整

    setCorePoolSize:设置核心池大小
    setMaximumPoolSize:设置线程池最大能创建的线程数目大小
    

    线程池的关闭

    shutdown(): 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但不再接受新任务
    shutdownNow(): 立即终止线程池,并尝试打算正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
    


    线程工厂接口ThreadFactory

    import java.util.concurrent.ThreadFactory;
    
    public class ThreadFactorJava {
    
    	public static void main(String[] args) {
    		// TODO Auto-generated method stub
    		//工厂类对象
    		MyThreadFactory mtf = new MyThreadFactory();
    		//线程类对象
    		FactoryRunnable tpr = new FactoryRunnable();
    		//创建线程
    		Thread t = mtf.newThread(tpr);
    		t.setName("线程一");
    		Thread t2 = mtf.newThread(tpr);
    		t2.setName("线程二");
    		//执行线程
    		t.start();
    		t2.start();
    	}
    
    }
    //创建类实现接口
    class MyThreadFactory implements ThreadFactory{	
    	public Thread newThread(Runnable r) {
    		int i = 0;
    		Thread  t = new Thread(r, "线程工厂创建" );
    		return t;
    	}	
    }
    
    //线程类
    class FactoryRunnable implements Runnable{
    
    	@Override
    	public void run() {
    		while(true) {
    			System.out.println(Thread.currentThread().getName() + "执行");
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}
    		
    	}
    	
    }
    

    线程池的使用

    public class ThreadPoolRunnable implements Runnable{
    	private static int i= 20;
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		while(true) {
    			if(i > 0) {
    				System.out.println(Thread.currentThread().getName() + "------" +(i--));
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    
    }
    
    public class ThreadPoolExectorJava {
    	public static void main(String[] args) {
    		//SynchronousQueue
    		BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    		ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 2, 60L, TimeUnit.SECONDS, workQueue);
    		tpe.execute(new ThreadPoolRunnable());
    		tpe.execute(new ThreadPoolRunnable());
    
    	}
    }
    

    自定义线程池

    package newJava.newRunnable;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolExectorJava {
    	public static void main(String[] args) {
    		// 注意:最大线程数 >= worjQueue + 核心线程数
    		BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3);
    		ThreadPoolFactor tpf = new ThreadPoolFactor();
    		// ThreadPoolFactor tpf = Executors.defaultThreadFactory();
    		// 当线程装满等待列表,抛出异常
    		// ThreadPoolExecutor.AbortPolicy handler = new
    		// ThreadPoolExecutor.AbortPolicy();
    		// 不抛出异常
    		// RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
    		RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
    		ThreadPoolExecutor tpe = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS, workQueue, tpf, handler);
    		ThreadPoolRunnable tpr = new ThreadPoolRunnable();
    		ThreadPoolRunnable2 tpr2 = new ThreadPoolRunnable2();
    		// 设置了等待列表装满就抛出异常,这里会抛出异常
    		// for(int i = 0; i < 10; i++) {
    		// tpe.execute(tpf.newThread(tpr));
    		// }
    		// 这里不会抛出异常
    		tpe.execute(tpf.newThread(tpr));
    		tpe.execute(tpf.newThread(tpr));
    		tpe.execute(tpf.newThread(tpr));
    		tpe.execute(tpf.newThread(tpr));
    		tpe.execute(tpf.newThread(tpr2));
    		tpe.execute(tpf.newThread(tpr2));
    		tpe.execute(tpf.newThread(tpr2));
    		tpe.execute(tpf.newThread(tpr2));
    		tpe.execute(tpf.newThread(tpr2));
    		// 关闭线程
    		tpe.shutdown();
    	}
    }
    
    class ThreadPoolRunnable2 implements Runnable {
    	private int i = 0;
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		b: while (true) {
    			if (i < 5) {
    				System.out.println("ThreadPoolRunnable2正在执行--------" + Thread.currentThread().getName());
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			} else {
    				break b;
    			}
    		}
    
    	}
    
    }
    
    
  • 相关阅读:
    AngularJS方法 —— angular.bind
    rails 部署 nginx + passenger
    查找并删除
    rails 网站跨域
    From Ruby array to JS array in Rails- 'quote'?
    rails 网站字体
    form_tag
    bundle install 老是中断,可以在gemfile里面把source换成taobao源,可以自动安装
    ruby 功力修炼
    bootstrap 移动自适应界面
  • 原文地址:https://www.cnblogs.com/changzuidaerguai/p/9315800.html
Copyright © 2011-2022 走看看