  • A Scheme for Calling an API at a Fixed Rate

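    This article grew out of some secondary development I did at work against a third-party platform. The platform's API comes with an annoying restriction: it may be called at most 2 times per second. Living with that restriction raised the following requirements:

    • When the third party changes its rate-limit policy, we can adjust quickly to match it
    • The business logic itself does not need to care about changes in the call rate

    This led me to a custom thread pool: callers only submit tasks to the pool, and the pool takes care of the call rate. I use a single-threaded pool and replace its waiting queue with a DelayQueue, whose take method hands out an element only once that element's delay has expired.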

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    The take method of DelayQueue

    The most important part is:

    long delay = first.getDelay(NANOSECONDS);
    if (delay <= 0)
        return q.poll();
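To see this behaviour in isolation, here is a minimal sketch that puts a single element into a DelayQueue and measures how long take() blocks. The names TimedItem, DelayQueueDemo and blockedMillis are hypothetical and exist only for this illustration; they are not part of the thread pool described in this article.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type: becomes available at a fixed deadline.
class TimedItem implements Delayed {
    private final long dueNanos; // absolute deadline on the System.nanoTime() clock

    TimedItem(long delayMillis) {
        this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time; zero or negative means take() may return this element.
        return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // The element with the shortest remaining delay sits at the head of the queue.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    // Queues one element with the given delay and returns how long take() blocked, in ms.
    static long blockedMillis(long delayMillis) {
        DelayQueue<TimedItem> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new TimedItem(delayMillis));
        try {
            queue.take(); // blocks until the head's getDelay(...) is <= 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("take() blocked for about " + blockedMillis(300) + " ms");
    }
}
```

Because take() consults nothing but getDelay, whoever implements getDelay decides when the next element may leave the queue.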

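    So the key is to implement a custom getDelay method. Because the queue accepts only Delayed elements, whereas tasks submitted to a thread pool are plain Runnable or Callable instances, an adapter is needed.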

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;

    public class DelayedThreadPoolFactoryBean extends ThreadPoolExecutorFactoryBean {
        Logger logger = LoggerFactory.getLogger(this.getClass());
        // Minimum interval between two tasks; defaults to 5 seconds when not configured.
        private TimeUnit timeUnit = TimeUnit.SECONDS;
        private long delayed = 5;

        public void setTimeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
        }

        public void setDelayed(long delayed) {
            this.delayed = delayed;
        }

        @Override
        @SuppressWarnings("unchecked") // raw DelayQueue: it only ever receives DelegateDelaye tasks
        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
            return new DelegateDelayedQueue();
        }

        @Override
        protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds,
                final BlockingQueue<Runnable> queue, ThreadFactory threadFactory,
                RejectedExecutionHandler rejectedExecutionHandler) {

            ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                    keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
                @Override
                public void execute(Runnable command) {
                    // Wrap every task in the adapter so that it can be stored in the DelayQueue.
                    final DelegateDelayedQueue queue1 = (DelegateDelayedQueue) queue;
                    DelegateDelaye delegateDelaye = new DelegateDelaye(command, queue1, timeUnit, delayed);
                    super.execute(delegateDelaye);
                }
            };
            // Start the core threads up front. Otherwise ThreadPoolExecutor hands the first task
            // directly to a new worker, bypassing the queue and its timestamp bookkeeping.
            executor.prestartAllCoreThreads();
            return executor;
        }

        /** Adapter that lets a Runnable be stored in a DelayQueue. */
        private class DelegateDelaye implements Delayed, Runnable {
            private Runnable runnable;
            private DelegateDelayedQueue delegateDelayedQueue;
            private TimeUnit timeUnit;
            private long delayed;

            public DelegateDelaye(Runnable runnable, DelegateDelayedQueue delegateDelayedQueue,
                    TimeUnit unit, long delayed) {
                this.runnable = runnable;
                this.delegateDelayedQueue = delegateDelayedQueue;
                this.timeUnit = unit;
                this.delayed = delayed;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                long last = delegateDelayedQueue.getLast();
                if (last == 0) {
                    return -1; // nothing has been taken yet, so run immediately
                }
                // Time remaining until (last take + configured interval).
                long remainingMillis = last + timeUnit.toMillis(delayed) - System.currentTimeMillis();
                return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed o) {
                // All queued tasks share the same delay, so no ordering is imposed;
                // the order among queued tasks is therefore unspecified.
                return 0;
            }

            @Override
            public void run() {
                runnable.run();
            }
        }

        /** DelayQueue that records the time at which an element was last taken. */
        private class DelegateDelayedQueue extends DelayQueue {
            // Defaults to 0, meaning no task has been taken yet.
            private volatile long last = 0;

            @Override
            public Delayed poll() {
                Delayed delayed = super.poll();
                if (delayed != null) {
                    this.last = System.currentTimeMillis();
                }
                return delayed;
            }

            @Override
            public Delayed take() throws InterruptedException {
                Delayed delayed = super.take();
                if (delayed != null) {
                    this.last = System.currentTimeMillis();
                }
                return delayed;
            }

            @Override
            public Delayed poll(long timeout, TimeUnit unit) throws InterruptedException {
                Delayed delayed = super.poll(timeout, unit);
                if (delayed != null) {
                    this.last = System.currentTimeMillis();
                }
                return delayed;
            }

            public long getLast() {
                return last;
            }
        }
    }
    The complete thread pool implementation

    A simple test:

    @Test
    public void testSinglePool() {
        ExecutorService executorService = executoryServiceFactory.executorService(ExecutoryServiceFactory.Pool.DELAYED);

        // Each task prints the current time in seconds, which makes the spacing visible.
        executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
                return null;
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }
        });
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }
        });
        executorService.shutdown();
        try {
            executorService.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception.
            Thread.currentThread().interrupt();
        }
    }

    The results match expectations: the tasks run at a fixed rate.
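For readers who want to try the idea without Spring, the following is a rough, self-contained sketch of the same technique on a plain ThreadPoolExecutor. It is not the implementation from this article: the names PacedExecutorSketch, PacedQueue, PacedTask, newPacedExecutor and elapsedMillis are hypothetical, it uses System.nanoTime() instead of System.currentTimeMillis(), it orders tasks FIFO through a sequence number, and it assumes a single worker thread that never times out (so only take() needs overriding).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PacedExecutorSketch {
    private static final AtomicLong SEQ = new AtomicLong();

    /** Queue that records when it last handed a task to the worker thread. */
    static final class PacedQueue extends DelayQueue<PacedTask> {
        volatile long lastHandOffNanos;
        volatile boolean handedOff; // false until the first task has been taken

        @Override
        public PacedTask take() throws InterruptedException {
            PacedTask task = super.take();
            lastHandOffNanos = System.nanoTime();
            handedOff = true;
            return task;
        }
    }

    /** Adapter: a Runnable whose delay is counted from the queue's last hand-off. */
    static final class PacedTask implements Delayed, Runnable {
        private final Runnable body;
        private final PacedQueue queue;
        private final long intervalNanos;
        private final long seq = SEQ.getAndIncrement();

        PacedTask(Runnable body, PacedQueue queue, long intervalNanos) {
            this.body = body;
            this.queue = queue;
            this.intervalNanos = intervalNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            if (!queue.handedOff) {
                return 0; // nothing taken yet: the first task may run at once
            }
            long remaining = queue.lastHandOffNanos + intervalNanos - System.nanoTime();
            return unit.convert(remaining, TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // The queue only ever holds PacedTask instances; keep submission order.
            return Long.compare(seq, ((PacedTask) other).seq);
        }

        @Override
        public void run() {
            body.run();
        }
    }

    @SuppressWarnings("unchecked")
    static ThreadPoolExecutor newPacedExecutor(long interval, TimeUnit unit) {
        final PacedQueue queue = new PacedQueue();
        final long intervalNanos = unit.toNanos(interval);
        // Unchecked cast: execute() below guarantees that only PacedTask enters the queue.
        BlockingQueue<Runnable> workQueue = (BlockingQueue<Runnable>) (BlockingQueue<?>) queue;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, workQueue) {
            @Override
            public void execute(Runnable command) {
                super.execute(new PacedTask(command, queue, intervalNanos));
            }
        };
        pool.prestartAllCoreThreads(); // make every task, including the first, pass through the queue
        return pool;
    }

    /** Runs no-op tasks and returns the ms from before the first submit to the start of the last task. */
    static long elapsedMillis(int tasks, long intervalMillis) {
        ThreadPoolExecutor pool = newPacedExecutor(intervalMillis, TimeUnit.MILLISECONDS);
        final AtomicLong lastStart = new AtomicLong();
        long begin = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> lastStart.set(System.nanoTime()));
        }
        pool.shutdown(); // tasks already queued still run after shutdown()
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(lastStart.get() - begin);
    }

    public static void main(String[] args) {
        System.out.println("5 tasks at a 500 ms interval took " + elapsedMillis(5, 500) + " ms");
    }
}
```

With n tasks and an interval of d, the last task cannot start earlier than (n - 1) × d after the first submit, which gives a simple property to assert on rather than reading printed timestamps by eye.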

  • Original article: https://www.cnblogs.com/dragonfei/p/5908900.html