zoukankan      html  css  js  c++  java
  • hystrix源码之请求缓存

    HystrixRequestCache

      请求缓存。内部是一个静态ConcurrentHashMap存储各个命令的缓存器,RequestCacheKey为key,HystrixRequestCache为value。

    private final static ConcurrentHashMap<RequestCacheKey, HystrixRequestCache> caches = new ConcurrentHashMap<RequestCacheKey, HystrixRequestCache>();

      RequestCacheKey由两部分组成:当前command的keyname,指定HystrixConcurrencyStrategy对象。

    private static class RequestCacheKey {
            private final short type; // used to differentiate between Collapser/Command if key is same between them
            private final String key;
            private final HystrixConcurrencyStrategy concurrencyStrategy;
    
            private RequestCacheKey(HystrixCommandKey commandKey, HystrixConcurrencyStrategy concurrencyStrategy) {
                type = 1;
                if (commandKey == null) {
                    this.key = null;
                } else {
                    this.key = commandKey.name();
                }
                this.concurrencyStrategy = concurrencyStrategy;
            }
     ...
    }

      通过getInstance来获取每个命令的缓冲器。

    public static HystrixRequestCache getInstance(HystrixCommandKey key, HystrixConcurrencyStrategy concurrencyStrategy) {
            return getInstance(new RequestCacheKey(key, concurrencyStrategy), concurrencyStrategy);
        }
    
        public static HystrixRequestCache getInstance(HystrixCollapserKey key, HystrixConcurrencyStrategy concurrencyStrategy) {
            return getInstance(new RequestCacheKey(key, concurrencyStrategy), concurrencyStrategy);
        }
    
        private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
            HystrixRequestCache c = caches.get(rcKey);
            if (c == null) {
                HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
                HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
                if (existing == null) {
                    // we won so use the new one
                    c = newRequestCache;
                } else {
                    // we lost so use the existing
                    c = existing;
                }
            }
            return c;
        }

      HystrixRequestCache提供了get来获取指定key缓存,提供了putIfAbsent来存储指定key的缓存内容。内部是一个ConcurrentHashMap存储缓存内容,ValueCacheKey为key,HystrixCachedObservable为value。缓存的内容是请求级别共享的,所以放在一个HystrixRequestVariableHolder中。

    private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
    
            @Override
            public ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> initialValue() {
                return new ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>();
            }
    
            @Override
            public void shutdown(ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> value) {
                // nothing to shutdown
            }
    
        });

      ValueCacheKey由三部分组成:RequestCacheKey对象和指定String类型 key。

    private static class ValueCacheKey {
            private final RequestCacheKey rvKey;
            private final String valueCacheKey;
            private ValueCacheKey(RequestCacheKey rvKey, String valueCacheKey) {
                this.rvKey = rvKey;
                this.valueCacheKey = valueCacheKey;
            }
    
             ...
    
        }

      get和putIfAbsent方法

    @SuppressWarnings({ "unchecked" })
        /* package */<T> HystrixCachedObservable<T> get(String cacheKey) {
            ValueCacheKey key = getRequestCacheKey(cacheKey);
            if (key != null) {
                ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
                if (cacheInstance == null) {
                    throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
                }
                /* look for the stored value */
                return (HystrixCachedObservable<T>) cacheInstance.get(key);
            }
            return null;
        }
    
        
        @SuppressWarnings({ "unchecked" })
        /* package */<T> HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
            ValueCacheKey key = getRequestCacheKey(cacheKey);
            if (key != null) {
                /* look for the stored value */
                ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
                if (cacheInstance == null) {
                    throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
                }
                HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
                if (alreadySet != null) {
                    // someone beat us so we didn't cache this
                    return alreadySet;
                }
            }
            // we either set it in the cache or do not have a cache key
            return null;
        }

    HystrixCachedObservable

      内部使用了ReplaySubject缓存了originalObservable的结果

     protected final Subscription originalSubscription;
        protected final Observable<R> cachedObservable;
        private volatile int outstandingSubscriptions = 0;
    
        protected HystrixCachedObservable(final Observable<R> originalObservable) {
            ReplaySubject<R> replaySubject = ReplaySubject.create();
            this.originalSubscription = originalObservable
                    .subscribe(replaySubject);
    
            this.cachedObservable = replaySubject
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            outstandingSubscriptions--;
                            if (outstandingSubscriptions == 0) {
                                originalSubscription.unsubscribe();
                            }
                        }
                    })
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            outstandingSubscriptions++;
                        }
                    });
        }
    
        public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
            return new HystrixCommandResponseFromCache<R>(o, originalCommand);
        }
    
        public static <R> HystrixCachedObservable<R> from(Observable<R> o) {
            return new HystrixCachedObservable<R>(o);
        }
    
        public Observable<R> toObservable() {
            return cachedObservable;
        }
    
        public void unsubscribe() {
            originalSubscription.unsubscribe();
        }

    HystrixCommandResponseFromCache

      HystrixCachedObservable的子类,HystrixCommand实际使用来缓存结果的类。该类的作用是,当使用缓存结果时,会同步之前命令的运行结果

    public class HystrixCommandResponseFromCache<R> extends HystrixCachedObservable<R> {
        private final AbstractCommand<R> originalCommand;
    
        /* package-private */ HystrixCommandResponseFromCache(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
            super(originalObservable);
            this.originalCommand = originalCommand;
        }
    
        public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
            final AtomicBoolean completionLogicRun = new AtomicBoolean(false);
    
            return cachedObservable
                    .doOnError(new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            if (completionLogicRun.compareAndSet(false, true)) {
                                commandCompleted(commandToCopyStateInto);
                            }
                        }
                    })
                    .doOnCompleted(new Action0() {
                        @Override
                        public void call() {
                            if (completionLogicRun.compareAndSet(false, true)) {
                                commandCompleted(commandToCopyStateInto);
                            }
                        }
                    })
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            if (completionLogicRun.compareAndSet(false, true)) {
                                commandUnsubscribed(commandToCopyStateInto);
                            }
                        }
                    });
        }
    
        private void commandCompleted(final AbstractCommand<R> commandToCopyStateInto) {
            commandToCopyStateInto.executionResult = originalCommand.executionResult;
        }
    
        private void commandUnsubscribed(final AbstractCommand<R> commandToCopyStateInto) {
            commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.addEvent(HystrixEventType.CANCELLED);
            commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.setExecutionLatency(-1);
        }
    }
  • 相关阅读:
    UNIX网络编程读书笔记:原始套接口
    UNIX网络编程读书笔记:UNIX域协议
    UNIX网络编程读书笔记:名字与地址转换
    链表的游标(cursor)实现
    基数排序
    UNIX网络编程读书笔记:基本SCTP套接口编程
    Android系列之Fragment(三)----Fragment和Activity之间的通信(含接口回调)
    git版本控制工具(二)----本地版本库的常用操作
    Android系列之Fragment(二)----Fragment的生命周期和返回栈
    Git版本控制工具(一)----git的安装及创建版本库
  • 原文地址:https://www.cnblogs.com/zhangwanhua/p/7844900.html
Copyright © 2011-2022 走看看