  • How Hystrix Works

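    I. Introduction

      Hystrix is widely used for circuit breaking and graceful degradation of service calls. It is built on RxJava (a reactive programming framework based on the observer pattern) and provides fallback (service degradation), circuit breaking, thread and semaphore isolation, request caching, request collapsing, and service monitoring.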

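    II. Basic Principles

      When we need to call a method (usually a remote call), we wrap the call in a Hystrix command and let Hystrix execute it, so that the call gets Hystrix's protection.

    Hystrix provides two command types, HystrixCommand and HystrixObservableCommand; either can be used to wrap the task to be executed.

    HystrixCommand is used when the dependency returns a single result:

      execute(): synchronous; returns the single result from the dependency, or throws an exception if an error occurs.

      queue(): asynchronous; immediately returns a Future that will hold the single result once execution completes.

    HystrixObservableCommand is used when the dependency returns multiple results (both methods below are also available on HystrixCommand, where the Observable emits a single result):

      observe(): returns an Observable representing the results of the operation. It is a hot Observable: execution starts as soon as observe() is called.

      toObservable(): also returns an Observable representing the results, but a cold one: execution does not start until something subscribes.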
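
    The hot/cold distinction can be illustrated without RxJava. The following is a minimal sketch that uses only JDK classes: HotColdSketch, eager() and lazy() are hypothetical names, with CompletableFuture standing in for a hot source and Supplier for a cold one. It is an analogy for the timing difference, not how Hystrix implements observe() or toObservable().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Hystrix or RxJava.
class HotColdSketch {

    // Counts how many times the "command body" has actually run.
    static final AtomicInteger runs = new AtomicInteger();

    static String work() {
        runs.incrementAndGet();
        return "result";
    }

    // "Hot": the work is started at call time, whether or not anyone
    // ever asks for the result (comparable to observe()).
    static CompletableFuture<String> eager() {
        return CompletableFuture.supplyAsync(HotColdSketch::work);
    }

    // "Cold": nothing runs until the caller invokes get()
    // (comparable to toObservable() before subscribe()).
    static Supplier<String> lazy() {
        return HotColdSketch::work;
    }

    public static void main(String[] args) {
        Supplier<String> cold = lazy();
        System.out.println("runs after lazy(): " + runs.get());
        System.out.println("cold result: " + cold.get());
        System.out.println("hot result: " + eager().join());
        System.out.println("total runs: " + runs.get());
    }
}
```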

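    III. Basic Usage

      Hystrix can be used by hand-writing a custom Command, through annotations, or together with Feign. Hand-written commands and the Feign integration are not covered here; this section focuses on the annotation approach.

    1. Enable Hystrix by adding the @EnableHystrix annotation to the startup class.

    2. Add @HystrixCommand to the method that needs a fallback.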

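    // UserService is assumed to be the interface that declares getUserName
    @Service
    public class UserServiceImpl implements UserService {

        @Autowired
        private UserDao userDao;

        @HystrixCommand(fallbackMethod = "getUserNameFallBack")
        @Override
        public String getUserName(String userId) {
            // Deliberately throws ArithmeticException so the fallback is triggered
            int i = 1 / 0;
            return userDao.getNameById(userId);
        }

        // Fallback method: same parameters and return type as getUserName
        public String getUserNameFallBack(String userId) {
            return "Service temporarily unavailable, please try again later";
        }
    }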

    IV. Initialization

    Start with the @EnableHystrix annotation:

    //Enables @EnableCircuitBreaker
    @EnableCircuitBreaker
    public @interface EnableHystrix {
    }
    //Imports EnableCircuitBreakerImportSelector
    @Import(EnableCircuitBreakerImportSelector.class)
    public @interface EnableCircuitBreaker {
    }

    The annotation in turn enables another annotation, @EnableCircuitBreaker, which imports a selector:

    @Order(Ordered.LOWEST_PRECEDENCE - 100)
    public class EnableCircuitBreakerImportSelector
            extends SpringFactoryImportSelector<EnableCircuitBreaker> {
    
        @Override
        protected boolean isEnabled() {
            return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                    Boolean.class, Boolean.TRUE);
        }
    
    }

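    This is a key point in how Hystrix takes effect. The selector extends SpringFactoryImportSelector, and Spring invokes its selectImports(AnnotationMetadata metadata) method when it processes the import. Using the enabling annotation (here @EnableCircuitBreaker) as the key, this method reads from spring.factories the @Configuration classes that need to be initialized. The key code: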

            List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                    .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

    Now look at the spring.factories file:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
    org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
    
    org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
    org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
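
    The lookup described above (annotation class name as key, comma-separated configuration class names as value) can be sketched with plain JDK classes. FactoriesLookupSketch and its method are hypothetical names; the real SpringFactoriesLoader scans every META-INF/spring.factories on the classpath, while this sketch only parses a string that is passed in.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration class; not the Spring implementation.
class FactoriesLookupSketch {

    // Parses spring.factories-style text and returns the class names
    // registered under the given key, de-duplicated and in file order.
    static List<String> loadFactoryNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            // Properties.load understands the trailing-backslash line continuation
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        LinkedHashSet<String> names = new LinkedHashSet<>();
        for (String name : props.getProperty(key, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return new ArrayList<>(names);
    }

    public static void main(String[] args) {
        String text =
                "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\\\n"
              + "org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration\n";
        System.out.println(loadFactoryNames(text,
                "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker"));
    }
}
```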

    The configuration class HystrixCircuitBreakerConfiguration:

    @Configuration(proxyBeanMethods = false)
    public class HystrixCircuitBreakerConfiguration {
    
        @Bean
        public HystrixCommandAspect hystrixCommandAspect() {
            return new HystrixCommandAspect();
        }
    
    }

    It registers a HystrixCommandAspect bean in the Spring context:

    @Aspect
    public class HystrixCommandAspect {
    
        private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
    
        static {
            META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                    .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                    .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                    .build();
        }
        
        //Pointcut: every method annotated with @HystrixCommand
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {
        }
        //Pointcut: every method annotated with @HystrixCollapser
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {
        }
        
        //Around advice
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            //The two annotations cannot be applied to the same method
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            //META_HOLDER_FACTORY_MAP is pre-populated with two factories:
            //@HystrixCommand: CommandMetaHolderFactory
            //@HystrixCollapser: CollapserMetaHolderFactory
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
            //Create the MetaHolder, which wraps the join point
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            // Create the HystrixInvokable. It is an empty marker interface with no methods; it only marks something as executable
            // The actual execution is done by the implementing class
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            //Execution type
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
            Object result;
            try {
                if (!metaHolder.isObservable()) {
                    // Execute through the CommandExecutor utility
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }
    
       ···
    
    }

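    Through AOP, the aspect intercepts the annotated method and decides which logic to execute.

    Creating the parameter wrapper (MetaHolder)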

    public MetaHolder create(final ProceedingJoinPoint joinPoint) {
        //Get the Method object
        Method method = getMethodFromTarget(joinPoint);
        //Target object
        Object obj = joinPoint.getTarget();
        //Method arguments
        Object[] args = joinPoint.getArgs();
        //Proxy object
        Object proxy = joinPoint.getThis();
        //Delegate to the subclass's create method
        return create(proxy, method, obj, args, joinPoint);
    }

    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        //Read the @HystrixCommand annotation
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
        //Derive the execution type from the return type: Future is asynchronous, Observable (the RxJava observable) is observable, anything else is synchronous
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
        //Get the MetaHolder builder
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        //Builder pattern: create the MetaHolder wrapper
        return builder.defaultCommandKey(method.getName())
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
    }
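
    The return-type check described in the comments above can be sketched as follows. This is a simplified stand-in under stated assumptions, not the javanica source: ExecutionTypeSketch is a hypothetical name, and ObservableLike is a hypothetical placeholder for rx.Observable so that the example compiles with the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical illustration class; not the javanica implementation.
class ExecutionTypeSketch {

    enum ExecutionType { ASYNCHRONOUS, SYNCHRONOUS, OBSERVABLE }

    // Hypothetical stand-in for rx.Observable, used only in this sketch.
    interface ObservableLike<T> { }

    // Future-like return types map to ASYNCHRONOUS, observable-like ones
    // to OBSERVABLE, and everything else to SYNCHRONOUS.
    static ExecutionType of(Class<?> returnType) {
        if (Future.class.isAssignableFrom(returnType)) {
            return ExecutionType.ASYNCHRONOUS;
        }
        if (ObservableLike.class.isAssignableFrom(returnType)) {
            return ExecutionType.OBSERVABLE;
        }
        return ExecutionType.SYNCHRONOUS;
    }

    public static void main(String[] args) {
        System.out.println(of(String.class));
        System.out.println(of(CompletableFuture.class));
        System.out.println(of(ObservableLike.class));
    }
}
```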

    Creating the command executor

        public HystrixInvokable create(MetaHolder metaHolder) {
            HystrixInvokable executable;
            //@HystrixCollapser
            if (metaHolder.isCollapserAnnotationPresent()) {
                executable = new CommandCollapser(metaHolder);
            //@HystrixCommand and the return type is Observable
            } else if (metaHolder.isObservable()) {
                executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            } else {
                //Otherwise, wrap the HystrixCommandBuilder in a GenericCommand
                executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
            }
            return executable;
        }
        public GenericCommand(HystrixCommandBuilder builder) {
            super(builder);
        }
        
        protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
            super(builder.getSetterBuilder().build());
            //Command actions: the method to execute and the fallback method
            this.commandActions = builder.getCommandActions();
            this.collapsedRequests = builder.getCollapsedRequests();
            this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
            this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
            this.ignoreExceptions = builder.getIgnoreExceptions();
            //Execution type: ASYNCHRONOUS, SYNCHRONOUS or OBSERVABLE
            this.executionType = builder.getExecutionType();
        }
        
        public HystrixCommand.Setter build() throws HystrixPropertyException {
            //Group key: class name; command key: method name; thread pool key
            HystrixCommand.Setter setter = HystrixCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
            if (StringUtils.isNotBlank(threadPoolKey)) {
                setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
            }
            try {
                //Initialize the thread pool properties
                setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
            } catch (IllegalArgumentException e) {
                throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
            }
            try {
                //Initialize the command execution properties
                setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
            } catch (IllegalArgumentException e) {
                throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
            }
            return setter;
        }

    Executing the command

        public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
                throws RuntimeException {
            Validate.notNull(invokable);
            Validate.notNull(metaHolder);
            switch (executionType) {
                case SYNCHRONOUS: {
                    //Cast to the HystrixExecutable sub-interface
                    return castToExecutable(invokable, executionType).execute();
                }
                case ASYNCHRONOUS: {
                    HystrixExecutable executable = castToExecutable(invokable, executionType);
                    if (metaHolder.hasFallbackMethodCommand()
                            && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                        return new FutureDecorator(executable.queue());
                    }
                    return executable.queue();
                }
                case OBSERVABLE: {
                    HystrixObservable observable = castToObservable(invokable);
                    return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                            ? observable.observe() : observable.toObservable();
                }
                default:
                    throw new RuntimeException("unsupported execution type: " + executionType);
            }
        }

        public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
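
    As the execute() method above shows, the synchronous path is simply the asynchronous path followed by a blocking get(). A minimal JDK-only sketch of that idea follows; SyncOverAsyncSketch is a hypothetical class, and the plain worker thread and exception unwrapping are simplifications that do not reproduce Hystrix's thread pools or its decomposeException logic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class; not part of Hystrix.
class SyncOverAsyncSketch<R> {

    private final Callable<R> task;

    SyncOverAsyncSketch(Callable<R> task) {
        this.task = task;
    }

    // Asynchronous entry point: starts the task on a worker thread
    // and returns immediately with a Future.
    Future<R> queue() {
        FutureTask<R> future = new FutureTask<>(task);
        Thread worker = new Thread(future, "sketch-worker");
        worker.setDaemon(true);
        worker.start();
        return future;
    }

    // Synchronous entry point: queue() plus a blocking get(),
    // rethrowing the task's own exception rather than the wrapper.
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncOverAsyncSketch<String>(() -> "ok").execute());
    }
}
```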

    The four ways to execute a HystrixCommand

     

    Test class

    public class HystrixCommandTest {
    
        private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) {
            com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("methodA"));
            return new com.netflix.hystrix.HystrixCommand<T>(setter) {
                @Override
                protected T run() throws Exception {
                    System.out.println("HystrixCommand executed! " + System.currentTimeMillis());
                    return message;
                }
            };
        }
    
        @Test
        public void test01() {
            com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01");
            System.out.println(command.execute());
        }
    
    
        @Test
        public void test02() throws ExecutionException, InterruptedException {
            com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02");
            Future<String> f = command.queue();
            System.out.println("after queue(), command is executing: " + System.currentTimeMillis());
            Thread.sleep(1000);
            System.out.println(f.get());
        }
    
        @Test
        public void test03() throws InterruptedException {
            HystrixCommand<String> command = createCommand("this is test03");
            // observe() triggers run() immediately; this is a hot Observable
            Observable<String> observe = command.observe();
            System.out.println("after observe(), command is executing: " + System.currentTimeMillis());
            Thread.sleep(1000);
            observe.subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println("result received through subscription: " + s);
                }
            });
        }
    
        @Test
        public void test04() throws InterruptedException {
            HystrixCommand<String> command = createCommand("this is test04");
            // toObservable() does not trigger run() by itself
            Observable<String> observe = command.toObservable();
            System.out.println("not subscribed yet, command has not run: " + System.currentTimeMillis());
            Thread.sleep(1000);
            observe.subscribe();
            System.out.println("after subscribe(), command has run: " + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    
        @Test
        public void test05() throws InterruptedException, ExecutionException {
            HystrixCommand<String> command = createCommand("this is test05");
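            // toObservable() does not trigger run() by itself; toBlocking().toFuture() subscribes, which starts execution
            Future<String> f = command.toObservable().toBlocking().toFuture();
            System.out.println("converted to a Future and executing: " + System.currentTimeMillis());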
            Thread.sleep(1000);
            System.out.println(f.get());
        }
    }

    Back in the Hystrix source, the queue() method:

        public Future<R> queue() {
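            // toObservable() produces an Observable
            // toBlocking() converts it to a BlockingObservable
            // toFuture() converts it to a Future
            // Together these create the Observable and subscribe to it
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            // Wrap the future so that cancel() gets special handling,
            // because the future returned by toObservable().toBlocking().toFuture() cannot interrupt the executing thread through cancel().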
            final Future<R> f = new Future<R>() {
    
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    if (delegate.isCancelled()) {
                        return false;
                    }
                    // If execution.isolation.thread.interruptOnFutureCancel = true (default false)
                    if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                        // Record the flag
                        interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                    }
                    // Cancel the delegate future
                    final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                    // If the command has not finished and the executing thread should be interrupted
                    if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                        // Get the executing thread
                        final Thread t = executionThread.get();
                        // Interrupt it unless it is the current thread
                        if (t != null && !t.equals(Thread.currentThread())) {
                            t.interrupt();
                        }
                    }
    
                    return res;
                }
    
                ···
                
            };
    
            //Check whether execution has already completed
            if (f.isDone()) {
                try {
                    //Fetch the result
                    f.get();
                    return f;
                } catch (Exception e) {
                    ···
                }
            }
    
            return f;
        }
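
    The cancel() handling above follows a general pattern: wrap a future whose own cancel() cannot stop the worker, and interrupt the recorded execution thread yourself. The sketch below shows that pattern with JDK classes only. CancelDelegateSketch, wrap() and demo() are hypothetical names, and CompletableFuture is used as the delegate only because its cancel() likewise does not interrupt a running thread; the real method also consults isExecutionComplete() on the command.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not the Hystrix implementation.
class CancelDelegateSketch {

    // Wraps a delegate future and adds thread interruption on cancel.
    static <R> Future<R> wrap(final CompletableFuture<R> delegate,
                              final AtomicReference<Thread> executionThread,
                              final boolean interruptOnFutureCancel) {
        return new Future<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
                boolean stillRunning = !delegate.isDone();
                boolean interrupt = interruptOnFutureCancel && mayInterruptIfRunning;
                boolean res = delegate.cancel(interrupt);
                if (stillRunning && interrupt) {
                    Thread t = executionThread.get();
                    // Never interrupt the thread that is calling cancel()
                    if (t != null && t != Thread.currentThread()) {
                        t.interrupt();
                    }
                }
                return res;
            }

            @Override public boolean isCancelled() { return delegate.isCancelled(); }
            @Override public boolean isDone() { return delegate.isDone(); }
            @Override public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            @Override public R get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
    }

    // Returns true if cancelling the wrapper interrupted the worker thread.
    static boolean demo() {
        final AtomicReference<Thread> executionThread = new AtomicReference<>();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch interrupted = new CountDownLatch(1);
        final CompletableFuture<String> delegate = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            executionThread.set(Thread.currentThread());
            started.countDown();
            try {
                Thread.sleep(10_000);
                delegate.complete("done");
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();
            Future<String> f = wrap(delegate, executionThread, true);
            boolean cancelled = f.cancel(true);
            return cancelled && f.isCancelled() && interrupted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + demo());
    }
}
```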

    RxJava's reactive pipeline is used here; it eventually invokes the run() method of the concrete command (the GenericCommand wrapped earlier).

    public class GenericCommand extends AbstractHystrixCommand<Object> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
    
        public GenericCommand(HystrixCommandBuilder builder) {
            super(builder);
        }
    
        //Runs the target method
        @Override
        protected Object run() throws Exception {
            LOGGER.debug("execute command: {}", getCommandKey().name());
            return process(new Action() {
                @Override
                Object execute() {
                    return getCommandAction().execute(getExecutionType());
                }
            });
        }
        
        //Runs the fallback method
        @Override
        protected Object getFallback() {
            final CommandAction commandAction = getFallbackAction();
            if (commandAction != null) {
                try {
                    return process(new Action() {
                        @Override
                        Object execute() {
                            MetaHolder metaHolder = commandAction.getMetaHolder();
                            Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                            return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                        }
                    });
                } catch (Throwable e) {
                    LOGGER.error(FallbackErrorMessageBuilder.create()
                            .append(commandAction, e).build());
                    throw new FallbackInvocationException(unwrapCause(e));
                }
            } else {
                return super.getFallback();
            }
        }
    
    }
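
    Stripped of the framework, the run()/getFallback() pair amounts to "try the primary action, and on failure hand the exception to the fallback". The following is a minimal sketch of only that control flow; FallbackSketch and runWithFallback are hypothetical names. Hystrix itself also falls back on timeouts, thread pool or semaphore rejection and an open circuit, none of which is modelled here.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical illustration class; not part of Hystrix.
class FallbackSketch {

    // Runs the primary action; if it throws, returns the fallback's
    // value instead, passing the exception to the fallback.
    static <R> R runWithFallback(Callable<R> primary, Function<Throwable, R> fallback) {
        try {
            return primary.call();
        } catch (Exception e) {
            return fallback.apply(e);
        }
    }

    public static void main(String[] args) {
        int zero = 0;
        // Mirrors the getUserName example: the primary action divides by zero
        String name = FallbackSketch.<String>runWithFallback(
                () -> String.valueOf(1 / zero),
                e -> "Service temporarily unavailable, please try again later");
        System.out.println(name);
    }
}
```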

    Reference: https://blog.csdn.net/alex_xfboy/article/details/89844066

    Reference: https://juejin.cn/column/6960847703521624094

  • Original article: https://www.cnblogs.com/sglx/p/15771838.html