  • HBase Source Code Analysis. Client. Prerequisites. ExecutorService

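    In the HBase client, HTable implements batch operations on top of an ExecutorService. An ExecutorService behaves like a thread pool: operations submitted by the user, such as put and delete, are wrapped as tasks and run on its threads, and the response of each operation is either returned or handled as an exception. This article gives a brief introduction to ExecutorService as background for studying the HBase client code.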

        Typically we create an ExecutorService, hand it some tasks, and then simply let them run, as in Example 1 below.

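    package executorservicestudy;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    // A task that prints the id of the pool thread it runs on.
    class Reader implements Runnable {
    
    	@Override
    	public void run() {
    		System.err.println(Thread.currentThread().getId());
    	}
    
    }
    
    public class ExecutorServiceStudy {
    	public static void main(String[] args) throws InterruptedException {
    		int readThreads = 10;
    
    		Reader[] readers = new Reader[readThreads];
    
    		// Fixed-size pool whose threads are daemon threads.
    		ExecutorService readPool = Executors.newFixedThreadPool(
    				readThreads,
    				new ThreadFactoryBuilder()
    						.setNameFormat("ExecutorServiceStudy " + 1)
    						.setDaemon(true).build());
    
    		for (int i = 0; i < readThreads; ++i) {
    			Reader reader = new Reader();
    			readers[i] = reader;
    			readPool.execute(reader);
    		}
    
    		// Daemon threads do not keep the JVM alive, so wait for the tasks
    		// before main returns; otherwise some output may never appear.
    		readPool.shutdown();
    		readPool.awaitTermination(10, TimeUnit.SECONDS);
    	}
    }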
    

      

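    In HBase, however, it is not enough to throw put and delete operations into the thread pool and let them run; we usually need to act on the outcome or result of each task. To do that, we can submit Callable objects to the ExecutorService and keep the Future returned by each submit call so that it can be processed later. In other words, we hold on to each task's "future" instead of leaving it to develop on its own, as in Example 2 below.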

    package executorservicestudy;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.TreeMap;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    /**
     * A thread pool that we can hand tasks to. But how do we get the response
     * or result of each task back?
     * 
     * @author wlu 2012-08-09
     * 
     */
    class MyCallable implements Callable<String> {
    	String id = "";
    
    	public MyCallable(String s) {
    		id = s;
    	}
    
    	@Override
    	public String call() throws Exception {
    		// pick a random workload factor r in [0, 9]
    		int r = new Random().nextInt(10);
    		for (int i = 0; i < 10000; i++) {
    			for (int j = 0; j < r * 10000; j++)
    				;
    		}
    		return "loop " + r + " X 10^8 times @ id = " + id;
    	}
    
    }
    
    public class ExecutorServiceStudy2 {
    	static int readThreads = 10;
    
    	static ExecutorService pool = Executors.newFixedThreadPool(
    			readThreads,
    			new ThreadFactoryBuilder()
    					.setNameFormat("IPC Reader %d on port " + 1)
    					.setDaemon(true).build());
    
    	static Map<String, Future<String>> futures = new TreeMap<String, Future<String>>();
    
    	public static void main(String args[]) throws InterruptedException,
    			ExecutionException {
    		for (int i = 0; i < 10; i++) {
    			String id = i + "";
    			futures.put(id, pool.submit(new MyCallable(id)));
    		}
    
    		// copy the keys into a temporary list
    		List<String> keys = new ArrayList<String>();
    
    		for (String s : futures.keySet()) {
    			keys.add(s);
    		}
    
    		int idx = 0;
    
    		// poll the list round-robin and handle the result of each finished task
    		while (!keys.isEmpty()) {
    			Object ss = null;
    			try {
    				ss = futures.get(keys.get(idx)).get(5, TimeUnit.MILLISECONDS);
    			} catch (TimeoutException e) {
    				ss = null;
    			}
    			// not finished yet
    			if (ss == null) {
    				idx = (idx + 1) % keys.size();
    				continue;
    			}
    			// finished, remove from the list
    			keys.remove(idx);
    			if (idx >= keys.size()) {
    				idx = 0;
    			}
    			System.err.println(ss);
    		}
    	}
    }
    

      

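     As the example shows, we label each task and bind the label to its Future by storing both in a Map. A call to Future.get() blocks until the task has finished. In the example above we poll the Futures: their keys are first copied into a List; during polling, a task that has not finished is skipped for now, while a finished task has its result processed and its key removed from the List. The arguments to get specify the blocking timeout: if the task does not finish within the timeout, get throws a TimeoutException and the task is skipped for now.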
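    The behavior of the timed get() can be seen in isolation with a minimal sketch like the following. It is not taken from HBase; the class TimedGetSketch, its poll and demo helpers, and the "boom" failure are hypothetical names chosen for illustration. A latch keeps one task unfinished so that the first poll times out, and a second task fails so that the ExecutionException path is visible as well.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {

    /** Describes what a timed get() on the given future observed. */
    static String poll(Future<String> future, long timeoutMillis) {
        try {
            return "done: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task is still running; the caller may retry later.
            return "not finished yet";
        } catch (ExecutionException e) {
            // The task threw; the original exception is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** Runs a slow task and a failing task and records what each poll saw. */
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            final CountDownLatch release = new CountDownLatch(1);
            Callable<String> slowTask = () -> {
                release.await(); // stays unfinished until released
                return "slow result";
            };
            Callable<String> brokenTask = () -> {
                throw new IllegalStateException("boom");
            };
            Future<String> slow = pool.submit(slowTask);
            Future<String> broken = pool.submit(brokenTask);

            String first = poll(slow, 5);      // task is still blocked
            release.countDown();               // let the slow task finish
            String second = poll(slow, 5000);  // now returns the result
            String third = poll(broken, 5000); // surfaces the task's failure
            return new String[] { first, second, third };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

    Note that a task which throws is reported through ExecutionException rather than through a null result, so a polling loop that wants to survive failed tasks would need to catch it per task, as poll does here.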

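      We now know how to run tasks on multiple threads with an ExecutorService and handle the result of each task asynchronously. The output of Example 2 looks like this: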

    loop 0 X 10^8 times @ id = 2
    loop 0 X 10^8 times @ id = 3
    loop 3 X 10^8 times @ id = 8
    loop 4 X 10^8 times @ id = 9
    loop 5 X 10^8 times @ id = 0
    loop 6 X 10^8 times @ id = 6
    loop 9 X 10^8 times @ id = 4
    loop 7 X 10^8 times @ id = 7
    loop 7 X 10^8 times @ id = 1
    loop 8 X 10^8 times @ id = 5

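    Tasks with smaller ids are submitted, and so start running, before tasks with larger ids, yet the output shows that the order in which results are processed is determined mainly by how long each task takes to run. The round-robin polling can also reorder tasks that finish close together.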
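    If processing results in completion order is the goal, the JDK also offers java.util.concurrent.ExecutorCompletionService, which queues each Future as its task finishes. The sketch below is an alternative to the polling loop of Example 2, not the approach used in the original example or necessarily in HBase; the class CompletionOrderSketch, the runAll helper, and the sleep-based workload are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderSketch {

    /**
     * Submits one sleeping task per delay and returns the task ids
     * in the order in which the tasks finished.
     */
    static List<String> runAll(long[] delaysMillis) {
        // One thread per task, so every task starts right away.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, delaysMillis.length));
        CompletionService<String> finished =
                new ExecutorCompletionService<String>(pool);
        List<String> order = new ArrayList<String>();
        try {
            for (int i = 0; i < delaysMillis.length; i++) {
                final String id = String.valueOf(i);
                final long delay = delaysMillis[i];
                finished.submit(() -> {
                    Thread.sleep(delay); // stand-in for real work
                    return id;
                });
            }
            // take() blocks until the next task has completed, whichever it is.
            for (int i = 0; i < delaysMillis.length; i++) {
                order.add(finished.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runAll(new long[] { 300, 100, 200 }));
    }
}
```

    With these delays the task with id 1 would be expected to be reported first, since it sleeps the shortest time, although the exact order depends on thread scheduling. Compared with the polling loop, this avoids the fixed timeout and the manual bookkeeping of unfinished keys, at the cost of losing the id-to-Future map unless the id is carried in the result.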

  • Original article: https://www.cnblogs.com/luweiseu/p/2631732.html