首先吐槽python的多线程是真的垃圾。。。
业务:
对文件里的近2万条数据进行处理,然后存回文件
0. 读取txt存入ArrayList
1. 把ArrayList以2000为一组切割
2. 把每组2000条数据分别交给各自的线程
3. 把线程放入线程池
4. 线程池运行完毕后把结果存回txt
package edu.thu.xlore.unitId;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo of chunked, multi-threaded processing of a text file.
 *
 * <p>The file is read line by line, the lines are split into chunks of
 * {@link #CHUNK_SIZE}, every chunk is processed by its own task on a fixed-size
 * thread pool, and the processed lines are written back to the same file, one
 * record per line.
 *
 * <p>File I/O uses {@link FileReader}/{@link FileWriter}, i.e. the platform
 * default charset.
 */
public class Test {

    /** Maximum number of lines handed to a single worker task. */
    private static final int CHUNK_SIZE = 2000;

    /** Number of threads in the worker pool. */
    private static final int POOL_SIZE = 10;

    /**
     * Reads {@code filePath}, processes its lines in parallel and overwrites the
     * file with the processed lines.
     *
     * <p>Does nothing if the file does not exist. If reading the file or
     * processing any chunk fails, the error is printed and the file is left
     * untouched, so a partial result never replaces the original data.
     *
     * @param filePath path of the text file to process in place
     */
    public void unitFile(String filePath) {
        System.out.println("unitFile start");
        File file = new File(filePath);
        if (!file.exists()) {
            return;
        }

        ArrayList<String> lines;
        try {
            lines = readLines(file);
        } catch (IOException e) {
            // Abort: continuing would overwrite the file with a partial read.
            e.printStackTrace();
            return;
        }
        System.out.println("unitFile: 读取完毕,数量:" + lines.size());

        ArrayList<ArrayList<String>> chunks = splitIntoChunks(lines);
        System.out.println("unitFile: 分割数量:" + chunks.size());

        long timeStart = System.currentTimeMillis();
        if (!processChunks(chunks)) {
            return;
        }
        System.out.println("线程全部运行完毕");
        long timeEnd = System.currentTimeMillis();

        try {
            writeChunks(file, chunks);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("共用时:" + (timeEnd - timeStart) + "毫秒");
    }

    /** Reads every line of {@code file} into a list. */
    private static ArrayList<String> readLines(File file) throws IOException {
        ArrayList<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    /** Copies {@code lines} into consecutive chunks of at most {@link #CHUNK_SIZE} elements. */
    private static ArrayList<ArrayList<String>> splitIntoChunks(ArrayList<String> lines) {
        ArrayList<ArrayList<String>> chunks = new ArrayList<>();
        for (int from = 0; from < lines.size(); from += CHUNK_SIZE) {
            int to = Math.min(from + CHUNK_SIZE, lines.size());
            chunks.add(new ArrayList<String>(lines.subList(from, to)));
        }
        return chunks;
    }

    /**
     * Runs one {@link UnitThread} task per chunk on a fixed pool and blocks until
     * all of them have finished.
     *
     * @return {@code true} if every chunk was processed, {@code false} if a task
     *         failed or the calling thread was interrupted while waiting
     */
    private boolean processChunks(ArrayList<ArrayList<String>> chunks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<Future<?>> pending = new ArrayList<>(chunks.size());
        try {
            for (int i = 0; i < chunks.size(); i++) {
                pending.add(pool.submit(new UnitThread(chunks, i)));
            }
        } finally {
            // Already-submitted tasks still run to completion after shutdown().
            pool.shutdown();
        }

        try {
            // Blocking on get() replaces busy-waiting on isTerminated(), surfaces
            // task failures and makes the workers' writes visible to this thread.
            for (Future<?> task : pending) {
                task.get();
            }
            return true;
        } catch (ExecutionException e) {
            e.printStackTrace();
            pool.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
    }

    /** Overwrites {@code file} with all chunk contents in order, one record per line. */
    private static void writeChunks(File file, List<ArrayList<String>> chunks) throws IOException {
        // Closing the BufferedWriter (not just the FileWriter) flushes the buffered tail.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
            for (ArrayList<String> chunk : chunks) {
                System.out.println("共有数据: " + chunk.size());
                for (String record : chunk) {
                    writer.write(record);
                    writer.newLine();
                }
            }
        }
    }

    /**
     * Worker task that processes one chunk and stores the result back into the
     * shared list of chunks at the same index.
     *
     * <p>Although it extends {@link Thread}, this class is submitted to the pool
     * as a plain {@link Runnable} by {@link #unitFile(String)}; it is not started
     * as a thread of its own there. Each task writes only to its own index of
     * the outer list.
     */
    public class UnitThread extends Thread {
        private final int pageIndex;
        private final ArrayList<ArrayList<String>> arrayListArrayList;

        /**
         * @param arrayListArrayList shared list of chunks; the slot at
         *                           {@code pageIndex} is replaced with the result
         * @param pageIndex          index of the chunk this task is responsible for
         */
        public UnitThread(ArrayList<ArrayList<String>> arrayListArrayList, int pageIndex) {
            this.pageIndex = pageIndex;
            this.arrayListArrayList = arrayListArrayList;
        }

        @Override
        public void run() {
            System.out.println("线程" + pageIndex + "开始");
            ArrayList<String> unitedCategory = dealwithArrayList(arrayListArrayList.get(pageIndex));
            // Hand the new list back through the shared list instead of a return value.
            arrayListArrayList.set(this.pageIndex, unitedCategory);
            System.out.println("线程" + pageIndex + "运行完毕");
        }
    }

    /**
     * Processes one chunk: returns a new list in which every element has the
     * "processed" marker appended. The input list is not modified.
     *
     * @param arrayList lines to process
     * @return a new list with the processed lines, in the same order
     */
    public ArrayList<String> dealwithArrayList(ArrayList<String> arrayList) {
        ArrayList<String> reArrayList = new ArrayList<>(arrayList.size());
        for (String item : arrayList) {
            reArrayList.add(item + " 已经处理完了");
        }
        return reArrayList;
    }

    /** Runs the demo against a fixed file on the desktop. */
    public static void main(String[] args) {
        Test test = new Test();
        // Backslashes must be escaped inside a Java string literal.
        String filePath = "C:\\Users\\Administrator\\Desktop\\test\\wordFile.txt";
        test.unitFile(filePath);
    }
}
运行结果:
unitFile start
unitFile: 读取完毕,数量:19399
unitFile: 分割数量:10
线程0开始
线程1开始
线程3开始
线程2开始
线程0运行完毕
线程1运行完毕
线程3运行完毕
线程4开始
线程2运行完毕
线程5开始
线程8开始
线程4运行完毕
线程8运行完毕
线程5运行完毕
线程9开始
线程6开始
线程9运行完毕
线程7开始
线程6运行完毕
线程7运行完毕
线程全部运行完毕
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 2000
共有数据: 1399
共用时:8毫秒
Process finished with exit code 0