import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 多线程 字符流 * @author winkey * */ public class MyThread3 extends Thread{ public static SortArray sortArray = new SortArray(10000) ; public String[] dataArr; public MyThread3(String[] dataArr){ this.dataArr = dataArr; } public void run(){ //这个地方也有问题,怎么处理临界资源·~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //其实我只是想多个线程处理batchDataArr数组 /*for(int i=0;i<batchDataArr.length;i++){ if(this.index<batchDataArr.length){ String str = batchDataArr[index++]; Integer numAsInt = Integer.valueOf(str); System.out.println(Thread.currentThread().getName()+"__"+numAsInt); sortArray.setBit(numAsInt, 1); } */ //while(true){ //String[] batchDataArr = BatchDataArrHolder.take(); //if (null == batchDataArr) continue; String[] batchDataArr = this.dataArr; if (null != batchDataArr){ for(int i=0;i<batchDataArr.length;i++){ String str = batchDataArr[i]; Integer numAsInt = Integer.valueOf(str); System.out.println(Thread.currentThread().getName()+"__"+numAsInt); sortArray.setBit(numAsInt, 1); } } //} } public static void main(String[] args) throws Exception { BufferedReader in = new BufferedReader(new FileReader("E:\bbbbbbb.txt")); BufferedWriter out = new BufferedWriter(new FileWriter("E:\cc.txt")); BatchDataArrHolder holder = new BatchDataArrHolder(); int readLength = 0; char[] chars = new char[5]; String tail=null; String batchData; try{ //创建一个可重用固定线程数的线程池 ExecutorService pool = Executors.newFixedThreadPool(2);//两个子线程 /*MyThread2 mt2 = new MyThread2(); pool.execute(mt1); pool.execute(mt2);*/ while(true){ readLength = in.read(chars, 0, chars.length); //没有读上来数据说明上一次读取数据已读完,不再处理 if(readLength == 0) break; boolean isEnd = false; //读上来的数据少于数组长度,说明这一次已读完,处理完这次后不再继续读取 if(readLength < chars.length){ //System.out.println(String.valueOf(chars).substring(0, readLength)); batchData = String.valueOf(chars).substring(0, readLength); isEnd = true; }else{ //System.out.println(String.valueOf(chars)); batchData = String.valueOf(chars); } //接上前一次的剩余数据 if(tail != null){ batchData = ""+tail+batchData; } //截取本次的剩余数据,留到下次用 tail = batchData.substring(batchData.lastIndexOf(",")+1,batchData.length()); if(tail.length()==0){ tail = null; } batchData = batchData.substring(0,batchData.lastIndexOf(",")); String[] batchDataArr = new String[batchData.split(",").length];//多线程处理这个东西!!! batchDataArr = batchData.split(","); //holder.push(batchDataArr);//主线程将每次读上来的数据保存到 MyThread3 mt1 = new MyThread3(batchDataArr); pool.execute(mt1); if(isEnd==true){ break; } } //关闭线程用 pool.shutdown();//只是不能再提交新任务,等待执行的任务不受影响 try { boolean loop = true; do { //等待所有任务完成 loop = !pool.awaitTermination(2, TimeUnit.SECONDS); //阻塞,直到线程池里所有任务结束 } while(loop); } catch (InterruptedException e) { e.printStackTrace(); } /*while(true){ if(BatchDataArrHolder.isEmpty()){ pool.shutdown(); break; } }*/ //写数据 //Thread.currentThread().sleep(2000); Integer sortUnit = sortArray.getMaxNum(); System.out.println("最大数为: "+sortUnit); for(int i = 0;i<=sortUnit;i++){ if(sortArray.getBit(i)==1){ StringBuffer buf = new StringBuffer(""+i+""); buf.append(","); Integer num = sortArray.repeatingData.get(i); if(num!=null && num>=2){ for(int j=2;j<=num;j++){ buf.append(""+i+"").append(","); } } out.write(buf.toString()); } } out.flush(); }finally{ if(in!=null){ try{ in.close(); }catch(IOException e){ e.printStackTrace(); } } if(out!=null){ try{ out.close(); }catch(IOException e){ e.printStackTrace(); } } } } }