zoukankan      html  css  js  c++  java
  • Java多线程批量读取文件(八)--读写分离

    package com.net.thread.future;
    
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo of separating file reading and file writing across a thread pool.
     *
     * <p>For every regular file in a hard-coded input directory, a "producer" task opens the
     * file and publishes the open stream through a {@link Future}; a "consumer" task takes one
     * such future from a shared {@link BlockingQueue} and copies the stream's lines into a file
     * of the same name under a hard-coded output directory.
     *
     * <p>Created: 2017-08-16 17:26:37
     *
     * @version 1.0
     */
    public class CallableDemo3 {

        /**
         * Timestamp pattern used in the progress messages. {@link SimpleDateFormat} is not
         * thread-safe, so worker threads must format through {@link #now()} instead of
         * calling this instance directly.
         */
        final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Formats the current time with {@link #sdf}, serialising access to the shared formatter.
         *
         * @return the current time as {@code yyyy-MM-dd HH:mm:ss}
         */
        private static String now()
        {
            synchronized (sdf) {
                return sdf.format(new Date());
            }
        }

        /**
         * Submits one reader task and one writer task per input file, then waits until every
         * read and every write has finished before printing the end timestamp.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
            File f = new File("C://Users//LENOVO//Desktop//file");
            File[] filePaths = f.listFiles();
            if (filePaths == null) {
                // listFiles() returns null when the path is missing or is not a directory.
                System.err.println("Cannot list input directory: " + f.getAbsolutePath());
                return;
            }

            // All regular files to process; sub-directories cannot be opened as streams.
            final List<File> filePathsList = new ArrayList<File>();
            for (File s : filePaths) {
                if (s.isFile()) {
                    filePathsList.add(s);
                }
            }

            CountDownLatch latch = new CountDownLatch(filePathsList.size());
            ExecutorService pool = Executors.newFixedThreadPool(10);

            BlockingQueue<Future<Map<String, FileInputStream>>> queue =
                    new ArrayBlockingQueue<Future<Map<String, FileInputStream>>>(100);

            System.out.println("-------------文件读、写任务开始时间:" + now());
            try {
                for (File temp : filePathsList) {
                    Future<Map<String, FileInputStream>> future = pool.submit(new MyCallableProducer(latch, temp));
                    // put() blocks while the queue is full; add() would throw
                    // IllegalStateException once more than 100 futures are pending.
                    queue.put(future);

                    pool.execute(new MyCallableConsumer(queue));
                }

                // No further tasks will be submitted; let the queued ones run to completion.
                pool.shutdown();
                // The latch only tracks the readers ...
                latch.await();
                // ... so also wait for the writers, otherwise output files could be cut short.
                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
            System.out.println("-------------文件读、写任务结束时间:" + now());
        }


        /**
         * Reader task: opens one file and returns a single-entry map from the file's name to
         * its open {@link FileInputStream}. The stream is left open for the consumer to close.
         */
        static class MyCallableProducer implements Callable<Map<String, FileInputStream>>
        {
            private final CountDownLatch latch;
            private final File file;

            /**
             * @param latch counted down exactly once when {@link #call()} ends, normally or not
             * @param file  the file to open
             */
            public MyCallableProducer(CountDownLatch latch, File file)
            {
                this.latch = latch;
                this.file = file;
            }

            /**
             * Opens the file, runs the simulated work and returns the open stream.
             *
             * @return a map with one entry: file name to open input stream
             * @throws Exception if the file cannot be opened (e.g. {@link FileNotFoundException})
             */
            @Override
            public Map<String, FileInputStream> call() throws Exception
            {
                try {
                    System.out.println(Thread.currentThread().getName() + " 线程开始读取文件 :" + file.getName() + " ,时间为 " + now());
                    FileInputStream fis = new FileInputStream(file);
                    Map<String, FileInputStream> fileMap = new HashMap<String, FileInputStream>();
                    fileMap.put(file.getName(), fis);
                    doWork();
                    System.out.println(Thread.currentThread().getName() + " 线程读取文件 :" + file.getName() + " 完毕" + " ,时间为 " + now());
                    return fileMap;
                } finally {
                    // Always release the waiter, even when opening the file failed;
                    // otherwise main() would block on latch.await() forever.
                    latch.countDown();
                }
            }

            /**
             * Placeholder for business logic (e.g. wrapping data into POJOs); here it only
             * sleeps for a random whole number of seconds between 0 and 9.
             */
            private void doWork()
            {
                Random rand = new Random();
                int time = rand.nextInt(10) * 1000;
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Preserve the interrupt so the pool can observe the cancellation.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Writer task: takes one pending read result from the queue and copies each stream in
         * it to a file of the same name under {@code d:\gc3}.
         */
        static class MyCallableConsumer implements Runnable
        {
            private final BlockingQueue<Future<Map<String, FileInputStream>>> queue;

            /**
             * @param queue2 queue of read results shared with the submitting thread
             */
            public MyCallableConsumer(BlockingQueue<Future<Map<String, FileInputStream>>> queue2)
            {
                this.queue = queue2;
            }

            /** Takes one future, waits for its result and writes every entry to disk. */
            @Override
            public void run()
            {
                try {
                    Future<Map<String, FileInputStream>> future = queue.take();
                    Map<String, FileInputStream> map = future.get();
                    for (Map.Entry<String, FileInputStream> entry : map.entrySet()) {
                        copyToOutput(entry.getKey(), entry.getValue());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    // The producer failed (for example the file could not be opened).
                    e.printStackTrace();
                }
            }

            /**
             * Copies the lines of {@code fis} into the output file named {@code fileName},
             * closing both the input stream and the output file afterwards.
             */
            private void copyToOutput(String fileName, FileInputStream fis)
            {
                System.out.println(Thread.currentThread().getName() + " 线程开始写文件 :" + fileName + " ,时间为 " + now());
                File dirFile = new File("d:" + File.separator + "gc3" + File.separator + fileName);
                // try-with-resources closes both ends even when opening the output fails;
                // closing the reader also closes the producer's FileInputStream.
                // The output is written as UTF-8 to match the charset used for reading.
                try (BufferedReader br = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
                     BufferedWriter bw = new BufferedWriter(
                             new OutputStreamWriter(new FileOutputStream(dirFile), StandardCharsets.UTF_8))) {
                    bw.write("+++++++++++++" + Thread.currentThread().getName() + " 线程开始写文件++++++++++++");
                    String data;
                    while ((data = br.readLine()) != null) {
                        // NOTE(review): lines are joined with a single space rather than a line
                        // separator, as in the original; confirm this is the intended output format.
                        bw.write(data + " ");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    说明:

    1、构思其实很简单:阻塞队列是线程安全的,因此多个线程之间通过阻塞队列传递数据,可以保证每个写线程取到的内容各不相同,不会重复写数据;

    2、我使用异步线程进行读写,而非同步线程,这样有助于提升整体读、写性能。

    3、CountDownLatch 是倒计数闭锁(不是信号量 Semaphore),作用类似于 join() 方法:主线程调用 await() 等待,直到各任务调用 countDown() 把计数减到 0;当然也可以使用 CyclicBarrier。

  • 相关阅读:
    C# GetHashCode 部分冲突列表 数字字符串版本
    Amazon Dynamo DB
    SCTP 一句话介绍
    SystemTap 使用以及安装
    Windows Azure Service Disruption on Feb 29th
    发布ASP.NET MVC3网站
    SQLServer数据集合的交、并、差集运算
    DataTable的几个函数
    oracle创建表空间以及用户的语句
    asp.net mvc3及odp.net资料下载地址
  • 原文地址:https://www.cnblogs.com/chen1-kerr/p/7382306.html
Copyright © 2011-2022 走看看