zoukankan      html  css  js  c++  java
  • Hystrix框架3--线程池

    线程池

    在Hystrix中Command默认是运行在一个单独的线程池中的,线程池的名称是根据设定的ThreadPoolKey定义的,如果没有设置那么会使用CommandGroupKey作为线程池。
    这样每个Command都可以拥有自己的线程池而不会互相影响,同时线程池也可以很好地控制Command的并发量。

    设置线程池配置

    可以使用Setter来初始化Command在Setter中可以配置线程池的大小和等待队列的长度:

    public CommandHelloWorld(String name) {
    	super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
    			//配置线程池相关属性,队列大小和线程数
    			.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withMaxQueueSize(10).withCoreSize(10))
    			//配置运行相关参数如运行超时时间
    			.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100000000)));
        //super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
    }
    

    在运行时Hystrix会使用ConcurrencyStrategy来创建线程池以及对应的队列,下面是默认的HystrixConcurrencyStrategy

    //线程池中的等待队列
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        /*
         * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
         * <p>
         * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
         * <p>
         * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
         * and rejecting is the preferred solution.
         */
        if (maxQueueSize <= 0) {
    		//当配置的queue大小小于等于0使用SynchronousQueue,也就是如果没有空闲线程就导致失败
            return new SynchronousQueue<Runnable>();
        } else {
    		//其他情况使用阻塞Queue来配置等待队列
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }
    //创建线程池的方法
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        ThreadFactory threadFactory = null;
        if (!PlatformSpecific.isAppEngine()) {
            threadFactory = new ThreadFactory() {
                protected final AtomicInteger threadNumber = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
    
            };
        } else {
            threadFactory = PlatformSpecific.getAppEngineThreadFactory();
        }
    	//通过配置的信息创建线程池
        return new ThreadPoolExecutor(corePoolSize.get(), maximumPoolSize.get(), keepAliveTime.get(), unit, workQueue, threadFactory);
    }
    

    当然以上是默认的ConcurrencyStrategy,Hystrix中可以通过Plugin配置自定义的Strategy:

    HystrixPlugins.getInstance().registerConcurrencyStrategy
    

    但这个Plugin是单例的且register方法只能调用一次,也就是无法设置多个Strategy,如果想要使用不同的Strategy只能在方法内部使用一定逻辑来完成。

    Semaphore

    还有一种不用ThreadPool的方法,是配置SEMAPHORE

    HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10).withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
    

    这种模式会在主线程自身运行(在调用queue时就会执行)。同时可以通过withExecutionIsolationSemaphoreMaxConcurrentRequests设置并发的数量。

    fallback

    当配置的等待队列满了的时候Hystrix会直接调用Command的fallback方法。
    下面来看下各种情况下代码是执行在哪个线程中的

    ThreadPool模式下

    超时调用getFallback:Timer线程
    线程池队列满调用getFallback:主线程
    Command出错调用getFallback:Command线程池

    Semaphore模式下

    超时调用getFallback:Timer线程
    并发数满调用getFallback:主线程
    Command出错调用getFallback:主线程

    总结

    在使用Hystrix时要注意各个代码是运行在哪个线程中的,防止在不必要的地方有阻塞的调用,如在fallback中如果有阻塞耗时操作,那么在队列满时会导致主线程阻塞,可以考虑在Fallback中再调用新Command,这时还要考虑使用不同的线程池防止任务互相排队。

  • 相关阅读:
    Informatica_(6)性能调优
    Informatica_(5)高级应用
    Informatica_(4)工作流
    Informatica_(3)组件
    Informatica_(2)第一个例子
    Informatica_(1)安装
    Linux_(4)Shell编程(下)
    Linux_(3)Shell编程(上)
    Linux_(2)基本命令(下)
    B
  • 原文地址:https://www.cnblogs.com/resentment/p/5893677.html
Copyright © 2011-2022 走看看