zoukankan      html  css  js  c++  java
  • Java并发编程核心方法与框架-CompletionService的使用

    接口CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离。使用submit()执行任务,使用take取得已完成的任务,并按照这些任务的时间顺序处理他们的结果。

    使用CompletionService解决Future的缺点
    public class MyCallable implements Callable<String> {
    	private String username;
    	private long sleepValue;
    	
    	public MyCallable(String username, long sleepValue) {
    		super();
    		this.username = username;
    		this.sleepValue = sleepValue;
    	}
    
    	@Override
    	public String call() throws Exception {
    		System.out.println(username + "  " + Thread.currentThread().getName() + System.currentTimeMillis());
    		Thread.sleep(sleepValue);
    		return "return " + username;
    	}
    
    	public static void main(String[] args) {
    		try {
    			MyCallable callable1 = new MyCallable("username1", 5000);
    			MyCallable callable2 = new MyCallable("username2", 4000);
    			MyCallable callable3 = new MyCallable("username3", 3000);
    			MyCallable callable4 = new MyCallable("username4", 2000);
    			MyCallable callable5 = new MyCallable("username5", 1000);
    			
    			List<Callable<String>> callables = new ArrayList<>();
    			callables.add(callable1);
    			callables.add(callable2);
    			callables.add(callable3);
    			callables.add(callable4);
    			callables.add(callable5);
    
    			int corePoolSize = 5;
    			int maximumPoolSize = 10;
    			int keepAliveTime = 5;
    			TimeUnit unit = TimeUnit.SECONDS;
    			LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>();
    			ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    			CompletionService completionService = new ExecutorCompletionService<>(executor);
    			for (int i = 0; i < 5; i++) {
    				completionService.submit(callables.get(i));
    			}
    			for (int i = 0; i < 5; i++) {
    			//take方法:获取并移除表示下一个已完成任务的Future,如果目前不存在这样的任务,则等待。
    				System.out.println(completionService.take().get() + " -- " + System.currentTimeMillis());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    控制台打印结果如下:

    username2  pool-1-thread-21470920687933
    username4  pool-1-thread-41470920687934
    username3  pool-1-thread-31470920687933
    username5  pool-1-thread-51470920687934
    username1  pool-1-thread-11470920687933
    return username5 -- 1470920688939
    return username4 -- 1470920689937
    return username3 -- 1470920690938
    return username2 -- 1470920691938
    return username1 -- 1470920692938
    

    从打印结果来看,CompletionService解决了Future阻塞的缺点,哪个任务先执行完,哪个任务就先返回值。在CompletionService接口中如果当前没有任务执行完,则completionService.take().get()方法还是呈阻塞特性。


    take()方法
    public class Run {
    	public static void main(String[] args) {
    		try {
    			ExecutorService executorService = Executors.newCachedThreadPool();
    			CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    			for (int i = 0; i < 10; i++) {
    				completionService.submit(new Callable<String>() {
    
    					@Override
    					public String call() throws Exception {
    						long sleepValue = (int)(Math.random() * 1000);
    						System.out.println("sleep=" + sleepValue + " " + Thread.currentThread().getName());
    						Thread.sleep(sleepValue);
    						return sleepValue + "-" + Thread.currentThread().getName();
    					}
    				});
    			}
    			for (int i = 0; i < 10; i++) {
    				//take()方法是按照任务执行的速度,从快到慢获得Future对象
    				System.out.println(completionService.take().get());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    打印结果如下:

    sleep=662 pool-1-thread-2
    sleep=476 pool-1-thread-6
    sleep=977 pool-1-thread-5
    sleep=175 pool-1-thread-7
    sleep=461 pool-1-thread-4
    sleep=836 pool-1-thread-3
    sleep=267 pool-1-thread-1
    sleep=82 pool-1-thread-8
    sleep=714 pool-1-thread-9
    sleep=946 pool-1-thread-10
    82-pool-1-thread-8
    175-pool-1-thread-7
    267-pool-1-thread-1
    461-pool-1-thread-4
    476-pool-1-thread-6
    662-pool-1-thread-2
    714-pool-1-thread-9
    836-pool-1-thread-3
    946-pool-1-thread-10
    977-pool-1-thread-5
    

    take()方法是按照任务执行的速度,从快到慢获得Future对象


    poll()方法
    public class Run1 {
    	public static void main(String[] args) {
    		ExecutorService executorService = Executors.newCachedThreadPool();
    		CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    		completionService.submit(new Callable<String>() {
    			
    			@Override
    			public String call() throws Exception {
    				TimeUnit.SECONDS.sleep(3);
    				System.out.println("3秒过了");
    				return "3S";
    			}
    		});
    		//poll()方法获取并移除表示下一个已完成任务的Future,
    		//如果不存在这样的任务则返回null,无阻塞效果
    		System.out.println(completionService.poll());
    	}
    }
    

    打印结果如下:

    null
    3秒过了
    

    方法poll()返回的Future为null,因为当前没有任何已完成任务的Future对象。poll()不像take()方法具有阻塞的效果。


    类CompletionService与异常
    public class MyCalableA implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		System.out.println("MyCalableA begin," + System.currentTimeMillis());
    		TimeUnit.SECONDS.sleep(1);
    		System.out.println("MyCalableA end," + System.currentTimeMillis());
    		return "MyCalableA";
    	}
    
    }
    
    public class MyCalableB implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		System.out.println("MyCalableB begin," + System.currentTimeMillis());
    		TimeUnit.SECONDS.sleep(2);
    		int i = 0;
    		if(i == 0){
    			throw new Exception("异常...");
    		}
    		System.out.println("MyCalableB end," + System.currentTimeMillis());
    		return "MyCalableB";
    	}
    }
    
    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA);
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.take());
    			}
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    控制台打印结果如下:

    MyCalableA begin,1471006680464
    MyCalableA end,1471006682467
    MyCalableB begin,1471006682467
    ----java.util.concurrent.FutureTask@3918d722
    ----java.util.concurrent.FutureTask@dd41677
    main end
    

    MyCalableB类中虽然跑出了异常,但是并没有调用GutureTask类的get()方法,所以不出现异常。

    对以上代码做如下修改:

    for (int i = 0; i < 2; i++) {
    	System.out.println("----" + completionService.take().get());
    }
    

    此时运行结果如下:

    MyCalableA begin,1471007865642
    MyCalableA end,1471007866647
    MyCalableB begin,1471007866647
    ----MyCalableA
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    MyCalableA先执行完,未抛出异常。MyCalableB线程抛出异常。

    对以上代码做如下修改:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();//单线程
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableB);//先执行B任务
    			completionService.submit(myCalableA);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.take().get());
    			}
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    此时控制台输出结果如下:

    MyCalableB begin,1471008229367
    MyCalableA begin,1471008231370
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    MyCalableA end,1471008232373
    

    此时B任务抛出异常,A任务执行完但未返回值。

    对以上代码继续做如下修改:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();//单线程
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA);//先执行A任务
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll());
    			System.out.println("B处:" + completionService.poll());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    此时运行结果如下:

    ----null
    ----null
    MyCalableA begin,1471009161165
    MyCalableA end,1471009162166
    MyCalableB begin,1471009162166
    A处:java.util.concurrent.FutureTask@5f0ee5b8
    B处:java.util.concurrent.FutureTask@4b0bc3c9
    main end
    

    未调用get()方法,未抛出异常。

    继续修改以上代码:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();//单线程
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA);//先执行A任务
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll().get());
    			System.out.println("B处:" + completionService.poll().get());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    控制台打印结果如下:

    ----null
    ----null
    MyCalableA begin,1471009511872
    MyCalableA end,1471009512876
    MyCalableB begin,1471009512877
    A处:MyCalableA
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:24)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    此时A任务有返回值。B任务抛出异常,无返回值。

    继续修改以上代码:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();//单线程
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableB);//先执行B任务
    			completionService.submit(myCalableA);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll().get());
    			System.out.println("B处:" + completionService.poll().get());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    运行结果如下:

    ----null
    ----null
    MyCalableB begin,1471009732036
    MyCalableA begin,1471009734037
    MyCalableA end,1471009735037
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:23)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    此时任务B抛出异常,任务A未打印。

  • 相关阅读:
    082、Java数组之数组传递之简化理解
    081、Java数组之数组传递
    080、Java数组之二维数组的定义及使用
    079、Java数组之数组的静态初始化
    078、Java数组之数组的引用传递
    077、Java数组之分步实现数组操作
    076、Java数组之定义数组
    075、Java面向对象之定义匿名对象
    074、Java面向对象之构造方法重载
    073、Java面向对象之利用构造方法为属性赋值
  • 原文地址:https://www.cnblogs.com/umgsai/p/5671662.html
Copyright © 2011-2022 走看看