zoukankan      html  css  js  c++  java
  • 转 Hystrix超时实现机制

    HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。

    1.主入口:executeCommandAndObserve

    #com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            ···省略部分代码···
            Observable<R> execution;
    
            //判断是否开启超时监测
            if (properties.executionTimeoutEnabled().get()) {
                execution = executeCommandWithSpecifiedIsolation(_cmd)
                        .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
            } else {
                execution = executeCommandWithSpecifiedIsolation(_cmd);
            }
    
            return execution.doOnNext(markEmits)
                    .doOnCompleted(markOnCompleted)
                    .onErrorResumeNext(handleFallback)
                    .doOnEach(setRequestContext);
        }
    

    executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

    可以简单的认为lift 里面的对前面的Observable包含,类似装饰者,后面的parent就是指上层的Observable。其中 HystrixObservableTimeoutOperator 就是关键的部分。

    2.关键点: HystrixObservableTimeoutOperator

    先看下HystrixObservableTimeoutOperator.call(),TimerListener的实现

    TimerListener listener = new TimerListener() {
    
                    @Override
                    public void tick() {
                       
                        if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                            // 标记事件,可以认为是开的hook,这里暂忽略
                            originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
    
                            //取消原Obserable的订阅
                            s.unsubscribe();
    
                            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
    
                                @Override
                                public void run() {
                                    child.onError(new HystrixTimeoutException());
                                }
                            });
                            timeoutRunnable.run();
                        }
                    }
    
                    //获取配置的超时时间配置
                    @Override
                    public int getIntervalTimeInMilliseconds() {
                        return originalCommand.properties.executionTimeoutInMilliseconds().get();
                    }
                };
    

    这段代码的意思就是,给当前command的超时状态置为超时,如果设置成功就抛出HystrixTimeoutException异常,紧接着被command的 doOnErron接收走 fallback逻辑

    fallback
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
            final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    
            .................................
    
            final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    circuitBreaker.markNonSuccess();
                    Exception e = getExceptionFromThrowable(t);
                    executionResult = executionResult.setExecutionException(e);
                    if (e instanceof RejectedExecutionException) {
                        return handleThreadPoolRejectionViaFallback(e);
                    } else if (t instanceof HystrixTimeoutException) {
                        //此处catch到超时异常
                        return handleTimeoutViaFallback();
                    } else if (t instanceof HystrixBadRequestException) {
                        return handleBadRequestByEmittingError(e);
                    } else {
                        /*
                         * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                         */
                        if (e instanceof HystrixBadRequestException) {
                            eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                            return Observable.error(e);
                        }
    
                        return handleFailureViaFallback(e);
                    }
                }
            };
    
            .................................
    
            return execution.doOnNext(markEmits)
                    .doOnCompleted(markOnCompleted)
                    .onErrorResumeNext(handleFallback)
                    .doOnEach(setRequestContext);
        }
    

    同时s.unsubscribe()通知正在执行的线程,终止任务。如何终止呢?

    executeCommandWithSpecifiedIsolation.subscribeOn()

    subscribeOne的参数就是HystrixContextScheduler, Rxjava里 scheduler具体干活的是 worker,我们先看下Hystrix自定义scheduler的结构示意图


    那么我们直奔主题,直接看 ThreadPoolWorker
    //ThreadPoolWorker.schedule
    @Override
    public Subscription schedule(final Action0 action) {
        if (subscription.isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }
    
        ScheduledAction sa = new ScheduledAction(action);
    
        subscription.add(sa);
        sa.addParent(subscription);
    
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
        sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    
        return sa;
    }
    

    1.开始的时候判断observable是否被订阅
    2.被订阅后,将任务 submit到线程池
    3.FutureCompleterWithConfigurableInterrupt scheduler在执行的时候,增加了observable的中断探测

    private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
        private final FutureTask<?> f;
        private final Func0<Boolean> shouldInterruptThread;
        private final ThreadPoolExecutor executor;
    
        private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
            this.f = f;
            this.shouldInterruptThread = shouldInterruptThread;
            this.executor = executor;
        }
    
        @Override
        public void unsubscribe() {
            executor.remove(f);
            if (shouldInterruptThread.call()) {
                f.cancel(true);
            } else {
                f.cancel(false);
            }
        }
    
        .....省略代码.......
    }
    

    当observable 取消订阅时,就会把当前任务移除,并中断任务

    到这里只是讲说了超时后的处理,如何认定执行超时呢?

    3.匠心之巧

    这里有个很巧妙的设计,再探HystrixObservableTimeoutOperator

    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    
    #com.netflix.hystrix.util.HystrixTimer#addTimerListener
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
            startThreadIfNeeded();
            // add the listener
    
            Runnable r = new Runnable() {
    
                @Override
                public void run() {
                    try {
                        listener.tick();
                    } catch (Exception e) {
                        logger.error("Failed while ticking TimerListener", e);
                    }
                }
            };
    
            ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
        }
    

    利用了ScheduledThreadPoolExecutor,延迟执行,延迟时间就是我们设定的超时时间,我们再看下

    #HystrixObservableTimeoutOperator
    Subscriber<R> parent = new Subscriber<R>() {
    
                    @Override
                    public void onCompleted() {
                        if (isNotTimedOut()) {
                            // stop timer and pass notification through
                            tl.clear();
                            child.onCompleted();
                        }
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        if (isNotTimedOut()) {
                            // stop timer and pass notification through
                            tl.clear();
                            child.onError(e);
                        }
                    }
    
                    .....  .....  .....  .....  .....  .....  .....  .....  .....
    
                    private boolean isNotTimedOut() {
                        // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                        return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                                originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                    }
    
                };
    

    这里parent就是指上层的obserable,这里可以抽象的认为是我们的HystrixCommand执行线程, 当command执行线程执行完成的时候或异常的时候,会执行 tl.clear(), 也就是Future.cancel()会中断 TimerListener 的ScheduledFuture 线程,迫使超时机制失效。

    // tl.clear()
    private static class TimerReference extends SoftReference<TimerListener> {
            private final ScheduledFuture<?> f;
            ....        ....        ....        ....        ....
            @Override
            public void clear() {
                super.clear();
                // stop this ScheduledFuture from any further executions
                f.cancel(false);
            }
        }
    

    4.回归文字

    HystrixCommand里有个 TimedOutStatus 超时状态


    现在可以认为有两个线程,一个是hystrixCommand任务执行线程,一个是等着给hystrixCommand判定超时的线程,现在两个线程看谁能先把hystrixCommand的状态置换,只要任何一个线程对hystrixCommand打上标就意味着超时判定结束。

    系列文章推荐



    作者:青芒v5
    链接:https://www.jianshu.com/p/60074fe1bd86
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    Java EE 在网页输出九九乘法表、三角形、菱形
    Java EE 在网页输出九九乘法表
    格式化时间(SimpleDateFormat)
    Java代码规范性
    Scanner
    数据库怎么删除相同的内容
    连接池 ----单例模式
    多态和接口
    第一个JAVA应用
    继承
  • 原文地址:https://www.cnblogs.com/duanxz/p/10950012.html
Copyright © 2011-2022 走看看