zoukankan      html  css  js  c++  java
  • 10个线程同步处理1000行消息

    多线程,并发,是经常遇到的问题,平时解决的方案也想过很多,比如说现在有1000行消息,需要开10个线程同时处理。

    之前想过两个方案:

    方案一: 一次开10个线程,每个线程处理一条消息,等10个线程全部处理结束之后,再开启下10个线程,直到全部处理完毕

    缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程

    方案二: 将1000行消息分割为10份,每100行用一个线程处理。

    优点:无等待

    缺陷: 分割不均,无法充分利用所有的线程

    现在想想,上面两个缺点挺多,就又想了两种方案:

    方案三:使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.

    优点:充分利用所有线程,无等待

    缺点:需要将所有的task转移到Queue中,消耗一倍内存

    方案四:使用java.util.concurrent包,固定数量线程池。

    优点:完美解决

    缺点:当单个task执行时间很短的时候,线程池中的线程并不会被全部使用,这样很多task就会block在一个线程中,降低执行速率

    下面贴出每个方案的代码实现,备忘吧,如果有更好的想法,或者更简单的方式,再继续补充~

    Java代码  收藏代码
    1. public class Task {  
    2.   
    3.     private int id;  
    4.       
    5.     public Task(int id) {  
    6.         this.id = id;  
    7.     }  
    8.       
    9.     public void start() {  
    10.         System.out.println(Thread.currentThread().getName() + ": start to handle task " + id);  
    11.   
    12.         try {  
    13.             Thread.sleep(5000);  
    14.         } catch (InterruptedException e) {  
    15.             e.printStackTrace();  
    16.         }  
    17.     }  
    18.       
    19. }  
    Java代码  收藏代码
    1. import java.util.LinkedList;  
    2. import java.util.List;  
    3.   
    4. public class TaskProducer {  
    5.   
    6.     public static List<Task> produce(int count) {  
    7.         List<Task> tasks = new LinkedList<Task>();  
    8.           
    9.         for(int i = 0; i < count; i ++) {  
    10.             tasks.add(new Task(i + 1));  
    11.         }  
    12.           
    13.         return tasks;  
    14.     }  
    15.       
    16. }  
    Java代码  收藏代码
    1. import java.util.LinkedList;  
    2. import java.util.List;  
    3.   
    4. /** 
    5.  * 策略1: 每次开启n个线程,等待n个线程全部结束之后,再开启下n个线程,每个线程处理一个task. 
    6.  * 缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程 
    7.  */  
    8. public class Strategy1 {  
    9.   
    10.     public static void main(String[] args) {  
    11.         List<Task> tasks = TaskProducer.produce(1000);  
    12.         handleTasks(tasks, 10);  
    13.         System.out.println("All finished");  
    14.     }  
    15.       
    16.     public static void handleTasks(List<Task> tasks, int threadCount) {  
    17.         int taskCount = tasks.size();  
    18.           
    19.         List<Thread> threadHolder = new LinkedList<Thread>();  
    20.         for(int i = 0; i < taskCount; i += threadCount) {  
    21.             for(int j = 0; j < threadCount && (i + j) < taskCount; j ++) {  
    22.                 Thread thread = new Thread(new TaskHandler(tasks.get(i + j)));  
    23.                 threadHolder.add(thread);  
    24.                 thread.start();  
    25.             }  
    26.               
    27.             waitToFinish(threadHolder);  
    28.             threadHolder.clear();  
    29.         }  
    30.     }  
    31.       
    32.     public static void waitToFinish(List<Thread> threadHolder) {  
    33.         while(true) {  
    34.             boolean allFinished = true;  
    35.             for(Thread thread : threadHolder) {  
    36.                 allFinished = allFinished && !thread.isAlive();  
    37.             }  
    38.               
    39.             if(allFinished) {  
    40.                 break;  
    41.             }  
    42.         }  
    43.     }  
    44.       
    45.     public static class TaskHandler implements Runnable {  
    46.         private Task task;  
    47.           
    48.         public TaskHandler(Task task) {  
    49.             this.task = task;  
    50.         }  
    51.   
    52.         @Override  
    53.         public void run() {  
    54.             task.start();  
    55.         }  
    56.     }  
    57.       
    58. }  
    Java代码  收藏代码
    1. import java.math.BigDecimal;  
    2. import java.math.RoundingMode;  
    3. import java.util.ArrayList;  
    4. import java.util.LinkedList;  
    5. import java.util.List;  
    6.   
    7. /** 
    8.  * 策略2: 将所有的task分割成n个子task列表,然后开启n个线程,每个线程处理一个子列表 
    9.  * 优点:无等待 
    10.  * 缺陷: 分割不均,无法充分利用所有的线程 
    11.  */  
    12. public class Strategy2 {  
    13.   
    14.     public static void main(String[] args) {  
    15.         List<Task> tasks = TaskProducer.produce(1000);  
    16.         handleTasks(tasks, 10);  
    17.         System.out.println("All finished");  
    18.     }  
    19.   
    20.     public static void handleTasks(List<Task> tasks, int threadCount) {  
    21.         List<List<Task>> splitTasks = splitTasksToNThreads(tasks, threadCount);  
    22.   
    23.         List<Thread> threadHolder = new LinkedList<Thread>();  
    24.         for (List<Task> segment : splitTasks) {  
    25.             Thread thread = new Thread(new TaskHandler(segment));  
    26.             threadHolder.add(thread);  
    27.             thread.start();  
    28.         }  
    29.           
    30.         waitToFinish(threadHolder);  
    31.     }  
    32.       
    33.     public static void waitToFinish(List<Thread> threadHolder) {  
    34.         while(true) {  
    35.             boolean allFinished = true;  
    36.             for(Thread thread : threadHolder) {  
    37.                 allFinished = allFinished && !thread.isAlive();  
    38.             }  
    39.               
    40.             if(allFinished) {  
    41.                 break;  
    42.             }  
    43.         }  
    44.     }  
    45.       
    46.     public static List<List<Task>> splitTasksToNThreads(List<Task> tasks, int threadCount) {  
    47.         List<List<Task>> splitTasks = new ArrayList<List<Task>>(threadCount);  
    48.   
    49.         int taskCount = tasks.size();  
    50.         int taskPerThread = new BigDecimal(taskCount).divide(new BigDecimal(threadCount), RoundingMode.CEILING).intValue();  
    51.   
    52.         for (int i = 0; i < taskCount; i += taskPerThread) {  
    53.             List<Task> segment = new LinkedList<Task>();  
    54.             for (int j = 0; j < taskPerThread && (i + j) < taskCount; j++) {  
    55.                 segment.add(tasks.get(i + j));  
    56.             }  
    57.   
    58.             splitTasks.add(segment);  
    59.         }  
    60.   
    61.         tasks.clear();  
    62.           
    63.         return splitTasks;  
    64.     }  
    65.       
    66.     public static class TaskHandler implements Runnable {  
    67.         private List<Task> tasks;  
    68.   
    69.         public TaskHandler(List<Task> tasks) {  
    70.             this.tasks = tasks;  
    71.         }  
    72.           
    73.         @Override  
    74.         public void run() {  
    75.             for (Task task : tasks) {  
    76.                 task.start();  
    77.             }  
    78.         }  
    79.     }  
    80.   
    81. }  
    Java代码  收藏代码
    1. import java.util.LinkedList;  
    2. import java.util.List;  
    3. import java.util.Queue;  
    4. import java.util.concurrent.ConcurrentLinkedQueue;  
    5.   
    6. /** 
    7.  * 策略3: 使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue. 
    8.  * 优点:充分利用所有线程,无等待 
    9.  * 缺点:需要将所有的task转移到Queue中,消耗一倍内存 
    10.  */  
    11. public class Strategy3 {  
    12.   
    13.     public static void main(String[] args) {  
    14.         List<Task> tasks = TaskProducer.produce(1000);  
    15.         handleTasks(tasks, 10);  
    16.         System.out.println("All finished");  
    17.     }  
    18.       
    19.     public static void handleTasks(List<Task> tasks, int threadCount) {  
    20.         Queue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();  
    21.         taskQueue.addAll(tasks);  
    22.           
    23.         List<Thread> threadHolder = new LinkedList<Thread>();  
    24.         for(int i = 0; i < threadCount; i ++) {  
    25.             Thread thread = new Thread(new TaskHandler(taskQueue));  
    26.             threadHolder.add(thread);  
    27.             thread.start();  
    28.         }  
    29.           
    30.         waitToFinish(threadHolder);  
    31.     }  
    32.       
    33.     public static void waitToFinish(List<Thread> threadHolder) {  
    34.         while(true) {  
    35.             boolean allFinished = true;  
    36.             for(Thread thread : threadHolder) {  
    37.                 allFinished = allFinished && !thread.isAlive();  
    38.             }  
    39.               
    40.             if(allFinished) {  
    41.                 break;  
    42.             }  
    43.         }  
    44.     }  
    45.       
    46.     public static class TaskHandler implements Runnable {  
    47.           
    48.         private final Queue<Task> tasks;  
    49.           
    50.         public TaskHandler(Queue<Task> tasks) {  
    51.             this.tasks = tasks;  
    52.         }  
    53.           
    54.         public void run() {  
    55.             while(!tasks.isEmpty()) {  
    56.                 Task task = tasks.poll();  
    57.                 if(task != null) {  
    58.                     task.start();  
    59.                 }  
    60.             }  
    61.         }  
    62.           
    63.     }  
    64.       
    65. }  
    Java代码  收藏代码
    1. import java.util.List;  
    2. import java.util.concurrent.ExecutorService;  
    3. import java.util.concurrent.Executors;  
    4. import java.util.concurrent.TimeUnit;  
    5.   
    6. /** 
    7.  * 策略4: 使用java.util.concurrent包,线程池。 
    8.  * 优点:完美解决。 
    9.  */  
    10. public class Strategy4 {  
    11.   
    12.     public static void main(String[] args) {  
    13.         List<Task> tasks = TaskProducer.produce(1000);  
    14.         handleTasks(tasks, 10);  
    15.         System.out.println("All finished");  
    16.     }  
    17.       
    18.     public static void handleTasks(List<Task> tasks, int threadCount) {  
    19.         try {  
    20.             ExecutorService executor = Executors.newFixedThreadPool(threadCount);  
    21.               
    22.             for(Task task : tasks) {  
    23.                 executor.submit(new TaskHandler(task));  
    24.             }  
    25.               
    26.             executor.shutdown();  
    27.             executor.awaitTermination(60, TimeUnit.SECONDS);  
    28.         } catch (Exception e) {  
    29.             e.printStackTrace();  
    30.         }  
    31.     }  
    32.       
    33.     public static class TaskHandler implements Runnable {  
    34.   
    35.         private Task task;  
    36.           
    37.         public TaskHandler(Task task) {  
    38.             this.task = task;  
    39.         }  
    40.           
    41.         public void run() {  
    42.             task.start();  
    43.         }  
    44.           
    45.     }  
    46.       
    47. }  
  • 相关阅读:
    移动硬盘文件被恶意隐藏
    asp.net identity UserSecurityStamp 的作用
    Head First Python学习笔记1
    WPF 确认动态加载数据完成
    rust by example 2
    Rust by Example1
    奇葩!把类型转成object
    Lambda高手之路第一部分
    理解Lambda表达式
    贪心算法-找零钱(C#实现)
  • 原文地址:https://www.cnblogs.com/wjwen/p/5081742.html
Copyright © 2011-2022 走看看