zoukankan      html  css  js  c++  java
  • 大数据文件处理

    在处理大数据文件时,利用"生产者-消费者"线程模型进行处理,代码实现如下:

    /**
     * 文件处理类
     * */
    public class FileProcessor {
    	
    	/**读取文件的路径*/
    	private String path = "";
    	
    	/**指定默认工作队列的大小*/
    	public static final int MAXWORKQUEUESIZE = 2 << 12;
    	
    	/**工作线程队列*/
    	private BlockingQueue<Runnable> workQueue = null; 
    	
    	/**数据处理线程池*/
    	private ThreadPoolExecutor excutor = null;
    	
    	public FileProcessor(String file) {
    		this.path = file;
    		workQueue = new LinkedBlockingQueue<Runnable>(MAXWORKQUEUESIZE);
    		excutor = new ThreadPoolExecutor(10 , 15 , 5 * 60 * 1000L , TimeUnit.MILLISECONDS, workQueue);
    	}
    	
    	public FileProcessor(BlockingQueue<Runnable> workQueue,String file) {
    		this.path = file;
    		excutor = new ThreadPoolExecutor(10 , 15 , 5 * 60 * 1000L , TimeUnit.MILLISECONDS, workQueue);
    	}
    	
    	public void process() {
    		/**开启文件读取线程*/
    		FileReaderProcessor fileReaderProcessor = new FileReaderProcessor(path,excutor);
    		excutor.execute(fileReaderProcessor);
    		/**任务提交失败时,交给FileReaderRejectHandler处理*/
    		excutor.setRejectedExecutionHandler(FileReaderRejectHandler.getInstance());
    	}
    	
    	public static void main(String []args) {
    		FileProcessor proc = new FileProcessor("D://test");
    		proc.process();
    	}
    	
    }
    /************************华丽的分割线**************************/
    /**读取文件线程*/
    public class FileReaderProcessor implements Runnable {
    
    	/**读取文件路径*/
    	private String path = "";
    	
    	private ThreadPoolExecutor excutor = null;
    	
    	public FileReaderProcessor(String file, ThreadPoolExecutor excutor) {
    		this.path = file;
    		this.excutor = excutor;
    	}
    	
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		FileReader reader = null;
    		BufferedReader br = null;
    		int lineNumber = 0;
    		try {
    			reader = new FileReader(path);
    			br = new BufferedReader(reader);
    			String str = null;
    			while((str = br.readLine()) != null) {
    				++lineNumber;
    				System.out.println("[" + Thread.currentThread().getName() + "] read " + lineNumber + " rows");
    				/**防止读入过快,导致工作队列已满无法接受任务,则超过工作队列0.75时,暂停提交*/
    				if(excutor.getQueue().size() >= FileProcessor.MAXWORKQUEUESIZE * 0.75) {
    					System.out.println("[" + Thread.currentThread().getName() + "] sleep 5 seconds");
    					TimeUnit.SECONDS.sleep(5); /**休眠五秒中*/
    				}
    				excutor.submit(new DateHandlerProcessor(str));
    			}
    		} catch (FileNotFoundException e) {
    			// TODO Auto-generated catch block
    			System.out.println("File Not Find Error : " + e.getMessage());
    		} catch (IOException e) {
    			// TODO Auto-generated catch block
    			System.out.println("Read File Io Error : " + e.getMessage());
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			System.out.println("Thread Interrupt Error : " + e.getMessage());
    		} finally {
    			/**关闭资源*/
    			this.close(br, reader, excutor);
    		}
    	}
    
    	public void close(BufferedReader br, FileReader reader, ThreadPoolExecutor executor) {
    		try {
    			if(br != null) {
    				br.close();
    			}
    			if(reader != null) {
    				reader.close();
    			}
    			/**关闭线程池*/
    			while(excutor.getQueue().size() != 0) {
    				TimeUnit.SECONDS.sleep(1);
    			}
    			excutor.shutdown();
    			if(!excutor.awaitTermination(5 * 60 * 1000L, TimeUnit.MILLISECONDS)) {
    				excutor.shutdownNow();
    			}
    		} catch(Exception e) {
    			System.out.println("Close Error : " + e.getMessage());
    		}
    	}
    	
    }
    /**********************华丽的分割线****************************/
    /**
     * 向线程池提交任务时,提交任务被拒绝(线程池已shutdown或任务队列已满)时处理类
     * */
    public class FileReaderRejectHandler implements RejectedExecutionHandler {
    
    	private static FileReaderRejectHandler instance = null;
    	
    	static {
    		if(instance == null) {
    			instance = new FileReaderRejectHandler();
    		}
    	}
    	
    	private FileReaderRejectHandler() {
    		
    	}
    	
    	public static FileReaderRejectHandler getInstance() {
    		return instance;
    	}
    	
    	@Override
    	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    		// TODO Auto-generated method stub
    		if(executor.isShutdown()) { /**线程池已关闭,则不做处理*/
    			return;
    		} 
    		try {
    			/**当前线程睡眠5秒中,再次提交任务*/
    			TimeUnit.SECONDS.sleep(5);
    			executor.execute(r);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    /***********************华丽的分割线***************************/
    /**数据处理类*/
    public class DateHandlerProcessor implements Runnable {
    
    	/**处理文件一行内容*/
    	private String line = "";
    	
    	public DateHandlerProcessor(String line) {
    		this.line = line;
    	}
    	
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		try {
    			System.out.println("Thread[" + Thread.currentThread().getName() + "] Get Line " +  line);
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			System.out.println("Thread[" + Thread.currentThread().getName() + "] Interrupt : " + e.getMessage());
    		}
    	}
    
    }
    

      

  • 相关阅读:
    SpringBoot整合NoSql--(四)Session共享
    SpringBoot整合NoSql--(三)Redis集群
    SpringBoot整合NoSql--(二)MongoDB
    SpringBoot整合NoSql--(一)Redis
    SpringBoot整合持久层技术--(三)Spring Data JPA
    ArcGIS Server 10.4切片图的制作与发布
    ArcGIS api for JavaScript 3.27 聚合(cluster)
    ArcGIS api for JavaScript 3.27 按需显示需要的图层
    ArcGIS api for JavaScript 3.27 在线浏览的一些小部件
    WebGIS小理论(持续更新)
  • 原文地址:https://www.cnblogs.com/hanfight/p/4146158.html
Copyright © 2011-2022 走看看