zoukankan      html  css  js  c++  java
  • hystrix总结之限流

      hystrix使用舱壁隔离模式来隔离和限制各个请求,设计了两种隔离方式:信号量和线程池。线程池隔离:对每个command创建一个自己的线程池,执行调用。通过线程池隔离来保证不同调用不会相互干扰和每一个调用的并发限制。信号量隔热:对每个command创建一个自己的计数器,当并发量超过计数器指定值时,直接拒绝。使用信号量和线程池的一个区别是,信号量没有timeout机制。

      线程池隔离的本质是,如果在线程池执行模式下,调用响应的线程池,如果执行数量超过指定限制,线程池就会抛出异常。

    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
                return Observable.defer(new Func0<Observable<R>>() {
                    @Override
                    public Observable<R> call() {
                            ...
                            try {
                              ...
                                executionHook.onExecutionStart(_cmd);
                                return getUserExecutionObservable(_cmd);
                            } catch (Throwable ex) {
                                return Observable.error(ex);
                            }
                       ...
                    }
                }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                    @Override
                    public Boolean call() {
                        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                    }
                }));
            }

      信号量隔离的本质是,针对每一个command使用一个原子变量,定义当前其执行并发量,如果在SEMAPHORE执行时,会尝试获取这个原子变量,如果超过了限制执行fallback降级流程。

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
           ...
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    if (executionSemaphore.tryAcquire()) { try { ... return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } ... }
    protected TryableSemaphore getExecutionSemaphore() {
            if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
                if (executionSemaphoreOverride == null) {
                    TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                    if (_s == null) {
                        // we didn't find one cache so setup
                        executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                        // assign whatever got set (this or another thread)
                        return executionSemaphorePerCircuit.get(commandKey.name());
                    } else {
                        return _s;
                    }
                } else {
                    return executionSemaphoreOverride;
                }
            } else {
                return TryableSemaphoreNoOp.DEFAULT;
            }
        }

      TryableSemaphoreActual封装了一个原子

    static class TryableSemaphoreActual implements TryableSemaphore {
            protected final HystrixProperty<Integer> numberOfPermits;
            private final AtomicInteger count = new AtomicInteger(0);
    
            public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
                this.numberOfPermits = numberOfPermits;
            }
    
            @Override
            public boolean tryAcquire() {
                int currentCount = count.incrementAndGet();
                if (currentCount > numberOfPermits.get()) {
                    count.decrementAndGet();
                    return false;
                } else {
                    return true;
                }
            }
    
            @Override
            public void release() {
                count.decrementAndGet();
            }
    
            @Override
            public int getNumberOfPermitsUsed() {
                return count.get();
            }
    
        }
  • 相关阅读:
    java中级或者高级面试题分享
    java常使用的框架
    spring的定时器
    ArrayList源码理解
    缓存 Memached
    ORM框架
    Web处理方式
    git使用
    Entity Framework
    .net 学习笔记2
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/8250390.html
Copyright © 2011-2022 走看看