zoukankan      html  css  js  c++  java
  • Java 多线程读取文件并统计词频 实例 出神入化的《ThreadPoolExecutor》

    
    
    本文重在展示线程池 ThreadPoolExecutor 的使用、线程同步器 CountDownLatch，以及基于 CAS 的原子操作和线程安全的 Map/队列。
    ThreadPool主线程
     1 import java.io.BufferedWriter;
     2 import java.io.File;
     3 import java.io.FileWriter;
     4 import java.util.*;
     5 import java.util.concurrent.*;
     6 import java.util.concurrent.atomic.AtomicInteger;
     7 import java.util.concurrent.atomic.AtomicReferenceArray;
     8 
     9 /**
    10  * ClassName: ThreadPool
    11  * Description:
    12  * date: 2021/1/16 18:24
    13  *
    14  * @author hewei
    15  */
    16 public class ThreadPool {
    17     /**存储词的队列**/
    18     private static ArrayBlockingQueue<String> oneWordQueue = new ArrayBlockingQueue<String>(Integer.MAX_VALUE / 4);
    19     /**存储词的次数**/
    20     private static ConcurrentHashMap<String, AtomicInteger> wordCount = new ConcurrentHashMap<String, AtomicInteger>();
    21     /**判断中英文的正则**/
    22     private static String[] englishOrChinaWordCompile = {"[a-zA-Z]+", "[\u4e00-\u9fa5]"};
    23     private static long waitTimeSecond = 5;
    24     /**已知会创建10个线程池**/
    25     private static CountDownLatch countDownLatch = new CountDownLatch(10);
    26     /**存储词的次数排序**/
    27     private static AtomicReferenceArray wordCountArray=new AtomicReferenceArray<String>(Integer.MAX_VALUE/16);
    28 
    29     public static void main(String[] args) {
    30         BlockingQueue<Runnable> blockingQueue = new LinkedBlockingDeque<>(10000);
    31         ThreadPoolExecutor readFilePool = new ThreadPoolExecutor(5, 5,
    32                 waitTimeSecond, TimeUnit.SECONDS, blockingQueue, new UserThreadFactory("Study-WordCountReadFile-"),
    33                 new UserRejectHandler());
    34         //executor.allowCoreThreadTimeOut(true);
    35         Long beginTime = System.currentTimeMillis();
    36         //读取D盘的文件
    37         File file = new File("E:\tmp");
    38         if (file.isDirectory()) {
    39             File[] files = file.listFiles();
    40             // 定义文件类型
    41             for (File one : files) {
    42                 String fileName = one.getName();
    43                 readFilePool.execute(new WordCountReadFileTask(countDownLatch, "E:\tmp\" + fileName, oneWordQueue, englishOrChinaWordCompile));
    44             }
    45         }
    46         ThreadPoolExecutor sortWordPool = new ThreadPoolExecutor(5, 5,
    47                 waitTimeSecond, TimeUnit.SECONDS, blockingQueue, new UserThreadFactory("Study-WordCount-"),
    48                 new UserRejectHandler());
    49         //executor1.allowCoreThreadTimeOut(true);
    50         for (int i = 0; i < 5; i++) {
    51             sortWordPool.execute(new WordCountTask(countDownLatch, wordCount, oneWordQueue, waitTimeSecond,null));
    52         }
    53         try {
    54             countDownLatch.await();
    55             readFilePool.shutdown();
    56             sortWordPool.shutdown();
    57             // 写出到文件
    58             List<Map.Entry<String, AtomicInteger>> list = new ArrayList(wordCount.entrySet());
    59             Comparator com = new Comparator<Map.Entry<String, AtomicInteger>>(){
    60                 @Override
    61                 public int compare(Map.Entry<String, AtomicInteger> o1, Map.Entry<String, AtomicInteger> o2) {
    62                     return ((Integer)o2.getValue().get()).compareTo((Integer) o1.getValue().get());
    63                 }
    64             };
    65             list.sort(com);
    66             // 写出到文件
    67             BufferedWriter bw = new BufferedWriter(new FileWriter("E:\read.txt"));
    68             for(int i=0;i<list.size();i++){
    69                 if(i<10) {
    70                     System.out.println("单词  " + list.get(i).getKey() + ",次数  " + list.get(i).getValue());
    71                 }
    72                 bw.write("单词  "+ list.get(i).getKey()+",次数  "+ list.get(i).getValue());
    73                 bw.newLine();
    74             }
    75             bw.flush();
    76             bw.close();
    77         } catch (Exception e) {
    78             e.printStackTrace();
    79         }
    80     }
    81 }



    UserThreadFactory
     1 import java.util.concurrent.ThreadFactory;
     2 import java.util.concurrent.atomic.AtomicInteger;
     3 
     /**
      * ThreadFactory that names each thread {@code prefixName + n} (n = 1, 2, ...). It prints a line to
      * stdout when a thread is created and when its task finishes. Descriptive names make JVM thread
      * dumps easier to read.
      *
      * @author hewei
      */
     public class UserThreadFactory implements ThreadFactory {
         /** Prefix prepended to every thread name. */
         private final String prefixName;
         /** Sequence number for the next thread; starts at 1. */
         private final AtomicInteger threadNumber = new AtomicInteger(1);

         public UserThreadFactory(String prefixName) {
             this.prefixName = prefixName;
         }

         @Override
         public Thread newThread(Runnable runnable) {
             return new WorkThread(runnable, prefixName + threadNumber.getAndIncrement());
         }

         /**
          * Worker thread that logs its name on creation and when its task ends. It is a static nested
          * class because it never uses the enclosing factory; a non-static inner class would hold a
          * hidden reference to it.
          */
         static class WorkThread extends Thread {
             /**
              * @param target the task to run
              * @param name   the thread name
              */
             public WorkThread(Runnable target, String name) {
                 super(target, name);
                 System.out.println("创建:" + name);
             }

             @Override
             public void run() {
                 try {
                     // Thread.run() executes the target passed to the constructor.
                     super.run();
                 } finally {
                     System.out.println("结束线程:" + getName());
                 }
             }
         }
     }
    WordCountReadFileTask
     1 import java.io.BufferedReader;
     2 import java.io.FileReader;
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.CountDownLatch;
     7 import java.util.regex.Matcher;
     8 import java.util.regex.Pattern;
     9 
    10 /**
    11  * ClassName: WordCountTask
    12  * Description:
    13  * date: 2021/1/17 19:48
    14  *
    15  * @author hewei
    16  */
    17 public class WordCountReadFileTask implements Runnable {
    18     private String filePathAndName;
    19     private ArrayBlockingQueue<String> oneWordQueue;
    20     private  String[] englishOrChinaWordCompile;
    21     private  CountDownLatch countDownLatch;
    22 
    23     public WordCountReadFileTask(CountDownLatch countDownLatch,String filePathAndName, ArrayBlockingQueue<String> oneWordQueue, String[] englishOrChinaWordCompile) {
    24         this.countDownLatch=countDownLatch;
    25         this.filePathAndName = filePathAndName;
    26         this.oneWordQueue = oneWordQueue;
    27         this.englishOrChinaWordCompile = englishOrChinaWordCompile;
    28     }
    29 
    30     @Override
    31     public void run() {
    32         try {
    33             BufferedReader br = new BufferedReader(new FileReader(filePathAndName));
    34             StringBuffer sb = new StringBuffer();
    35             List<String> strList=new ArrayList<String>();
    36             String line = "";
    37             while((line=br.readLine())!=null){
    38                 sb.append(line);
    39                 /**
    40                  * 为了保证不超过Integer.max_value
    41                  */
    42                 if(sb.length()>50000000) {
    43                     strList.add(sb.toString());
    44                     /**
    45                      * 清空StringBuffer
    46                      * 1.delete,从到到尾
    47                      * 2.new 新的对象。但会丢弃老对象加速gc到来
    48                      * 3.setlength=0,不符合这里的场景
    49                      */
    50                     sb.delete(0,sb.length());
    51                 }
    52             }
    53             if(sb!=null){
    54                 strList.add(sb.toString());
    55             }
    56             br.close();
    57             for(String words:strList) {
    58                 for (String oneCompile : englishOrChinaWordCompile) {
    59                     //正则
    60                     Pattern p = Pattern.compile(oneCompile);
    61                     Matcher matcher = p.matcher(words);
    62                     while (matcher.find()) {
    63                         /**
    64                          * 添加一个元素,如果队列满,则阻塞等待队列被消费腾出空间来
    65                          */
    66                         oneWordQueue.put(matcher.group());
    67                     }
    68                 }
    69             }
    70         } catch (Exception e) {
    71             e.printStackTrace();
    72         }finally {
    73             countDownLatch.countDown();
    74         }
    75     }
    76 }
    WordCountTask
     1 import java.io.BufferedReader;
     2 import java.io.FileReader;
     3 import java.util.ArrayList;
     4 import java.util.Arrays;
     5 import java.util.List;
     6 import java.util.concurrent.ArrayBlockingQueue;
     7 import java.util.concurrent.ConcurrentHashMap;
     8 import java.util.concurrent.CountDownLatch;
     9 import java.util.concurrent.TimeUnit;
    10 import java.util.concurrent.atomic.AtomicInteger;
    11 import java.util.concurrent.atomic.AtomicReferenceArray;
    12 import java.util.regex.Matcher;
    13 import java.util.regex.Pattern;
    14 
    /**
     * Consumer task: takes words from the shared queue and increments their counts in the shared
     * map. It stops once the queue has stayed empty for {@code waitTimeSecond} seconds, then counts
     * down the latch exactly once.
     *
     * <p>NOTE(review): the idle timeout doubles as the end-of-input signal. The task can therefore
     * exit while a producer is still reading a file that has not produced words yet, and callers must
     * be prepared to count words left in the queue.
     *
     * @author hewei
     */
    public class WordCountTask implements Runnable {
        private final ArrayBlockingQueue<String> oneWordQueue;
        private final ConcurrentHashMap<String, AtomicInteger> wordCount;
        private final long waitTimeSecond;
        private final CountDownLatch countDownLatch;
        /**
         * Reserved for real-time ranking, which is not implemented; may be null. Held per instance:
         * the former static field was overwritten by every constructor call.
         */
        private final AtomicReferenceArray<?> wordCountArray;

        /**
         * @param countDownLatch  counted down once when this task finishes
         * @param wordCount       shared word-to-count map that is updated in place
         * @param oneWordQueue    source of words to count
         * @param waitTimeSecond  how long to wait on an empty queue before stopping
         * @param wordCountArray  reserved, currently unused; may be null
         */
        public WordCountTask(CountDownLatch countDownLatch, ConcurrentHashMap<String, AtomicInteger> wordCount,
                ArrayBlockingQueue<String> oneWordQueue, long waitTimeSecond, AtomicReferenceArray<?> wordCountArray) {
            this.wordCountArray = wordCountArray;
            this.countDownLatch = countDownLatch;
            this.oneWordQueue = oneWordQueue;
            this.wordCount = wordCount;
            this.waitTimeSecond = waitTimeSecond;
        }

        @Override
        public void run() {
            try {
                String oneWord;
                while ((oneWord = oneWordQueue.poll(waitTimeSecond, TimeUnit.SECONDS)) != null) {
                    // The hazard here is a check-then-act race between threads seeing a word for the
                    // first time, not instruction reordering. ConcurrentHashMap.computeIfAbsent is
                    // atomic, so all threads share one counter per word; the increment is a CAS.
                    wordCount.computeIfAbsent(oneWord, k -> new AtomicInteger()).incrementAndGet();
                    // Real-time ranking would need its own thread-safe structure; not implemented.
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning pool can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }
  • 相关阅读:
    luogu P1833 樱花 看成混合背包
    luogu P1077 摆花 基础记数dp
    luogu P1095 守望者的逃离 经典dp
    Even Subset Sum Problem CodeForces
    Maximum White Subtree CodeForces
    Sleeping Schedule CodeForces
    Bombs CodeForces
    病毒侵袭持续中 HDU
    病毒侵袭 HDU
    Educational Codeforces Round 35 (Rated for Div. 2)
  • 原文地址:https://www.cnblogs.com/xzdwn/p/14293580.html
Copyright © 2011-2022 走看看