zoukankan      html  css  js  c++  java
  • 跨线程池传递线程变量,使用阿里的transmittable-thread-local

    https://blog.csdn.net/gududedabai/article/details/83059226?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4

    https://blog.csdn.net/gududedabai/article/details/83059381

    加入以下pom依赖:

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>transmittable-thread-local</artifactId>
        <version>2.2.0</version>
    </dependency>
    

      

    转载改造hystrix线程池方法:

    改造线程池方式

    上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:

    在启动之前调用进行注册自定义实现的逻辑:

    HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

    ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。

    我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.netflix.hystrix.HystrixThreadPoolKey;
    import com.netflix.hystrix.HystrixThreadPoolProperties;
    import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
    import com.netflix.hystrix.strategy.properties.HystrixProperty;
    import com.netflix.hystrix.util.PlatformSpecific;
    
    public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    	private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);
    
    	@Override
    	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
    			HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue) {
    		final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    
    		final int dynamicCoreSize = corePoolSize.get();
    		final int dynamicMaximumSize = maximumPoolSize.get();
    
    		if (dynamicCoreSize > dynamicMaximumSize) {
    			logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()
    					+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize
    					+ ".  Maximum size will be set to " + dynamicCoreSize
    					+ ", the coreSize value, since it must be equal to or greater than the coreSize value");
    			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit,
    					workQueue, threadFactory);
    		} else {
    			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit,
    					workQueue, threadFactory);
    		}
    	}
    
    	@Override
    	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
    			HystrixThreadPoolProperties threadPoolProperties) {
    		final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    
    		final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties
    				.getAllowMaximumSizeToDivergeFromCoreSize().get();
    		final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    		final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    		final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    		final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
    
    		if (allowMaximumSizeToDivergeFromCoreSize) {
    			final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
    			if (dynamicCoreSize > dynamicMaximumSize) {
    				logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()
    						+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize
    						+ ".  Maximum size will be set to " + dynamicCoreSize
    						+ ", the coreSize value, since it must be equal to or greater than the coreSize value");
    				return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime,
    						TimeUnit.MINUTES, workQueue, threadFactory);
    			} else {
    				return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime,
    						TimeUnit.MINUTES, workQueue, threadFactory);
    			}
    		} else {
    			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES,
    					workQueue, threadFactory);
    		}
    	}
    
    	private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
    		if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
    			return new ThreadFactory() {
    				private final AtomicInteger threadNumber = new AtomicInteger(0);
    
    				@Override
    				public Thread newThread(Runnable r) {
    					Thread thread = new Thread(r,
    							"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
    					thread.setDaemon(true);
    					return thread;
    				}
    
    			};
    		} else {
    			return PlatformSpecific.getAppEngineThreadFactory();
    		}
    	}
    }
    

    ThreadLocalThreadPoolExecutor的代码:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.ttl.TransmittableThreadLocal;
    import com.alibaba.ttl.TtlRunnable;
    
    public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {
    	private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    
    	public static TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();
    
    	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    	}
    
    	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    	}
    
    	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    	}
    
    	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
    			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    		super(maximumPoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    	}
    
    	@Override
    	public void execute(Runnable command) {
    		super.execute(TtlRunnable.get(command));
    	}
    }
    

    启动时加入插件

    HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
    

     使用方法:调用feign client服务之前,设置线程变量

    ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);
    

     在FeignAuthConfiguration里,调用appTokenHolder.get();之前加入设置租户id

    Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get();
    DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);
    

      

      

    使用线程变量三种方式测试:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    import com.alibaba.ttl.TransmittableThreadLocal;
    import com.alibaba.ttl.TtlRunnable;
    
    public class Test {
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    //		testThreadLocal1();
    		// testThreadLocal2();
    		testThreadLocal3();
    	}
    
    	private static void testThreadLocal1() throws InterruptedException, ExecutionException {
    		final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
    		ExecutorService executorService = Executors.newFixedThreadPool(1);
    		for (int i = 0; i < 20; i++) {
    			local.set(i + "");
    			System.out.println(local.get());
    			Future<?> future = executorService.submit(new Runnable() {
    
    				@Override
    				public void run() {
    					System.out.println(Thread.currentThread().getName() + ":" + local.get());
    					local.set(null);
    				}
    			});
    			future.get();
    			System.out.println(local.get());
    			local.set(null);
    		}
    	}
    
    	private static void testThreadLocal2() throws InterruptedException, ExecutionException {
    		ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
    		ExecutorService executorService = Executors.newFixedThreadPool(1);
    		for (int i = 0; i < 20; i++) {
    			local.set(i + "");
    			System.out.println(local.get());
    			Future<?> future = executorService.submit(new ParamRunnable(i + ""));
    			future.get();
    			System.out.println(local.get());
    			local.set(null);
    		}
    	}
    
    	private static void testThreadLocal3() throws InterruptedException, ExecutionException {
    		final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();
    		ExecutorService executorService = Executors.newFixedThreadPool(1);
    		for (int i = 0; i < 20; i++) {
    			context.set(i + "");
    			System.out.println(context.get());
    			Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {
    				public void run() {
    					System.out.println(Thread.currentThread().getName() + ":" + context.get());
    					context.set(null);
    				}
    			}));
    			future.get();
    			System.out.println(context.get());
    			context.set(null);
    		}
    	}
    
    	private static class ParamRunnable implements Runnable {
    
    		private String param;
    
    		public ParamRunnable(String param) {
    			this.param = param;
    		}
    
    		@Override
    		public void run() {
    			System.out.println(Thread.currentThread().getName() + ":" + param);
    		}
    
    	}
    
    }
    
  • 相关阅读:
    期中架构实现步骤
    安装centos以及优化步骤
    inotify+rsync实现实时热备
    [转]ubuntu安装vncserver实现图形化访问
    [转]电烙铁的使用小技巧
    彻底解决 LINK : fatal error LNK1123: 转换到 COFF 期间失败: 文件无效或损坏
    解读系统托盘图标隐藏(删除)
    一个小公式帮你轻松将IP地址从10进制转到2进制
    [查阅]Dalvik opcodes
    [查阅]MSIL Instruction Set
  • 原文地址:https://www.cnblogs.com/yaoyu1983/p/12772029.html
Copyright © 2011-2022 走看看