  • Using the Fork/Join Framework Provided by Java 7

    http://blog.csdn.net/a352193394/article/details/39872923


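    Java 7 added a very powerful framework for multithreaded development to the JDK: the
    Fork/Join framework. It goes one step beyond the Executor framework by adding the
    work-stealing strategy used in parallel divide-and-conquer computation. With work stealing,
    a worker thread that is waiting for the subtasks it created, or that has finished its own
    tasks, looks for tasks that have not been executed yet and runs them. This is the biggest
    difference from the Executor framework, and it makes more effective use of the threads in
    the pool, which is why the Fork/Join framework is well worth using for this kind of work.

    The example below shows how to use the framework: it creates a List of 10,000 products and
    updates the price of every product in it.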

    package com.bird.concursey.charpet8;

    /**
     * store the name and price of a product
     * @author bird 2014-10-07 23:23:14
     */
    public class Product {

        private String name;
        private double price;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }

    }


    package com.bird.concursey.charpet8;

    import java.util.ArrayList;
    import java.util.List;

    /**
     * generate a list of products, each with an initial price of 10
     * @author bird
     * 2014-10-07 23:24:47
     */
    public class ProductListGenerator {

        public List<Product> generate(int size) {
            List<Product> list = new ArrayList<Product>();
            for (int i = 0; i < size; i++) {
                Product product = new Product();
                product.setName("Product" + i);
                product.setPrice(10);
                list.add(product);
            }
            return list;
        }
    }


    package com.bird.concursey.charpet8;

    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.TimeUnit;

    public class Task extends RecursiveAction {

        private static final long serialVersionUID = 1L;
        // These attributes determine the block of products this task has to
        // process: positions first (inclusive) to last (exclusive).
        private List<Product> products;
        private int first;
        private int last;
        // store the increment of the price of the products
        private double increment;

        public Task(List<Product> products, int first, int last, double increment) {
            super();
            this.products = products;
            this.first = first;
            this.last = last;
            this.increment = increment;
        }

        /**
         * If the difference between the last and first attributes is greater than
         * or equal to 10, create two new Task objects, one to process the first
         * half of products and the other to process the second half and execute
         * them in ForkJoinPool using the invokeAll() method.
         */
        @Override
        protected void compute() {
            if (last - first < 10) {
                updatePrices();
            } else {
                int middle = (first + last) / 2;
                System.out.printf("Task: Pending tasks: %s%n", getQueuedTaskCount());
                // t1 covers [first, middle + 1) and t2 covers [middle + 1, last),
                // so the two halves do not overlap.
                Task t1 = new Task(products, first, middle + 1, increment);
                Task t2 = new Task(products, middle + 1, last, increment);
                invokeAll(t1, t2);
            }
        }

        private void updatePrices() {
            for (int i = first; i < last; i++) {
                Product product = products.get(i);
                product.setPrice(product.getPrice() * (1 + increment));
            }
        }

        public static void main(String[] args) {
            ProductListGenerator productListGenerator = new ProductListGenerator();
            List<Product> products = productListGenerator.generate(10000);
            Task task = new Task(products, 0, products.size(), 0.2);

            ForkJoinPool pool = new ForkJoinPool();
            pool.execute(task);

            // Poll the pool every 5 ms and print its status until the task finishes.
            do {
                System.out.printf("Main: Thread Count: %d%n",
                        pool.getActiveThreadCount());
                System.out.printf("Main: Thread Steal: %d%n", pool.getStealCount());
                System.out.printf("Main: Parallelism: %d%n", pool.getParallelism());
                try {
                    TimeUnit.MILLISECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (!task.isDone());

            pool.shutdown();

            if (task.isCompletedNormally()) {
                System.out.printf("Main: The process has completed normally.%n");
            }

            // Every price should now be 12 (10 increased by 20%); print any that is not.
            for (Product product : products) {
                if (product.getPrice() != 12) {
                    System.out.printf("Product %s: %f%n", product.getName(), product.getPrice());
                }
            }

            System.out.println("Main: End of the program.");
        }

    }


    In this example, you created a ForkJoinPool object and a subclass of the ForkJoinTask
    class that you execute in the pool. The ForkJoinPool object was created with the
    constructor without arguments, so it runs with its default configuration: a target
    parallelism equal to the number of processors of the computer. The pool manages its own
    worker threads, and idle workers wait in the pool until tasks arrive for execution.
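
    The default configuration described above can be checked with a small sketch like the
    following. The PoolDefaults class and its method names are hypothetical, made up for this
    illustration; only the ForkJoinPool constructors and getParallelism() come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, not part of the original recipe.
class PoolDefaults {

    // Parallelism of a pool built with the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool built with an explicit target level.
    static int explicitParallelism(int level) {
        ForkJoinPool pool = new ForkJoinPool(level);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Processors:           " + Runtime.getRuntime().availableProcessors());
        System.out.println("Default parallelism:  " + defaultParallelism());
        System.out.println("Explicit parallelism: " + explicitParallelism(4));
    }
}
```

    Passing an explicit level to the constructor is one way to limit how many processors the
    pool tries to keep busy, for example when the machine is shared with other work.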


    Since the Task class doesn't return a result, it extends the RecursiveAction class. The
    recipe uses the recommended structure for the implementation of the task. If the task
    has to update 10 or more products, it divides that set of elements into two blocks,
    creates two tasks, and assigns a block to each task. The first and last attributes of
    the Task class give the range of positions (first inclusive, last exclusive) that the
    task has to update in the list of products. Working with index ranges lets all tasks
    share a single copy of the products list instead of creating a different list for each task.
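
    For comparison, a task that does return a result would extend RecursiveTask instead of
    RecursiveAction. The sketch below is not part of the original recipe: SumTask is a
    hypothetical class that sums a plain double[] of prices (used here instead of the Product
    list to keep the example self-contained), splitting by index range with the same
    threshold of 10.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums prices[first..last) with divide and conquer.
class SumTask extends RecursiveTask<Double> {

    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 10;

    private final double[] prices; // shared by all tasks, never copied
    private final int first;       // inclusive
    private final int last;        // exclusive

    SumTask(double[] prices, int first, int last) {
        this.prices = prices;
        this.first = first;
        this.last = last;
    }

    @Override
    protected Double compute() {
        if (last - first < THRESHOLD) {
            // Small enough: sum the block directly.
            double sum = 0;
            for (int i = first; i < last; i++) {
                sum += prices[i];
            }
            return sum;
        }
        // Otherwise split the range in two and combine the partial sums.
        int middle = first + (last - first) / 2;
        SumTask left = new SumTask(prices, first, middle);
        SumTask right = new SumTask(prices, middle, last);
        invokeAll(left, right);
        return left.join() + right.join();
    }

    // Convenience entry point: runs the task in a fresh pool and returns the total.
    static double sum(double[] prices) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(prices, 0, prices.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        double[] prices = new double[10000];
        Arrays.fill(prices, 10.0);
        System.out.println("Total: " + sum(prices));
    }
}
```

    As in the recipe, every subtask receives the same array reference plus a pair of indexes,
    so splitting a task costs only a small object allocation.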


    To execute the subtasks that a task creates, it calls the invokeAll() method. This is a
    synchronous call: the task waits for the subtasks to complete before continuing
    (potentially finishing) its execution. While the task is waiting for its subtasks, the worker
    thread that was executing it takes another task that was waiting for execution and executes
    it. With this behavior, the Fork/Join framework offers more efficient task management than
    the Runnable and Callable objects themselves.
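
    To make the synchronous behavior of invokeAll() more concrete, the sketch below runs the
    same divide-and-conquer action in two ways: with invokeAll(), and with an explicit
    fork()/compute()/join() sequence. MarkTask and its helper method are hypothetical names
    invented for this illustration, and the explicit sequence is only an approximation of what
    invokeAll() does for two tasks (the real method also takes care of exception handling).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical example class: increments hits[i] once for every i in [first, last).
class MarkTask extends RecursiveAction {

    private static final long serialVersionUID = 1L;

    private final AtomicIntegerArray hits;
    private final int first; // inclusive
    private final int last;  // exclusive
    private final boolean useInvokeAll;

    MarkTask(AtomicIntegerArray hits, int first, int last, boolean useInvokeAll) {
        this.hits = hits;
        this.first = first;
        this.last = last;
        this.useInvokeAll = useInvokeAll;
    }

    @Override
    protected void compute() {
        if (last - first < 10) {
            for (int i = first; i < last; i++) {
                hits.incrementAndGet(i);
            }
            return;
        }
        int middle = first + (last - first) / 2;
        MarkTask left = new MarkTask(hits, first, middle, useInvokeAll);
        MarkTask right = new MarkTask(hits, middle, last, useInvokeAll);
        if (useInvokeAll) {
            // Returns only after both subtasks are done.
            invokeAll(left, right);
        } else {
            right.fork();   // queue the right half so another worker can steal it
            left.compute(); // process the left half in the current worker thread
            right.join();   // wait until the right half is done
        }
    }

    // Runs the task over [0, size) and reports whether every index was visited exactly once.
    static boolean eachIndexVisitedOnce(int size, boolean useInvokeAll) {
        AtomicIntegerArray hits = new AtomicIntegerArray(size);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new MarkTask(hits, 0, size, useInvokeAll));
        } finally {
            pool.shutdown();
        }
        for (int i = 0; i < size; i++) {
            if (hits.get(i) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("invokeAll:         " + eachIndexVisitedOnce(1000, true));
        System.out.println("fork/compute/join: " + eachIndexVisitedOnce(1000, false));
    }
}
```

    In both variants the parent task does not finish until its two halves have finished, which
    is what makes the final check in eachIndexVisitedOnce() safe to run right after invoke().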

     
    The invokeAll() method of the ForkJoinTask class is one of the main differences
    between the Executor and the Fork/Join framework. In the Executor framework, all the tasks
    have to be sent to the executor, while in this case, the tasks include methods to execute and
    control the tasks inside the pool. You have used the invokeAll() method in the Task class,
    which extends the RecursiveAction class, which in turn extends the ForkJoinTask class.

    You sent a single task to the pool to update the whole list of products, using the
    execute() method. In this case, it's an asynchronous call, and the main thread continues
    its execution. You used some methods of the ForkJoinPool class to check the status and the
    progress of the tasks that are running. The class includes more methods that can be useful
    for this purpose. See the Monitoring a Fork/Join pool recipe for a complete list of
    those methods.
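
    The difference between the asynchronous execute() call and a synchronous submission can
    be sketched as follows. SubmitModes and SleepAction are hypothetical names used only for
    this illustration; the sketch waits with join() rather than with the polling loop of the
    recipe, which is a simpler option when no progress output is needed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

// Hypothetical example class comparing execute() and invoke().
class SubmitModes {

    // A trivial action that sleeps for a while and records that it ran.
    static class SleepAction extends RecursiveAction {
        private static final long serialVersionUID = 1L;
        private final long millis;
        volatile boolean ran;

        SleepAction(long millis) {
            this.millis = millis;
        }

        @Override
        protected void compute() {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            ran = true;
        }
    }

    // execute() returns immediately; join() then blocks until the task is done.
    static boolean executeThenJoin() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            SleepAction task = new SleepAction(50);
            pool.execute(task);
            task.join();
            return task.isDone() && task.isCompletedNormally() && task.ran;
        } finally {
            pool.shutdown();
        }
    }

    // invoke() submits the task and blocks the caller until it completes.
    static boolean invokeBlocks() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            SleepAction task = new SleepAction(50);
            pool.invoke(task);
            return task.isDone() && task.ran;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("execute + join finished normally: " + executeThenJoin());
        System.out.println("invoke finished before returning: " + invokeBlocks());
    }
}
```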

    Finally, as with the Executor framework, you should shut down the ForkJoinPool using the
    shutdown() method.
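
    A minimal sketch of an orderly shutdown is shown below. The PoolShutdown class and its
    methods are hypothetical; the awaitTermination() call is an addition that the recipe does
    not use, included here on the assumption that the caller wants to wait for the pool to
    finish rather than just request the shutdown.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original recipe.
class PoolShutdown {

    // A trivial action so the pool has some work before it is shut down.
    static class NoOpAction extends RecursiveAction {
        private static final long serialVersionUID = 1L;

        @Override
        protected void compute() {
            // nothing to do
        }
    }

    // Requests shutdown and waits up to the given timeout for termination.
    // Returns true if the pool terminated in time.
    static boolean shutdownAndWait(ForkJoinPool pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // previously submitted tasks still run; new ones are rejected
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits one task, shuts the pool down, and reports whether it terminated.
    static boolean runAndShutDown() {
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(new NoOpAction());
        boolean finished = shutdownAndWait(pool, 5, TimeUnit.SECONDS);
        return finished && pool.isShutdown() && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + runAndShutDown());
    }
}
```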
     
     
     
  • Original address: https://www.cnblogs.com/donaldlee2008/p/5292184.html