zoukankan      html  css  js  c++  java
  • Java 多线程读取文件并统计词频 实例 出神入化的《ThreadPoolExecutor》

    
    
    本例重点展示线程池 ThreadPoolExecutor 的用法,以及线程同步器 CountDownLatch、基于 CAS 的原子操作(AtomicInteger)和线程安全的 Map/队列(ConcurrentHashMap、ArrayBlockingQueue)。
    ThreadPool主线程
     1 import java.io.BufferedWriter;
     2 import java.io.File;
     3 import java.io.FileWriter;
     4 import java.util.*;
     5 import java.util.concurrent.*;
     6 import java.util.concurrent.atomic.AtomicInteger;
     7 import java.util.concurrent.atomic.AtomicReferenceArray;
     8 
     9 /**
    10  * ClassName: ThreadPool
    11  * Description:
    12  * date: 2021/1/16 18:24
    13  *
    14  * @author hewei
    15  */
    16 public class ThreadPool {
    17     /**存储词的队列**/
    18     private static ArrayBlockingQueue<String> oneWordQueue = new ArrayBlockingQueue<String>(Integer.MAX_VALUE / 4);
    19     /**存储词的次数**/
    20     private static ConcurrentHashMap<String, AtomicInteger> wordCount = new ConcurrentHashMap<String, AtomicInteger>();
    21     /**判断中英文的正则**/
    22     private static String[] englishOrChinaWordCompile = {"[a-zA-Z]+", "[\u4e00-\u9fa5]"};
    23     private static long waitTimeSecond = 5;
    24     /**已知会创建10个线程池**/
    25     private static CountDownLatch countDownLatch = new CountDownLatch(10);
    26     /**存储词的次数排序**/
    27     private static AtomicReferenceArray wordCountArray=new AtomicReferenceArray<String>(Integer.MAX_VALUE/16);
    28 
    29     public static void main(String[] args) {
    30         BlockingQueue<Runnable> blockingQueue = new LinkedBlockingDeque<>(10000);
    31         ThreadPoolExecutor readFilePool = new ThreadPoolExecutor(5, 5,
    32                 waitTimeSecond, TimeUnit.SECONDS, blockingQueue, new UserThreadFactory("Study-WordCountReadFile-"),
    33                 new UserRejectHandler());
    34         //executor.allowCoreThreadTimeOut(true);
    35         Long beginTime = System.currentTimeMillis();
    36         //读取D盘的文件
    37         File file = new File("E:\tmp");
    38         if (file.isDirectory()) {
    39             File[] files = file.listFiles();
    40             // 定义文件类型
    41             for (File one : files) {
    42                 String fileName = one.getName();
    43                 readFilePool.execute(new WordCountReadFileTask(countDownLatch, "E:\tmp\" + fileName, oneWordQueue, englishOrChinaWordCompile));
    44             }
    45         }
    46         ThreadPoolExecutor sortWordPool = new ThreadPoolExecutor(5, 5,
    47                 waitTimeSecond, TimeUnit.SECONDS, blockingQueue, new UserThreadFactory("Study-WordCount-"),
    48                 new UserRejectHandler());
    49         //executor1.allowCoreThreadTimeOut(true);
    50         for (int i = 0; i < 5; i++) {
    51             sortWordPool.execute(new WordCountTask(countDownLatch, wordCount, oneWordQueue, waitTimeSecond,null));
    52         }
    53         try {
    54             countDownLatch.await();
    55             readFilePool.shutdown();
    56             sortWordPool.shutdown();
    57             // 写出到文件
    58             List<Map.Entry<String, AtomicInteger>> list = new ArrayList(wordCount.entrySet());
    59             Comparator com = new Comparator<Map.Entry<String, AtomicInteger>>(){
    60                 @Override
    61                 public int compare(Map.Entry<String, AtomicInteger> o1, Map.Entry<String, AtomicInteger> o2) {
    62                     return ((Integer)o2.getValue().get()).compareTo((Integer) o1.getValue().get());
    63                 }
    64             };
    65             list.sort(com);
    66             // 写出到文件
    67             BufferedWriter bw = new BufferedWriter(new FileWriter("E:\read.txt"));
    68             for(int i=0;i<list.size();i++){
    69                 if(i<10) {
    70                     System.out.println("单词  " + list.get(i).getKey() + ",次数  " + list.get(i).getValue());
    71                 }
    72                 bw.write("单词  "+ list.get(i).getKey()+",次数  "+ list.get(i).getValue());
    73                 bw.newLine();
    74             }
    75             bw.flush();
    76             bw.close();
    77         } catch (Exception e) {
    78             e.printStackTrace();
    79         }
    80     }
    81 }



    UserThreadFactory
     1 import java.util.concurrent.ThreadFactory;
     2 import java.util.concurrent.atomic.AtomicInteger;
     3 
     4 /**
     5  * ClassName: UserThreadFactory
     6  * Description:自定义线程创建工厂
     7  * date: 2021/1/16 18:26
     8  *
     9  * @author hewei
    10  */
    /**
     * {@link ThreadFactory} that gives every thread a name of the form
     * {@code prefixName + sequenceNumber} (numbering starts at 1) and logs thread
     * creation and termination to standard output, which helps when diagnosing JVM issues.
     */
    public class UserThreadFactory implements ThreadFactory {
        /** Prefix prepended to every thread name created by this factory. */
        private final String prefixName;
        /** Sequence number of the next thread; starts at 1. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        /**
         * @param prefixName prefix used for the names of all threads this factory creates
         */
        public UserThreadFactory(String prefixName) {
            this.prefixName = prefixName;
        }

        /**
         * Creates a new, not yet started thread named {@code prefixName + n}.
         *
         * @param runnable the work the thread will execute
         * @return the newly created thread
         */
        @Override
        public Thread newThread(Runnable runnable) {
            return new WorkThread(runnable, prefixName + threadNumber.getAndIncrement());
        }

        /**
         * Worker thread that announces its creation and its termination.
         *
         * <p>Declared {@code static} so a thread does not keep a hidden reference to the
         * factory that created it.
         */
        static class WorkThread extends Thread {

            /**
             * @param target the work to execute
             * @param name   the thread name
             */
            public WorkThread(Runnable target, String name) {
                super(target, name);
                System.out.println("创建:" + name);
            }

            @Override
            public void run() {
                try {
                    // Thread.run() invokes the target Runnable passed to the constructor.
                    super.run();
                } finally {
                    System.out.println("结束线程:" + getName());
                }
            }
        }
    }
    WordCountReadFileTask
     1 import java.io.BufferedReader;
     2 import java.io.FileReader;
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.CountDownLatch;
     7 import java.util.regex.Matcher;
     8 import java.util.regex.Pattern;
     9 
    10 /**
    11  * ClassName: WordCountReadFileTask
    12  * Description:
    13  * date: 2021/1/17 19:48
    14  *
    15  * @author hewei
    16  */
    /**
     * Reads one text file line by line, extracts every match of the configured regular
     * expressions and puts each match onto the shared word queue.
     *
     * <p>Lines are matched individually, so a word at the end of one line is never fused
     * with the word that starts the next line, and the file is never held in memory as a whole.
     * The latch is counted down exactly once when {@link #run()} finishes, whether it
     * succeeded or failed.
     */
    public class WordCountReadFileTask implements Runnable {
        /** Path of the file to read. */
        private final String filePathAndName;
        /** Queue that receives every extracted word. */
        private final ArrayBlockingQueue<String> oneWordQueue;
        /** Regular expressions applied to each line. */
        private final String[] englishOrChinaWordCompile;
        /** Counted down once when this task ends. */
        private final CountDownLatch countDownLatch;

        /**
         * @param countDownLatch            latch counted down when the task ends
         * @param filePathAndName           path of the file to read
         * @param oneWordQueue              queue that receives the extracted words
         * @param englishOrChinaWordCompile regular expressions whose matches are treated as words
         */
        public WordCountReadFileTask(CountDownLatch countDownLatch, String filePathAndName,
                ArrayBlockingQueue<String> oneWordQueue, String[] englishOrChinaWordCompile) {
            this.countDownLatch = countDownLatch;
            this.filePathAndName = filePathAndName;
            this.oneWordQueue = oneWordQueue;
            this.englishOrChinaWordCompile = englishOrChinaWordCompile;
        }

        /**
         * Streams the file and enqueues every match. Failures are printed, not rethrown,
         * so the latch is always released.
         */
        @Override
        public void run() {
            try {
                // Compile each expression once instead of once per chunk of text.
                List<Pattern> patterns = new ArrayList<Pattern>();
                for (String oneCompile : englishOrChinaWordCompile) {
                    patterns.add(Pattern.compile(oneCompile));
                }
                try (BufferedReader br = new BufferedReader(new FileReader(filePathAndName))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        for (Pattern pattern : patterns) {
                            Matcher matcher = pattern.matcher(line);
                            while (matcher.find()) {
                                // put() blocks while the queue is full, until a consumer frees space.
                                oneWordQueue.put(matcher.group());
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }
    WordCountTask
     1 import java.io.BufferedReader;
     2 import java.io.FileReader;
     3 import java.util.ArrayList;
     4 import java.util.Arrays;
     5 import java.util.List;
     6 import java.util.concurrent.ArrayBlockingQueue;
     7 import java.util.concurrent.ConcurrentHashMap;
     8 import java.util.concurrent.CountDownLatch;
     9 import java.util.concurrent.TimeUnit;
    10 import java.util.concurrent.atomic.AtomicInteger;
    11 import java.util.concurrent.atomic.AtomicReferenceArray;
    12 import java.util.regex.Matcher;
    13 import java.util.regex.Pattern;
    14 
    15 /**
    16  * ClassName: WordCountTask
    17  * Description:
    18  * date: 2021/1/17 19:48
    19  *
    20  * @author hewei
    21  */
    /**
     * Consumer that drains words from the shared queue and increments a per-word counter
     * in a {@link ConcurrentHashMap}.
     *
     * <p>The task ends when no word arrives within {@code waitTimeSecond} seconds, and
     * counts the latch down exactly once on exit.
     */
    public class WordCountTask implements Runnable {
        /** Queue the words are taken from. */
        private final ArrayBlockingQueue<String> oneWordQueue;
        /** Word to occurrence counter; shared with the other counting tasks. */
        private final ConcurrentHashMap<String, AtomicInteger> wordCount;
        /** Maximum time, in seconds, to wait for the next word before stopping. */
        private final long waitTimeSecond;
        /** Counted down once when this task ends. */
        private final CountDownLatch countDownLatch;
        /**
         * Not used by run(); may be null. Kept per instance rather than static so that
         * constructing a task does not overwrite state shared by all tasks.
         */
        private final AtomicReferenceArray<?> wordCountArray;

        /**
         * @param countDownLatch latch counted down when the task ends
         * @param wordCount      shared map of word counters
         * @param oneWordQueue   queue the words are taken from
         * @param waitTimeSecond idle timeout in seconds after which the task stops
         * @param wordCountArray not used by run(); may be null
         */
        public WordCountTask(CountDownLatch countDownLatch, ConcurrentHashMap<String, AtomicInteger> wordCount,
                ArrayBlockingQueue<String> oneWordQueue, long waitTimeSecond, AtomicReferenceArray<?> wordCountArray) {
            this.wordCountArray = wordCountArray;
            this.countDownLatch = countDownLatch;
            this.oneWordQueue = oneWordQueue;
            this.wordCount = wordCount;
            this.waitTimeSecond = waitTimeSecond;
        }

        /** Polls the queue until it stays empty for the idle timeout, counting every word taken. */
        @Override
        public void run() {
            try {
                String oneWord;
                while ((oneWord = oneWordQueue.poll(waitTimeSecond, TimeUnit.SECONDS)) != null) {
                    AtomicInteger counter = wordCount.get(oneWord);
                    if (counter == null) {
                        // Several threads may see the same word as missing at once.
                        // putIfAbsent lets exactly one of them install its counter (starting at 1);
                        // the others get the installed counter back and increment that one.
                        AtomicInteger existing = wordCount.putIfAbsent(oneWord, new AtomicInteger(1));
                        if (existing != null) {
                            existing.getAndIncrement();
                        }
                    } else {
                        counter.getAndIncrement();
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }
  • 相关阅读:
    在SQL2000怎樣用動態實現SQL2005的nvarchar(max)功能
    行列互换
    c#+GUI在aspx页面画图
    做网站用UTF8还是GB2312?
    Mvc如何做权限
    表白网
    vs2008保存很慢,提速
    MVC 向View传值
    aspx画图表
    什么是MVC
  • 原文地址:https://www.cnblogs.com/xzdwn/p/14293580.html
Copyright © 2011-2022 走看看