zoukankan      html  css  js  c++  java
  • Guava并发:ListenableFuture与RateLimiter示例

    ListenableFuture顾名思义就是可以监听的Future,它是对java原生Future的扩展增强 RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数,本文介绍RateLimiter使用

    Guava并发 ListenableFuture RateLimiter

    目录[-]

     源码:http://www.jinhusns.com/Products/Download/?type=xcj

    概念

            ListenableFuture 顾名思义就是可以监听的Future,它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算 结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用 ListenableFuture Guava帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。      

            推荐使用第二种方法,因为第二种方法可以直接得到Future的返回值,或者处理错误情况。本质上第二种方法是通过调动第一种方法实现的,做了进一步的封装。

    另外ListenableFuture还有其他几种内置实现:

    1. SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息

    2. CheckedFuture: 这是一个继承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future执行发生异常时,可以抛出指定类型的异常。

       

        RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数,本文介绍RateLimiter使用

    代码示例

    ?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
     
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.RateLimiter;
     
    public class ListenableFutureDemo {
        public static void main(String[] args) {
            testRateLimiter();
            testListenableFuture();
        }
     
        /**
         * RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数
         */
        public static void testRateLimiter() {
            ListeningExecutorService executorService = MoreExecutors
                    .listeningDecorator(Executors.newCachedThreadPool());
     
            RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个任务被提交
     
            for (int i = 0; i < 10; i++) {
                limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞
     
                final ListenableFuture<Integer> listenableFuture = executorService
                        .submit(new Task("is "+ i));
            }
        }
     
        public static void testListenableFuture() {
            ListeningExecutorService executorService = MoreExecutors
                    .listeningDecorator(Executors.newCachedThreadPool());
     
            final ListenableFuture<Integer> listenableFuture = executorService
                    .submit(new Task("testListenableFuture"));
     
             
            //同步获取调用结果
            try {
                System.out.println(listenableFuture.get());
            catch (InterruptedException e1) {
                e1.printStackTrace();
            catch (ExecutionException e1) {
                e1.printStackTrace();
            }
             
            //第一种方式
            listenableFuture.addListener(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("get listenable future's result "
                                + listenableFuture.get());
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }, executorService);
     
            //第二种方式
            Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer result) {
                    System.out
                            .println("get listenable future's result with callback "
                                    + result);
                }
     
                @Override
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            });
        }
    }
     
    class Task implements Callable<Integer> {
        String str;
        public Task(String str){
            this.str = str;
        }
        @Override
        public Integer call() throws Exception {
            System.out.println("call execute.." + str);
            TimeUnit.SECONDS.sleep(1);
            return 7;
        }
    }

     

    Guava版本

    ?

    1
    2
    3
    4
    5
    <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>14.0.1</version>
            </dependency>
  • 相关阅读:
    EOJ 2743 Stock Exchange
    POJ-3468 A Simple Problem with Integers
    EOJ-1104 bitmap
    【转】旋转卡壳——凸多边形间对踵点对(定义)
    Ring 3层枚举进程的四种方法
    XX-Net项目,免费浏览谷歌的伟大项目
    浅析Java中的内存机制
    Ubuntu下eclipse中安装Scala插件
    注入(5)---导入表注入(HookINT)
    Linux下MySQL导入文件出错ERROR 1290 (HY000)
  • 原文地址:https://www.cnblogs.com/xiaoxiaojia/p/5457348.html
Copyright © 2011-2022 走看看