  • Java Concurrent Programming: Core Methods and Frameworks - Using CompletionService

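    The CompletionService interface lets you keep submitting new tasks asynchronously while processing the results of tasks that have already completed, which decouples task execution from result handling. Submit tasks with submit(), retrieve completed tasks with take(), and process their results in the order in which the tasks complete.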

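    Using CompletionService to overcome the drawback of Future
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class MyCallable implements Callable<String> {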
    	private String username;
    	private long sleepValue;
    	
    	public MyCallable(String username, long sleepValue) {
    		super();
    		this.username = username;
    		this.sleepValue = sleepValue;
    	}
    
    	@Override
    	public String call() throws Exception {
    		System.out.println(username + "  " + Thread.currentThread().getName() + System.currentTimeMillis());
    		Thread.sleep(sleepValue);
    		return "return " + username;
    	}
    
    	public static void main(String[] args) {
    		try {
    			MyCallable callable1 = new MyCallable("username1", 5000);
    			MyCallable callable2 = new MyCallable("username2", 4000);
    			MyCallable callable3 = new MyCallable("username3", 3000);
    			MyCallable callable4 = new MyCallable("username4", 2000);
    			MyCallable callable5 = new MyCallable("username5", 1000);
    			
    			List<Callable<String>> callables = new ArrayList<>();
    			callables.add(callable1);
    			callables.add(callable2);
    			callables.add(callable3);
    			callables.add(callable4);
    			callables.add(callable5);
    
    			int corePoolSize = 5;
    			int maximumPoolSize = 10;
    			int keepAliveTime = 5;
    			TimeUnit unit = TimeUnit.SECONDS;
    			LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>();
    			ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			for (int i = 0; i < 5; i++) {
    				completionService.submit(callables.get(i));
    			}
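    			for (int i = 0; i < 5; i++) {
    				// take(): retrieves and removes the Future of the next completed task, waiting if none has completed yet.
    				System.out.println(completionService.take().get() + " -- " + System.currentTimeMillis());
    			}
    			// Shut the pool down so that its core threads do not keep the JVM alive.
    			executor.shutdown();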
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The console output is as follows:

    username2  pool-1-thread-21470920687933
    username4  pool-1-thread-41470920687934
    username3  pool-1-thread-31470920687933
    username5  pool-1-thread-51470920687934
    username1  pool-1-thread-11470920687933
    return username5 -- 1470920688939
    return username4 -- 1470920689937
    return username3 -- 1470920690938
    return username2 -- 1470920691938
    return username1 -- 1470920692938
    

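    As the output shows, CompletionService removes the blocking drawback of Future: whichever task finishes first has its result returned first. If no task has finished yet, however, completionService.take().get() still blocks, because take() waits until a completed task is available.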
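    The Future drawback mentioned above can be made concrete with a small side-by-side comparison. The following is a minimal sketch, not code from the original article: the class FutureOrderDemo, the helper task(), and the sleep times are all hypothetical and chosen only for illustration. It submits the same three tasks twice, once reading plain Future objects in submission order and once reading them through a CompletionService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureOrderDemo {

    // Hypothetical helper: a task that sleeps, then returns its own name.
    static Callable<String> task(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Plain Futures are read in submission order: the first get() waits for
    // the slowest task although the other two finish well before it.
    static List<String> submissionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            futures.add(pool.submit(task("slow", 600)));
            futures.add(pool.submit(task("medium", 300)));
            futures.add(pool.submit(task("fast", 50)));
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // A CompletionService hands the futures back in completion order instead.
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(task("slow", 600));
            service.submit(task("medium", 300));
            service.submit(task("fast", 50));
            List<String> results = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                results.add(service.take().get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submission order: " + submissionOrder());
        System.out.println("completion order: " + completionOrder());
    }
}
```

    With these sleep times, the first list should come back as slow, medium, fast and the second as fast, medium, slow. Both variants take about as long overall; the difference is that the CompletionService version can start processing the fast result while the slow task is still running.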


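    The take() method
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Run {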
    	public static void main(String[] args) {
    		try {
    			ExecutorService executorService = Executors.newCachedThreadPool();
    			CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    			for (int i = 0; i < 10; i++) {
    				completionService.submit(new Callable<String>() {
    
    					@Override
    					public String call() throws Exception {
    						long sleepValue = (int)(Math.random() * 1000);
    						System.out.println("sleep=" + sleepValue + " " + Thread.currentThread().getName());
    						Thread.sleep(sleepValue);
    						return sleepValue + "-" + Thread.currentThread().getName();
    					}
    				});
    			}
    			for (int i = 0; i < 10; i++) {
    				// take() returns Future objects in completion order, from the fastest task to the slowest
    				System.out.println(completionService.take().get());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The output is as follows:

    sleep=662 pool-1-thread-2
    sleep=476 pool-1-thread-6
    sleep=977 pool-1-thread-5
    sleep=175 pool-1-thread-7
    sleep=461 pool-1-thread-4
    sleep=836 pool-1-thread-3
    sleep=267 pool-1-thread-1
    sleep=82 pool-1-thread-8
    sleep=714 pool-1-thread-9
    sleep=946 pool-1-thread-10
    82-pool-1-thread-8
    175-pool-1-thread-7
    267-pool-1-thread-1
    461-pool-1-thread-4
    476-pool-1-thread-6
    662-pool-1-thread-2
    714-pool-1-thread-9
    836-pool-1-thread-3
    946-pool-1-thread-10
    977-pool-1-thread-5
    

    The take() method returns Future objects in the order in which the tasks complete, from the fastest to the slowest.


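    The poll() method
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Run1 {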
    	public static void main(String[] args) {
    		ExecutorService executorService = Executors.newCachedThreadPool();
    		CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    		completionService.submit(new Callable<String>() {
    			
    			@Override
    			public String call() throws Exception {
    				TimeUnit.SECONDS.sleep(3);
    				System.out.println("3秒过了"); // the message means "3 seconds have passed"
    				return "3S";
    			}
    		});
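    		// poll() retrieves and removes the Future of the next completed task,
    		// or returns null without blocking if no task has completed yet.
    		System.out.println(completionService.poll());
    		// Let the submitted task finish, then release the pool's threads.
    		executorService.shutdown();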
    	}
    }
    

    The output is as follows:

    null
    3秒过了
    

    poll() returns null here because no task has completed yet, so there is no Future to hand back. Unlike take(), poll() does not block.
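    Between the non-blocking poll() and the indefinitely blocking take(), the CompletionService interface also declares poll(long timeout, TimeUnit unit), which waits up to the given time and returns null on timeout. The following is a minimal sketch of that variant; the class name PollTimeoutDemo, the 300 ms sleep, and the 5 second timeout are assumptions made for illustration and do not come from the original article.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollTimeoutDemo {

    // Returns two strings: what the immediate poll() saw, and the value
    // obtained through the timed poll.
    static String[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> {
                Thread.sleep(300);
                return "done";
            });

            // The task is still sleeping, so the non-blocking poll() yields null.
            Future<String> immediate = service.poll();

            // Waits up to 5 seconds for a completed task; returns null on timeout.
            Future<String> timed = service.poll(5, TimeUnit.SECONDS);

            String first = (immediate == null) ? "null" : immediate.get();
            String second = (timed == null) ? "timed out" : timed.get();
            return new String[] { first, second };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] outcome = run();
        System.out.println("poll():              " + outcome[0]);
        System.out.println("poll(5, SECONDS):    " + outcome[1]);
    }
}
```

    The timed variant is a reasonable choice when the caller wants to wait for a result but must not hang forever if a task stalls.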


    CompletionService and exceptions
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    public class MyCalableA implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		System.out.println("MyCalableA begin," + System.currentTimeMillis());
    		TimeUnit.SECONDS.sleep(1);
    		System.out.println("MyCalableA end," + System.currentTimeMillis());
    		return "MyCalableA";
    	}
    
    }
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    public class MyCalableB implements Callable<String> {
    
    	@Override
    	public String call() throws Exception {
    		System.out.println("MyCalableB begin," + System.currentTimeMillis());
    		TimeUnit.SECONDS.sleep(2);
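    		// The always-true condition keeps the compiler from rejecting the statements after the throw as unreachable.
    		int i = 0;
    		if(i == 0){
    			throw new Exception("异常..."); // the message means "exception..."
    		}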
    		System.out.println("MyCalableB end," + System.currentTimeMillis());
    		return "MyCalableB";
    	}
    }
    
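    import java.util.concurrent.CompletionService;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.Executors;
    
    public class Main {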
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor();
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA);
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.take());
    			}
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The console output is as follows:

    MyCalableA begin,1471006680464
    MyCalableA end,1471006682467
    MyCalableB begin,1471006682467
    ----java.util.concurrent.FutureTask@3918d722
    ----java.util.concurrent.FutureTask@dd41677
    main end
    

    Although MyCalableB throws an exception, get() is never called on the FutureTask, so the exception does not surface in the main thread.

    Modify the code above as follows:

    for (int i = 0; i < 2; i++) {
    	System.out.println("----" + completionService.take().get());
    }
    

    The output is now:

    MyCalableA begin,1471007865642
    MyCalableA end,1471007866647
    MyCalableB begin,1471007866647
    ----MyCalableA
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

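    MyCalableA finishes first and returns its value without an exception. MyCalableB then throws, and get() reports that failure wrapped in an ExecutionException.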

    Modify the code again as follows:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor(); // single thread
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableB); // task B runs first
    			completionService.submit(myCalableA);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.take().get());
    			}
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The console output is now:

    MyCalableB begin,1471008229367
    MyCalableA begin,1471008231370
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    MyCalableA end,1471008232373
    

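    This time task B throws first. The ExecutionException from its get() call leaves the loop, so task A still runs to completion but its return value is never retrieved.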
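    One way to keep a single failed task from hiding the results of the others is to catch ExecutionException inside the loop, once per Future, instead of around the whole loop. The following is a minimal sketch under that assumption; the class PerTaskErrorDemo, the lambda tasks, and the "boom" message are hypothetical stand-ins for MyCalableB and MyCalableA, not the author's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerTaskErrorDemo {

    static List<String> collect() {
        // A single worker thread runs the tasks in submission order.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);

            // Hypothetical stand-ins: the first task fails, the second succeeds.
            Callable<String> failing = () -> {
                throw new Exception("boom");
            };
            Callable<String> succeeding = () -> "ok";
            service.submit(failing);
            service.submit(succeeding);

            List<String> outcomes = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                Future<String> done = service.take();
                try {
                    outcomes.add("result: " + done.get());
                } catch (ExecutionException e) {
                    // Only this task failed; record it and keep draining the rest.
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String outcome : collect()) {
            System.out.println(outcome);
        }
    }
}
```

    Because the try/catch sits around each get() call, the failure of the first task is recorded and the loop still goes on to retrieve the second task's value.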

    Modify the code further as follows:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor(); // single thread
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA); // task A runs first
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll());
    			System.out.println("B处:" + completionService.poll());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The output is now:

    ----null
    ----null
    MyCalableA begin,1471009161165
    MyCalableA end,1471009162166
    MyCalableB begin,1471009162166
    A处:java.util.concurrent.FutureTask@5f0ee5b8
    B处:java.util.concurrent.FutureTask@4b0bc3c9
    main end
    

    get() is never called, so no exception is thrown.

    Modify the code again:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor(); // single thread
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableA); // task A runs first
    			completionService.submit(myCalableB);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll().get());
    			System.out.println("B处:" + completionService.poll().get());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The console output is as follows:

    ----null
    ----null
    MyCalableA begin,1471009511872
    MyCalableA end,1471009512876
    MyCalableB begin,1471009512877
    A处:MyCalableA
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:24)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    Task A returns its value. Task B throws an exception and therefore produces no return value.

    Modify the code again:

    public class Main {
    
    	public static void main(String[] args) {
    		try {
    			MyCalableA myCalableA = new MyCalableA();
    			MyCalableB myCalableB = new MyCalableB();
    			Executor executor = Executors.newSingleThreadExecutor(); // single thread
    			CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
    			completionService.submit(myCalableB); // task B runs first
    			completionService.submit(myCalableA);
    			for (int i = 0; i < 2; i++) {
    				System.out.println("----" + completionService.poll());
    			}
    			TimeUnit.SECONDS.sleep(5);
    			System.out.println("A处:" + completionService.poll().get());
    			System.out.println("B处:" + completionService.poll().get());
    			System.out.println("main end");
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    The output is as follows:

    ----null
    ----null
    MyCalableB begin,1471009732036
    MyCalableA begin,1471009734037
    MyCalableA end,1471009735037
    java.util.concurrent.ExecutionException: java.lang.Exception: 异常...
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    	at com.concurrent.chapter6.concurrent02.Main.main(Main.java:23)
    Caused by: java.lang.Exception: 异常...
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14)
    	at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

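    Task B completes first, so the first poll() after the sleep returns B's Future and its get() throws. The exception ends the try block before task A's result can be printed.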
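    The listings above call poll().get() directly, which only works because the 5 second sleep guarantees that both tasks have finished. If poll() were to return null, that chained call would fail with a NullPointerException. The following is a minimal sketch of a null-safe way to drain results with poll(); the class NullSafePollDemo, the helper task(), and the timings are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class NullSafePollDemo {

    // Hypothetical helper: a task that sleeps, then returns its own name.
    static Callable<String> task(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    static List<String> drain() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(task("a", 50));
            service.submit(task("b", 150));
            service.submit(task("c", 250));

            int pending = 3; // number of submitted tasks not yet collected
            List<String> results = new ArrayList<>();
            while (pending > 0) {
                Future<String> done = service.poll();
                if (done == null) {
                    // Nothing has finished yet; a real caller could do other work here.
                    Thread.sleep(10);
                    continue;
                }
                results.add(done.get());
                pending--;
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(drain());
    }
}
```

    Counting the submitted tasks is what lets the loop know when to stop, since the CompletionService interface itself has no method that reports how many tasks are still outstanding.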

  • Original article: https://www.cnblogs.com/umgsai/p/5671662.html