zoukankan      html  css  js  c++  java
  • hadoop09----线程池

    java并发包
    1.java并发包介绍
    线程不能无限制的new下去,否则系统处理不了的。
    使用线程池。任务来了就开一runable对象。
    concurrent 包开始不是jdk里面的,后来加入到jdk里面去了。
    
    当很多人来访问网站的时候,就要开启一个队列,排队处理请求,这个队列用JMS-ActiveMQ实现。有抢手机的队列,有抢电脑的队列,不同的队列。
    为不同的“主题”建立消息队列。
    
    主要包含原子量、并发集合、同步器、可重入锁。
    线程池的5中创建方式:
    
    1、Single Thread Executor : 只有一个线程的线程池,提交很多任务给线程池的时候,只有一个线程,就是单线程,因此所有提交的任务是顺序执行,
    代码: Executors.newSingleThreadExecutor()
    
    2、Cached Thread Pool : 线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,如果线程超过60秒内没执行,那么将被终止并从池中删除,
    代码:Executors.newCachedThreadPool()
    
    3、Fixed Thread Pool : 拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待,
    代码: Executors.newFixedThreadPool(4)
    在构造函数中的参数4是线程池的大小,你可以随意设置,也可以和cpu的核数量保持一致,获取cpu的核数量int cpuNums = Runtime.getRuntime().availableProcessors(); getRuntime是获取运行环境。
    线程的数量与cpu、和任务是否IO密集型还是运算密集型有关。
    
    4、Scheduled Thread Pool : 用来调度即将执行的任务的线程池,可能不是直接执行, 每隔5分钟执行一次。策略型的
    代码:Executors.newScheduledThreadPool()
    
    5、Single Thread Scheduled Pool : 只有一个线程,用来调度任务在指定时间执行,代码:Executors.newSingleThreadScheduledExecutor()
    public class ExecutorDemo {
        
        public static void main(String[] args) {
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            
            int cpuNums = Runtime.getRuntime().availableProcessors();
            System.out.println(cpuNums);//核的数量
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(cpuNums);
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(8);
            
            
            ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
    }
    线程池的使用
    
    提交 Runnable ,任务完成后 Future 对象返回 null
    调用excute,提交任务, 匿名Runable重写run方法, run方法里是业务逻辑
    见代码:ThreadPoolWithRunable
    
    提交 Callable,该方法返回一个 Future 实例表示任务的状态
    调用submit提交任务, 匿名Callable,重写call方法, 有返回值, 获取返回值会阻塞,一直要等到线程任务返回结果
    见代码:ThreadPoolWithcallable
    package cn.itcast_01_mythread.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadPoolWithRunable {
    
        
        /**
         * 通过线程池执行线程
         * @param args
         */
        public static void main(String[] args) {
            //创建一个线程池,可回收的,没任务就回收了。newCachedThreadPool可以很大。60秒没任务就回收。
            ExecutorService pool = Executors.newCachedThreadPool();//线程池
            for(int i = 1; i < 5; i++){//4个任务,一个任务就是一个Runnable
                pool.execute(new Runnable() {//没有返回值
                    @Override
                    public void run() {
                        System.out.println("thread name: " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            pool.shutdown();//任务执行完就关了。
        }
    
    }
    
    /*thread name: pool-1-thread-3
    thread name: pool-1-thread-1
    thread name: pool-1-thread-4
    thread name: pool-1-thread-2
    */
    package cn.itcast_01_mythread.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    /**
     * callable 跟runnable的区别:
     * runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值,
     * 如果要用到结果,那么就用共享变量并且加锁。
     * 
     * callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果
     * @author
     *
     */
    public class ThreadPoolWithcallable {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newFixedThreadPool(4); //线程池
            
            //提交3个任务,Callable有返回值,从里面的任务返回回来的。
            for(int i = 0; i < 3; i++){
                
                //子线程什么时候执行完,主线程是不知道的,现在要从子线程拿到返回值,这里用的是回调。
                //所以submit只是一个句柄(一个引用,开始没值后来有值)
                Future<String> submit = pool.submit(new Callable<String>(){//子线程开始执行
                    @Override
                        public String call() throws Exception {//run方法。返回值不一定是string。
                        Thread.sleep(3000);
                        return "b--"+Thread.currentThread().getName();
                    }               
                });
                
                //submit.get()是在主线程执行的。主线程从Future中get结果,这个方法是会被阻塞的,
                //因为一直要等到子线程任务执行完才可以拿到结果,否则拿不到结果。
                //如果这个子线程卡死了,那么主线程就卡死了。少用返回结果。
                System.out.println(submit.get());
            }
            
            pool.shutdown();
    }}
    
    
    /*
    b--pool-1-thread-1 :等待3秒钟阻塞
    b--pool-1-thread-2 :等待3秒钟阻塞
    b--pool-1-thread-3 :等待3秒钟阻塞
    */
    package cn.itcast_01_mythread.pool;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry;
    
    public class TestPool {
    
        public static void main(String[] args) throws Exception {
            Future<?> submit = null;
            Random random = new Random();
            
            //创建固定数量线程池
    //        ExecutorService exec = Executors.newFixedThreadPool(4);
            
            //创建调度线程池
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
            
            //用来记录各线程的返回结果
            ArrayList<Future<?>> results = new ArrayList<Future<?>>();
            
            for (int i = 0; i < 5; i++) {
                //fixedPool提交线程,runnable无返回值,callable有返回值
                /*submit = exec.submit(new TaskRunnable(i));*/
                /*submit = exec.submit(new TaskCallable(i));*/
                
    
                //schedulerPool可以用submit和schedule提交
                //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致,没有时间间隔的调度执行。
                /*submit = exec.submit(new TaskCallable(i));*/
                //对于schedulerPool来说,调用schedule提交任务时,则可按延迟多少秒之后再启动,按间隔时长来调度线程的运行
                submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
                //存储线程执行结果,
                //先把所有句柄保存起来,submit是一个句柄一个引用,此时不会立即有值,只有子线程执行完之后句柄才有值,才可以get出来值。
                results.add(submit);
                System.out.println("完毕");
            }
            
            
            //打印结果
            for(Future f: results){
                boolean done = f.isDone();
                System.out.println(done?"已完成":"未完成");  //从结果的打印顺序可以看到,即使未完成,主线程也会阻塞等待
                System.out.println("" + f.get());
            }
            
            exec.shutdown();
        
    }}
    
     class TaskCallable implements Callable<String>{
        private int s;
        Random r = new Random();
        public TaskCallable(int s){
            this.s = s;
        }
        @Override
        public String call() throws Exception {
            String name = Thread.currentThread().getName();
            long currentTimeMillis = System.currentTimeMillis();
            System.out.println(name+" 开始启动-s-" + s);
            int rint = r.nextInt(3);
            try {
                Thread.sleep(rint*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    //        System.out.println(name + " 结束启动-s-"+s);
            return name + " 结束启动-s-"+s+ "-返回结果-" + s+"";
        }}
    
    
       class TaskRunnable implements Runnable{
            private int s;
            public TaskRunnable(int s){
                this.s = s;
            }
            Random r = new Random();
            @Override
            public void run() {
                String name = Thread.currentThread().getName();
                long currentTimeMillis = System.currentTimeMillis();
                System.out.println(name+" 启动时间:" + currentTimeMillis/1000);
                int rint = r.nextInt(3);
                try {
                    Thread.sleep(rint*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(name + " is working..."+s);
            }}
       
    /*完毕
    完毕
    完毕
    完毕
    完毕
    未完成
    pool-1-thread-1 开始启动-s-1
    pool-1-thread-2 开始启动-s-0
    pool-1-thread-2 结束启动-s-0-返回结果-0
    已完成
    pool-1-thread-1 结束启动-s-1-返回结果-1
    未完成
    pool-1-thread-3 开始启动-s-2
    pool-1-thread-4 开始启动-s-3
    pool-1-thread-1 开始启动-s-4 : 线程1结束后又开始执行新的任务
    pool-1-thread-3 结束启动-s-2-返回结果-2
    已完成
    pool-1-thread-4 结束启动-s-3-返回结果-3
    未完成
    pool-1-thread-1 结束启动-s-4-返回结果-4*/
  • 相关阅读:
    Spring RedisTemplate操作-注解缓存操作(11)
    Spring RedisTemplate操作-通道操作(10)
    Spring RedisTemplate操作-事务操作(9)
    Spring RedisTemplate操作-发布订阅操作(8)
    Spring RedisTemplate操作-HyperLogLog操作(7)
    Spring RedisTemplate操作-Set操作(5)
    Spring RedisTemplate操作-ZSet操作(6)
    Spring RedisTemplate操作-List操作(4)
    Spring RedisTemplate操作-哈希操作(3)
    Spring RedisTemplate操作-String操作(2)
  • 原文地址:https://www.cnblogs.com/yaowen/p/9017330.html
Copyright © 2011-2022 走看看