zoukankan      html  css  js  c++  java
  • guava中线程同步的代码和一个学习网站

      1 /**
      2  * 
      3  */
      4 package com.qunar.qfc.guanyingpiao.basic.concurrent;
      5 
      6 import com.google.common.base.Function;
      7 import com.google.common.util.concurrent.AsyncFunction;
      8 import com.google.common.util.concurrent.FutureCallback;
      9 import com.google.common.util.concurrent.Futures;
     10 import com.google.common.util.concurrent.ListenableFuture;
     11 import com.google.common.util.concurrent.ListeningExecutorService;
     12 import com.google.common.util.concurrent.MoreExecutors;
     13 import org.slf4j.Logger;
     14 import org.slf4j.LoggerFactory;
     15 
     16 import java.util.ArrayList;
     17 import java.util.List;
     18 import java.util.concurrent.Callable;
     19 import java.util.concurrent.ExecutionException;
     20 import java.util.concurrent.ExecutorService;
     21 import java.util.concurrent.Executors;
     22 import java.util.concurrent.ThreadLocalRandom;
     23 import java.util.concurrent.atomic.AtomicInteger;
     24 
     25 /**
     26  * 
     27  *
     28  * @author Guanying Piao
     29  *
     30  * 2018-06-30
     31  */
     32 public class FuturesDemo {
     33 
     34     private static Logger logger = LoggerFactory.getLogger(FuturesDemo.class);
     35     
     36     public static void main(String[] args) {
     37         AtomicInteger threadIndex = new AtomicInteger(0);//设置一个原子的自增变量
     38         ExecutorService executorService = Executors.newFixedThreadPool(5, r -> new Thread(r, "thread-" + threadIndex.getAndIncrement()));//创建线程池
     39         ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);//管理着线程池
     40         ExecutorService finishExecutorService = Executors.newSingleThreadExecutor(r -> new Thread(r, "finisher"));//这是一个单独的线程
     41         int jobCount = 5;
     42         List<ListenableFuture<String>> listenableFutures = new ArrayList<>(jobCount); 
     43         for (int i = 0; i < jobCount; i++) {
     44             //这里往线程池中循环的投递任务
     45             ListenableFuture<Long> firstFuture = listeningExecutorService.submit(() -> {
     46                 try {
     47                     Thread.sleep(ThreadLocalRandom.current().nextLong(1000L));//并发情况下这个随机数生成器效率更好
     48                 } catch (InterruptedException e) {
     49                     logger.warn("Interrupted while waiting for permit");
     50                 }
     51                 return 123L;
     52             });
     53 
     54            /* ListenableFuture<Long> firstFuture = listeningExecutorService.submit(new Callable<Long>() {
     55 
     56                 public Long call() throws Exception {
     57                     try {
     58                         Thread.sleep(ThreadLocalRandom.current().nextLong(1000L));//并发情况下这个随机数生成器效率更好
     59                     } catch (InterruptedException e) {
     60                         logger.warn("Interrupted while waiting for permit");
     61                     }
     62                     return System.currentTimeMillis();
     63                 }
     64             });*/
     65             try {
     66                 Thread.sleep(1000);
     67             }catch (InterruptedException e){
     68 
     69             }
     70             //对前面的firstFuture任务进行监听,如果firstFutrue任务结束了,在listeningExecutorService线程池中,调用中间的这个回调函数
     71             //由于是transformAsync,所以后面new的是AsyncFunction,同时返回值不能是Long,只能是ListenableFuture<Long>,所以我们下面要return listeningExecutorService.submit
     72             //相当于继续往线程池中投递任务
     73             ListenableFuture<Long> secondFuture = Futures.transformAsync(firstFuture, new AsyncFunction<Long, Long>() {
     74                 @Override
     75                 public ListenableFuture<Long> apply(Long input) throws Exception {
     76                     return listeningExecutorService.submit(() -> {
     77                         try {
     78                             Thread.sleep(ThreadLocalRandom.current().nextLong(1000L));
     79                         } catch (InterruptedException e) {
     80                             logger.warn("Interrupted while waiting for permit");
     81                         }
     82                         return input;
     83                     });
     84                 }                
     85             }, listeningExecutorService);
     86             //下面是对secondFuture进行监听,如果结束了,在finishExecutorService这个线程中执行回调函数
     87             //由于是同步的,所以返回值就可以是正常的值了
     88             ListenableFuture<String> thirdFuture = Futures.transform(secondFuture, new Function<Long, String>() {
     89                 @Override
     90                 public String apply(Long input) {
     91                     return String.valueOf(input);
     92                 }
     93             }, finishExecutorService);
     94             listenableFutures.add(thirdFuture);//将返回的ListenableFutrue装在一个链表中
     95         }
     96         //将这个链表中的每一个ListenableFuture判断,用null代替失败,然后将其装入ListenableFuture<List<String>>中
     97         //里层的链表应该就是代表之前成功的返回值
     98         ListenableFuture<List<String>> allFuture = Futures.successfulAsList(listenableFutures);
     99         //如果allFuture这里面里层的链表都执行完了,在finishExecutorService线程调用回调函数
    100         Futures.addCallback(allFuture, new FutureCallback<List<String>>() {
    101             @Override
    102             public void onSuccess(List<String> result) {
    103                 logger.info("job done times:{}", result);
    104             }
    105             @Override
    106             public void onFailure(Throwable t) {
    107                 logger.warn("Got exception:{}", t.getMessage());
    108             }            
    109         }, finishExecutorService);
    110         
    111         try {
    112             allFuture.get();
    113         } catch (InterruptedException | ExecutionException e) {
    114             logger.warn("Failed to wait for jobs to end");
    115         }
    116         executorService.shutdown();
    117         finishExecutorService.shutdown();
    118     }
    119 
    120 }

    CompletableFuture:https://www.jb51.net/article/51163.htm

  • 相关阅读:
    我的软件工程课目标
    软件工程课程的建议
    结对编程学习fault、error、failure三种状态
    结对编程项目的过程记录与收获
    “结对编程”之我见
    关于问卷调查
    我的软件工程课目标
    软件工程课程建议
    结对编程2
    结对编程之四则运算
  • 原文地址:https://www.cnblogs.com/TheQi/p/10636471.html
Copyright © 2011-2022 走看看