zoukankan      html  css  js  c++  java
  • 并发编程之同步

    线程之间的协作


    前面我们可以使用线程来同时运行多个任务,通过锁(互斥)来同步两个任务的行为,从而使得一个任务不会干涉另一个的任务的资源。现在研究如何使任务之间通过协作,以使得多个任务可以一起工作去解决某个问题。我们可以在互斥的基础上使用挂起操作,直到某些外部条件反生变化时,让这个任务继续往前执行。下面具体讨论该方法的实现。

    1.挂起操作

    1.1 wait()操作

    我们可以等待某个条件发生变化之间将线程挂起,当条件满足之后通过notify()或notifyAll()将挂起的线程唤醒,告知其可以继续运行了。注意调用wait()、notify()、notifyAll()时,需要首先获得某个对象的锁,否则会出现以下错误信息。

    当线程wait()时会释放掉对象锁(sychronized),这是与sleep()和yield()不同的地方。在进行条件判断时,应当首先获取对象锁,简单的说就是将条件判断放置着sychronized代码块内。

     1 package com.dy.xidian;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 import java.util.concurrent.TimeUnit;
     6 
     7 class Car {
     8     private boolean stop = false;
     9     public synchronized void stop() {
    10         stop = true;
    11         notifyAll();
    12     }
    13 
    14     public synchronized void drive() {
    15         stop = false;
    16         notifyAll();
    17     }
    18 
    19     public synchronized void waitForStopFinish() throws InterruptedException {
    20         while (stop == false)
    21             wait();
    22     }
    23 
    24     public synchronized void waitForDriveFinish() throws InterruptedException {
    25         while (stop == true)
    26             wait();
    27     }
    28 }
    29 
    30 class Stop implements Runnable {
    31     private Car car;
    32     public Stop(Car c) {
    33         car = c;
    34     }
    35 
    36     @Override
    37     public void run() {
    38         try {
    39             while (!Thread.interrupted()) {
    40                 System.out.println("car stop!");
    41                 TimeUnit.MILLISECONDS.sleep(500);
    42                 car.stop();
    43                 car.waitForDriveFinish();
    44             }
    45         } catch (InterruptedException e) {
    46             System.out.println("Exiting via interrupt");
    47         }
    48         System.out.println("Ending stop task");
    49 
    50     }
    51 }
    52 
    53 class Drive implements Runnable {
    54     private Car car;
    55     public Drive(Car c) {
    56         car = c;
    57     }
    58 
    59     @Override
    60     public void run() {
    61         try {
    62             while (!Thread.interrupted()) {
    63                 car.waitForStopFinish();
    64                 System.out.println("car drive!");
    65                 TimeUnit.MILLISECONDS.sleep(200);
    66                 car.drive();
    67             }
    68         } catch (InterruptedException e) {
    69             System.out.println("Exiting via interrupt");
    70         }
    71         System.out.println("End drive task");
    72     }
    73 }
    74 
    75 public class Test {
    76     public static void main(String[] args) throws InterruptedException {
    77         Car car = new Car();
    78         ExecutorService exec = Executors.newCachedThreadPool();
    79         exec.execute(new Stop(car));
    80         exec.execute(new Drive(car));
    81         TimeUnit.SECONDS.sleep(2);
    82         exec.shutdownNow();
    83     }
    84 }

    该程序模拟的汽车行驶的过程,汽车会走一会,停一会,直到一个中断产生。while()循环的使用,如果一个线程正在处于等待状态,当被唤醒时需要重新检测当前的条件是否满足继续等待,因为有可能这个唤醒操作并不是要唤醒这个线程,或者说唤醒之后条件又发生了改变,需要继续wait()。

    1.2 await()操作

    JAVA中也提供了Condition类,在Condition上调用await()来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者通过signalAll()来唤醒所有在Condition上挂起的任务。现在使用Condition来重写上面的程序。

     1 package com.dy.xidian;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 import java.util.concurrent.TimeUnit;
     6 import java.util.concurrent.locks.Condition;
     7 import java.util.concurrent.locks.Lock;
     8 import java.util.concurrent.locks.ReentrantLock;
     9 
    10 class Car {
    11     private Lock lock = new ReentrantLock();
    12     Condition condition = lock.newCondition();
    13     private boolean stop = false;
    14 
    15     public void stop() {
    16         lock.lock();
    17         stop = true;
    18         condition.signalAll();
    19         lock.unlock();
    20     }
    21 
    22     public void drive() {
    23         lock.lock();
    24         stop = false;
    25         condition.signalAll();
    26         lock.unlock();
    27     }
    28 
    29     public void waitForStopFinish() throws InterruptedException {
    30         lock.lock();
    31         while (stop == false)
    32             condition.await();
    33         lock.unlock();
    34     }
    35 
    36     public void waitForDriveFinish() throws InterruptedException {
    37         lock.lock();
    38         while (stop == true)
    39             condition.await();
    40         lock.unlock();
    41     }
    42 }
    43 
    44 class Stop implements Runnable {
    45     private Car car;
    46     public Stop(Car c) {
    47         car = c;
    48     }
    49 
    50     @Override
    51     public void run() {
    52         try {
    53             while (!Thread.interrupted()) {
    54                 System.out.println("car stop!");
    55                 TimeUnit.MILLISECONDS.sleep(500);
    56                 car.stop();
    57                 car.waitForDriveFinish();
    58             }
    59         } catch (InterruptedException e) {
    60             System.out.println("Exiting via interrupt");
    61         }
    62         System.out.println("Ending stop task");
    63 
    64     }
    65 }
    66 
    67 class Drive implements Runnable {
    68     private Car car;
    69     public Drive(Car c) {
    70         car = c;
    71     }
    72 
    73     @Override
    74     public void run() {
    75         try {
    76             while (!Thread.interrupted()) {
    77                 car.waitForStopFinish();
    78                 System.out.println("car drive!");
    79                 TimeUnit.MILLISECONDS.sleep(200);
    80                 car.drive();
    81             }
    82         } catch (InterruptedException e) {
    83             System.out.println("Exiting via interrupt");
    84         }
    85         System.out.println("End drive task");
    86     }
    87 }
    88 
    89 public class Test {
    90     public static void main(String[] args) throws InterruptedException {
    91         Car car = new Car();
    92         ExecutorService exec = Executors.newCachedThreadPool();
    93         exec.execute(new Stop(car));
    94         exec.execute(new Drive(car));
    95         TimeUnit.SECONDS.sleep(2);
    96         exec.shutdownNow();
    97     }
    98 }
    View Code

    2.notify()与notifyAll()

    notifyAll()唤醒等待在某个对象上的所有线程,而notify()只是随机唤醒其中一个。注意前面我们已经说过,wait()、notify()、notifyAll()必须sychronized块中。其实还有一点就是,notify()和notifyAll()必须在线程等待的对象上调用,而不是在Thread中直接调用。(如上面的代码中,Car类中提供了notifyAll()方法调用的接口,而不是直接在A线程中调用B线程的notify()方法来唤醒B线程)。一旦线程得到了通知,它首先会重新获得所等待对象的锁。如果获取成功,则会从紧跟着wait()调用之后的语句继续执行。下面的代码是为了说明调用某个对象的wait()方法时,会使得线程处于挂起状态,当调用该对象的notify()/notifyAll()方法时,线程才会被唤醒。wait()与notify()/notifyAll()是绑定在对象上,所以可以说任务在某个对象上处于wait()状态。

    package com.dy.xidian;
    
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class Blocker {
        synchronized void waitingCall() {
            try {
                while (!Thread.interrupted()) {
                    wait();
                    System.out.println(Thread.currentThread());
                }
            } catch (InterruptedException e) {
            }
    
        }
    
        synchronized void prod() {
            notify();
        }
        synchronized void prodAll() {
            notifyAll();
        }
    }
    
    class Task implements Runnable {
        static Blocker blocker = new Blocker();
        public void run() {
            blocker.waitingCall();
        }
    }
    
    class Task2 implements Runnable {
        static Blocker blocker = new Blocker();
        @Override
        public void run() {
            blocker.waitingCall();
        }
    
    }
    
    public class Test {
        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newCachedThreadPool();
            for (int i = 0; i < 3; i++)
                exec.execute(new Task());
            exec.execute(new Task2());
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                boolean prod = true;
    
                @Override
                public void run() {
                    if (prod) {
                        System.out.println("
    notify()");
                        Task.blocker.prod();
                        prod = false;
                    } else {
                        System.out.println("
    notifyAll()");
                        Task.blocker.prodAll();
                        prod = true;
                    }
                }
            }, 400, 400);
            TimeUnit.SECONDS.sleep(5);
            timer.cancel();
            System.out.println("
    Timer canceled");
            TimeUnit.MILLISECONDS.sleep(400);
            System.out.println("Task2.blocker.prodAll()");
            Task2.blocker.prodAll();
            TimeUnit.MILLISECONDS.sleep(500);
            System.out.println("
    Shutting down");
            exec.shutdownNow();
        }
    }

     

    3.生产者与消费者

    生产者对缓冲区进行写操作,消费者对缓冲区进行读操作。缓冲区的大小固定的,生产者不能让缓冲区溢出,消费者不能从空缓冲区读值。下面的代码缓冲区用数组实现,注意,等待是反生在缓冲区对象上,而不是线程调用自己的wait()方法。

      1 package com.dy.xidian;
      2 
      3 import java.util.Random;
      4 import java.util.concurrent.ExecutorService;
      5 import java.util.concurrent.Executors;
      6 import java.util.concurrent.TimeUnit;
      7 
      8 class Producer implements Runnable {
      9     Repository repos;
     10     Random random = new Random();
     11 
     12     public Producer(Repository repos) {
     13         this.repos = repos;
     14     }
     15 
     16     @Override
     17     public void run() {
     18         try {
     19             while (true) {
     20                 int numWantToPut = random.nextInt(10);
     21                 repos.put();
     22                 int numCanPut = 10 - repos.index;
     23                 int temp = repos.index + Math.min(numWantToPut, numCanPut);
     24                 int i;
     25                 for (i = repos.index; i < temp; i++) {
     26                     TimeUnit.MILLISECONDS.sleep(100);
     27                     System.out.println("Producer i =" + i);
     28                     repos.repository[i] = new Product();
     29                 }
     30                 repos.index = i;
     31                 repos.putFinished();
     32             }
     33         } catch (InterruptedException e) {
     34         }
     35 
     36     }
     37 
     38 }
     39 
     40 class Customer implements Runnable {
     41     Repository repos;
     42     Random random = new Random();
     43     public Customer(Repository repos) {
     44         this.repos = repos;
     45     }
     46 
     47     @Override
     48     public void run() {
     49         try {
     50             while (true) {
     51                 int numWanToGet = random.nextInt(10);
     52                 repos.get();
     53                 int numCanGet = repos.index;
     54                 int temp = repos.index - 1 - Math.min(numWanToGet, numCanGet);
     55                 int i;
     56                 for (i = repos.index - 1; i > temp; i--) {
     57                     TimeUnit.MILLISECONDS.sleep(100);
     58                     System.out.println("Customer i =" + i);
     59                     repos.repository[i] = null;
     60                 }
     61                 repos.index = i + 1;
     62                 repos.getFinished();
     63             }
     64         } catch (InterruptedException e) {
     65         }
     66     }
     67 }
     68 
     69 class Product {
     70     int num;
     71     int weight;
     72 }
     73 
     74 class Repository {
     75     int index = 0;
     76     boolean state = true;
     77     Product[] repository = new Product[10];
     78 
     79     public synchronized void put() throws InterruptedException {
     80         while (state != true)
     81             wait();
     82     }
     83 
     84     public synchronized void get() throws InterruptedException {
     85         while (state != false)
     86             wait();
     87     }
     88 
     89     public synchronized void putFinished() {
     90         state = false;
     91         notifyAll();
     92     }
     93 
     94     public synchronized void getFinished() {
     95         state = true;
     96         notifyAll();
     97     }
     98 }
     99 
    100 public class Test {
    101     static volatile int i = 0;
    102     static Repository repos = new Repository();
    103     public static void main(String[] args) throws InterruptedException {
    104         ExecutorService exec = Executors.newCachedThreadPool();
    105         exec.execute(new Customer(repos));
    106         exec.execute(new Producer(repos));
    107         TimeUnit.SECONDS.sleep(10);
    108         exec.shutdownNow();
    109     }
    110 }
    View Code

    4.同步队列

    上面生产者与消费者之间的缓冲区使用数组实现的,可以发现对数组的操作比较麻烦,处理不好就会发生越界,并且编程人员要处理好挂起与唤醒操作之间的逻辑关系。java中提供了一个更有效的解决方案,来实现对缓冲区的访问,那就同步队列(BlockingQueue)。同步队列在任何时候只允许一个线程插入或者移除元素,它的实现类有ArrayBlockingQueue(固定大小),LinkedBlockingQueue(无大小限制)。下面使用三个线程对队列进程访问。一个线程制作面包,一个线程给面包摸黄油,最后一个线程再往上摸果酱。注意:对队列中比较适合放对象

      1 package com.dy.xidian;
      2 
      3 import java.util.Random;
      4 import java.util.concurrent.ExecutorService;
      5 import java.util.concurrent.Executors;
      6 import java.util.concurrent.LinkedBlockingQueue;
      7 import java.util.concurrent.TimeUnit;
      8 
      9 enum Status {
     10     DRY, BUTTERED, JAMMED
     11 }
     12 
     13 class Bread {
     14     private Status status = Status.DRY;
     15     private final int id;
     16     public Bread(int idn) {
     17         id = idn;
     18     }
     19     public void butter() {
     20         status = Status.BUTTERED;
     21     }
     22     public void jam() {
     23         status = Status.JAMMED;
     24     }
     25 
     26     public Status getStatus() {
     27         return status;
     28     }
     29     public int getId() {
     30         return id;
     31     }
     32     public String toString() {
     33         return "Bread" + id + ":" + status;
     34     }
     35 }
     36 
     37 @SuppressWarnings("serial")
     38 class BreadQueue extends LinkedBlockingQueue<Bread> {
     39 }
     40 class Toast implements Runnable {
     41     private BreadQueue breadQueue;
     42     private int count = 0;
     43     private Random rand = new Random(47);
     44 
     45     public Toast(BreadQueue breadQueue) {
     46         this.breadQueue = breadQueue;
     47     }
     48 
     49     @Override
     50     public void run() {
     51         try {
     52             while (!Thread.interrupted()) {
     53                 TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
     54                 Bread b = new Bread(count++);
     55                 System.out.println(b);
     56                 breadQueue.put(b);
     57             }
     58         } catch (InterruptedException e) {
     59             System.out.println("Toast interrupted");
     60         }
     61         System.out.println("Toast off");
     62     }
     63 }
     64 
     65 class Butter implements Runnable {
     66     private BreadQueue dryQueue, butteredQueue;
     67 
     68     public Butter(BreadQueue dryQueue, BreadQueue butteredQueue) {
     69         super();
     70         this.dryQueue = dryQueue;
     71         this.butteredQueue = butteredQueue;
     72     }
     73 
     74     @Override
     75     public void run() {
     76         try {
     77             while (!Thread.interrupted()) {
     78                 Bread b = dryQueue.take();
     79                 b.butter();
     80                 System.out.println(b);
     81                 butteredQueue.put(b);
     82             }
     83         } catch (InterruptedException e) {
     84             System.out.println("interrupted exception!");
     85         }
     86         System.out.println("Butter off");
     87     }
     88 }
     89 
     90 class Jammer implements Runnable {
     91     private BreadQueue butterQueue, finishQueue;
     92 
     93     public Jammer(BreadQueue butterQueue, BreadQueue finishQueue) {
     94         super();
     95         this.butterQueue = butterQueue;
     96         this.finishQueue = finishQueue;
     97     }
     98 
     99     @Override
    100     public void run() {
    101         try {
    102             while (!Thread.interrupted()) {
    103                 Bread b = butterQueue.take();
    104                 b.jam();
    105                 System.out.println(b);
    106                 finishQueue.put(b);
    107             }
    108         } catch (InterruptedException e) {
    109             System.out.println("Interrupted exception!");
    110         }
    111         System.out.println("Jammer off");
    112     }
    113 }
    114 
    115 class Eater implements Runnable {
    116     private BreadQueue finishQueue;
    117     private int counter = 0;
    118     public Eater(BreadQueue finishQueue) {
    119         super();
    120         this.finishQueue = finishQueue;
    121     }
    122 
    123     @Override
    124     public void run() {
    125         try {
    126             while (!Thread.interrupted()) {
    127                 Bread b = finishQueue.take();
    128                 if (b.getId() != counter++ || b.getStatus() != Status.JAMMED) {
    129                     System.out.println(">>>>Error:" + b);
    130                     System.exit(1);
    131                 } else {
    132                     System.out.println("Chomp " + b);
    133                 }
    134             }
    135         } catch (InterruptedException e) {
    136             System.out.println("Interrupted Exception!");
    137         }
    138         System.out.println("Eater off!");
    139     }
    140 }
    141 
    142 public class BreadMatic {
    143     static BreadQueue q1 = new BreadQueue();
    144     static BreadQueue q2 = new BreadQueue();
    145     static BreadQueue q3 = new BreadQueue();
    146     public static void main(String[] args) throws InterruptedException {
    147         ExecutorService exec = Executors.newCachedThreadPool();
    148         exec.execute(new Toast(q1));
    149         exec.execute(new Butter(q1, q2));
    150         exec.execute(new Jammer(q2, q3));
    151         exec.execute(new Eater(q3));
    152         TimeUnit.SECONDS.sleep(5);
    153         exec.shutdownNow();
    154     }
    155 }
    View Code

    5.管道

    线程间也可以使用管道进行数据传输,并且是以字符流的方式,这是与队列不同的地方。Java中提供了PipedWriter类可以作为管道的输入,PipedReader可以作为管道的输出。如果输出发现没有数据了,管道将自动阻塞。对与管道的创建尽可能早完成,比如说写到构造方法里面。值得注意的是,管道上的阻塞是可以被中断的,即PipedReader是可中断的,这是与普通IO不同的地方。

     1 package com.dy.xidian;
     2 
     3 import java.io.IOException;
     4 import java.io.PipedReader;
     5 import java.io.PipedWriter;
     6 import java.util.concurrent.ExecutorService;
     7 import java.util.concurrent.Executors;
     8 import java.util.concurrent.TimeUnit;
     9 
    10 class Sender implements Runnable {
    11     PipedWriter pw = new PipedWriter();
    12     @Override
    13     public void run() {
    14 
    15         try {
    16             while (true) {
    17                 for (char c = 'a'; c <= 'z'; c++) {
    18                     pw.write(c);
    19                     TimeUnit.MILLISECONDS.sleep(100);
    20                 }
    21             }
    22         } catch (IOException e) {
    23             System.out.println(e + "Sender write Exception!");
    24         } catch (InterruptedException e) {
    25             System.out.println("Interrupted Exception!");
    26         }
    27     }
    28 
    29     public PipedWriter getPw() {
    30         return pw;
    31     }
    32 
    33 }
    34 
    35 class Receiver implements Runnable {
    36     PipedReader pr;
    37     public Receiver(Sender sender) throws IOException {
    38         super();
    39         pr = new PipedReader(sender.getPw());
    40     }
    41 
    42     @Override
    43     public void run() {
    44         try {
    45             while (true) {
    46                 System.out.print("Read:" + (char) pr.read() + " ");
    47             }
    48         } catch (IOException e) {
    49             System.out.println(e + "Receiver read exception!");
    50         }
    51     }
    52 
    53 }
    54 
    55 public class PipeIO {
    56     public static void main(String[] args) throws IOException,
    57             InterruptedException {
    58         Sender sender = new Sender();
    59         Receiver receiver = new Receiver(sender);
    60         ExecutorService exec = Executors.newCachedThreadPool();
    61         exec.execute(sender);
    62         exec.execute(receiver);
    63         TimeUnit.SECONDS.sleep(4);
    64         exec.shutdownNow();
    65     }
    66 }
  • 相关阅读:
    setTimeout,setInterval你不知道的…
    浏览器console的用法
    命题和命题的否定可以同时成立吗?
    Web云笔记--CSS
    dreamweaver代码提示失效
    游戏平台代表--PS4【推荐】
    Mac上好用的视频播放器有哪些?
    新浪博客“网络繁忙请稍后再试”
    人工智能成功识别“色情暴力”信息??…
    JS查错小工具-三生有幸【推荐】
  • 原文地址:https://www.cnblogs.com/xidongyu/p/5300250.html
Copyright © 2011-2022 走看看