zoukankan      html  css  js  c++  java
  • hystrix源码小贴士之中断

      execution.isolation.thread.interruptOnCancel(对应下面代码中读取的 executionIsolationThreadInterruptOnFutureCancel 属性)用于设置当取消(cancellation)发生时是否中断执行线程。其实现方式是:在包装后的 Future 的 cancel 方法中,根据该配置决定是否调用执行线程的 interrupt 方法。

    /**
     * Excerpt of the command's asynchronous entry point: obtains a blocking {@link Future} from
     * {@code toObservable()} and wraps it so that {@code cancel(true)} can actually interrupt the
     * thread executing the command.
     *
     * NOTE(review): the excerpt stops after the wrapper is built; the rest of the method
     * (including its return statement) is not visible here.
     */
    public Future<R> queue() {
            /*
             * The Future returned by Observable.toBlocking().toFuture() does not implement the
             * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
             * thus, to comply with the contract of Future, we must wrap around it.
             */
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            
            // Wrapper: every method except cancel() forwards straight to the delegate.
            final Future<R> f = new Future<R>() {
    
                /**
                 * Cancels the delegate and, when permitted by configuration and requested by a caller,
                 * interrupts the execution thread.
                 *
                 * @param mayInterruptIfRunning whether the caller asks for the running thread to be interrupted
                 * @return {@code false} if the delegate was already cancelled, otherwise the result of
                 *         {@code delegate.cancel(...)}
                 */
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    // Already cancelled: nothing more to do, and no second interrupt is sent.
                    if (delegate.isCancelled()) {
                        return false;
                    }
    
                    // The caller's interrupt request is only honoured when the command property allows it.
                    if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                        /*
                         * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                         * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                         * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                         * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                         * then that interruption request cannot be taken back.
                         */
                        interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                    }
    
                    // Cancel the delegate with the latched flag rather than the raw argument.
                    final boolean res = delegate.cancel(interruptOnFutureCancel.get());
    
                    // Interrupt only while execution is still in progress and interruption was requested.
                    if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                        final Thread t = executionThread.get();
                        // Skip when no execution thread is recorded, and never interrupt the calling thread itself.
                        if (t != null && !t.equals(Thread.currentThread())) {
                            t.interrupt();
                        }
                    }
    
                    return res;
                }
    
                /** Reports the delegate's cancellation state. */
                @Override
                public boolean isCancelled() {
                    return delegate.isCancelled();
                }
    
                /** Reports the delegate's completion state. */
                @Override
                public boolean isDone() {
                    return delegate.isDone();
                }
    
                /** Blocks on the delegate for the result. */
                @Override
                public R get() throws InterruptedException, ExecutionException {
                    return delegate.get();
                }
    
                /** Blocks on the delegate for the result, up to the given timeout. */
                @Override
                public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return delegate.get(timeout, unit);
                }
                
            };

       execution.isolation.thread.interruptOnTimeout用于设置当发生超时(timeout)时是否中断执行线程。其实现方式是:向 threadPool.getScheduler 传入一个 Func0<Boolean> 回调,取消订阅时根据该回调的返回值决定是否以中断方式取消任务。

    // Expression excerpt: asks the thread pool for a scheduler, handing it a callback that answers
    // "should the worker thread be interrupted?".
    // NOTE(review): presumably this callback ends up as shouldInterruptThread in
    // FutureCompleterWithConfigurableInterrupt — confirm against the scheduler implementation.
    threadPool.getScheduler(new Func0<Boolean>() {
                    @Override
                    public Boolean call() {
                        // True only when interrupt-on-timeout is enabled AND the command's status is TIMED_OUT.
                        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                    }
                })
    /**
     * Subscription backed by a {@link FutureTask} submitted to a {@link ThreadPoolExecutor}.
     * Unsubscribing removes the task from the executor and cancels it; whether the cancellation
     * interrupts the running thread is decided at that moment by a caller-supplied callback.
     */
    private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
        private final FutureTask<?> f;
        private final Func0<Boolean> shouldInterruptThread;
        private final ThreadPoolExecutor executor;

        /**
         * @param task             the task whose lifecycle this subscription controls
         * @param interruptDecider evaluated on unsubscribe; its result is passed to {@code cancel}
         * @param pool             the executor the task is removed from on unsubscribe
         */
        private FutureCompleterWithConfigurableInterrupt(FutureTask<?> task, Func0<Boolean> interruptDecider, ThreadPoolExecutor pool) {
            this.f = task;
            this.shouldInterruptThread = interruptDecider;
            this.executor = pool;
        }

        /**
         * Removes the task from the executor, then cancels it, passing the callback's
         * answer as the may-interrupt flag.
         */
        @Override
        public void unsubscribe() {
            executor.remove(f);
            // Evaluate the decision after removal and before cancel, exactly once.
            final boolean interrupt = shouldInterruptThread.call();
            f.cancel(interrupt);
        }

        /** A subscription counts as unsubscribed once its task has been cancelled. */
        @Override
        public boolean isUnsubscribed() {
            return f.isCancelled();
        }
    }
  • 相关阅读:
    常见面试题1
    勒索病毒防范方法
    VMware虚拟机打开后不兼容
    win10桌面显示我的电脑设置
    scala集合和Java集合对应转换操作
    scala中使用redis
    爬虫调研
    hadoop命令
    IDEA打jar包
    spark.mllib
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/8116997.html
Copyright © 2011-2022 走看看