zoukankan      html  css  js  c++  java
  • Java并发之:生产者消费者问题

    生产者消费者问题是Java并发中的常见问题之一,在实现时,一般可以考虑使用juc包下的BlockingQueue接口,至于具体使用哪个类,则就需要根据具体的使用场景具体分析了。本文主要实现一个生产者消费者的原型,以及实现一个生产者消费者的典型使用场景。

    第一个问题:实现一个生产者消费者的原型。

     1 import java.util.concurrent.*;
     2 
     3 class Consumer implements Runnable {
     4     BlockingQueue q = null;
     5 
     6     public Consumer(BlockingQueue q) {
     7         this.q = q;
     8     }
     9 
    10     @Override 
    11     public void run() {
    12         while(true) {
    13             try {
    14                 q.take();
    15                 System.out.println("Consumer has taken a product.");
    16             }catch(InterruptedException e) {
    17     
    18             }
    19         }
    20     }
    21 }
    22 
    23 class Producer implements Runnable {
    24     BlockingQueue q = null;
    25     
    26     public Producer(BlockingQueue q) {
    27         this.q = q;
    28     }
    29 
    30     @Override
    31     public void run() {
    32         while(true) {
    33             try { // note that if there is any chance that block, usually we need a InterruptedException
    34                 q.put(new Object());
    35                 System.out.println("Producer has puted a product.");
    36             }catch(InterruptedException e) {
    37 
    38             }
    39         }
    40     }
    41 
    42 
    43 }
    44 
    45 public class JC_ProducerConsumerPrototype {
    46     static int queueCapacity = 1024;
    47     //static BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(queueCapacity); // Can also compile
    48     static BlockingQueue q = new ArrayBlockingQueue(queueCapacity); // ABQ must has a capacity
    49     public static void main(String[] args) {
    50         Thread t1 = new Thread(new Producer(q));
    51         Thread t2 = new Thread(new Consumer(q));
    52         t1.start();
    53         t2.start();
    54     }
    55 
    56 
    57 }

    第二个问题,现在假设生产者是在读取磁盘上的多个log文件,对于每一个文件,依次读取文件中的每一行,也就是一条log记录;消费者需要读取并分析这些记录,假设消费者是计算密集型的。如何在生产者消费者原型的基础上实现这些功能?

    这个场景在server端开发中是经常碰到的,因为在Server端,不可避免地会产生大量的日志文件。

     1 import java.util.concurrent.*;
     2 import java.io.*;
     3 import java.nio.*;
     4 import java.nio.file.*;
     5 import java.util.*;
     6 import java.nio.charset.*;
     7 
     8 
     9 class Producer implements Runnable {
    10     BlockingQueue q = null;
    11     String fileName = null;
    12     CountDownLatch latch = null;
    13     
    14     public Producer(BlockingQueue q,String fileName,CountDownLatch latch) {
    15         this.q = q;
    16         this.fileName = fileName;
    17         this.latch = latch;
    18     }
    19 
    20     @Override
    21     public void run() {
    22         Path path = Paths.get(".",fileName);
    23         try{
    24             List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
    25             for(int i=lines.size();i>0;i--){
    26                 try{
    27                     q.put(lines.get(i));
    28                 }catch(InterruptedException e) {
    29 
    30                 }
    31             }
    32         }catch(IOException e){
    33 
    34         }
    35         latch.countDown();
    36     }
    37 }
    38 
    39 class Consumer implements Runnable {
    40     BlockingQueue<String> q = null;
    41     Boolean done = false;
    42     
    43     public Consumer(BlockingQueue q,Boolean done){
    44         this.q = q;
    45         this.done = done;
    46     }
    47 
    48     @Override
    49     public void run(){
    50         while(!done||q.size()!=0){
    51             try{
    52                 q.take();
    53             }catch(InterruptedException e){
    54 
    55             }
    56         }
    57     }
    58 }
    59 
    60 public class JC_ProducerConsumerHandlingLog{
    61     public static int fileCount = 1024;
    62     public static String[] fileNames = new String[fileCount];
    63     public static int cpuCount = 8;
    64     public static CountDownLatch latch = new CountDownLatch(fileCount);
    65     public static volatile boolean done = false;
    66     public static BlockingQueue<String> q = new LinkedBlockingQueue<String>(fileCount);//one thread for one file
    67 
    68     public static void main(String[] args){
    69         for(int i=0;i<fileCount;i++){
    70             Thread t = new Thread(new Producer(q,fileNames[i],latch));
    71             t.start();
    72         }
    73         for(int i=0;i<cpuCount;i++){//for computing tasks, we don't need too many threads.
    74             Thread t = new Thread(new Consumer(q,done));
    75             t.start();
    76         }
    77         try{
    78             latch.await();
    79             done = true;
    80         }catch(InterruptedException e){
    81 
    82         }
    83 
    84     }
    85 }

     需要稍微注意一下线程数的选择,对于计算密集型的任务,我认为线程数达到cpu的核数比较合理(在不考虑超线程的情况下,也就是说一个核只有一个线程)。有不同意见欢迎跟我交流!

  • 相关阅读:
    Compiling LIBFFM On OSX 10.9
    Linux shell 脚本入门教程+实例
    Understanding the Bias-Variance Tradeoff
    Learning How To Code Neural Networks
    MXNet设计和实现简介
    数据需求统计常用awk命令
    Deal with relational data using libFM with blocks
    MATLAB 在同一个m文件中写多个独立的功能函数
    Debug 路漫漫-06
    MATLAB 求两个矩阵的 欧氏距离
  • 原文地址:https://www.cnblogs.com/hzg1981/p/5552198.html
Copyright © 2011-2022 走看看