zoukankan      html  css  js  c++  java
  • Java线程同步的Monitor机制(Lock配合Condition)

    Monitor模式是一种常见的并行开发机制, 一个Monitor实例可以被多个线程安全使用, 所有的monitor下面的方法在运行时是互斥的, 这种互斥机制机制可以用于一些特性, 例如让线程等待某种条件, 在等待时线程会将CPU时间交出去, 但是在条件满足时确保重新获得CPU时间. 在条件达成时, 你可以同时通知一个或多个线程. 这样做有以下的优点:

    1. 所有的同步代码都集中在一起, 用户不需要知道这是如何实现的
    2. 代码不依赖于线程数量, 线程数量只取决于业务需要
    3. 不需要对某个互斥对象做释放, 不存在忘记的风险

    一个Monitor的结构是这样的

    public class SimpleMonitor {
        public method void testA(){
            //Some code
        }
    
        public method int testB(){
            return 1;
        }
    }
    

    使用Java代码不能直接创建一个Monitor, 要实现Monitor, 需要使用Lock和Condition类. 一般使用的Lock是ReentrantLock, 例如

    public class SimpleMonitor {
        private final Lock lock = new ReentrantLock();
    
        public void testA() {
            lock.lock();
    
            try {
                //Some code
            } finally {
                lock.unlock();
            }
        }
    
        public int testB() {
            lock.lock();
    
            try {
                return 1;
            } finally {
                lock.unlock();
            }
        }
    }
    

    如果不需要判断条件, 那么用synchronized就可以了. 在需要判断条件的情况下, 使用Lock的newCondition()方法创建Condition, 可以通过Condition的await方法, 让当前线程wait, 放弃cpu时间. 然后用signal或者signalAll方法让线程重新获得CPU时间. signalAll方法会唤起所有wait在当前condition的线程. 下面是一个例子, 一个需要被多个线程使用的容量固定的buffer.

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedBuffer {
        private final String[] buffer;
        private final int capacity;
    
        private int front;
        private int rear;
        private int count;
    
        private final Lock lock = new ReentrantLock();
    
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();
    
        public BoundedBuffer(int capacity) {
            super();
    
            this.capacity = capacity;
    
            buffer = new String[capacity];
        }
    
        public void deposit(String data) throws InterruptedException {
            lock.lock();
    
            try {
                while (count == capacity) {
                    notFull.await();
                }
    
                buffer[rear] = data;
                rear = (rear + 1) % capacity;
                count++;
    
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public String fetch() throws InterruptedException {
            lock.lock();
    
            try {
                while (count == 0) {
                    notEmpty.await();
                }
    
                String result = buffer[front];
                front = (front + 1) % capacity;
                count--;
    
                notFull.signal();
    
                return result;
            } finally {
                lock.unlock();
            }
        }
    }
    

    代码说明

    1. 这两个方法通过lock互斥
    2. 然后通过两个condition变量, 一个用于在buffer非空时等待, 一个用于buffer未满时等待
    3. 上面使用while循环将await包围, 这是为了防止在使用Signal&Condition时产生signal stealers问题.
    4. 以上方法可以安全地在多个线程中被调用

    还有一个例子, 用于协调多个线程按固定顺序进行输出

    public class TestSequentialThreads {
        private final Lock lock = new ReentrantLock();
        private final Condition[] conditions = {lock.newCondition(), lock.newCondition(), lock.newCondition()};
        private int count = 0;
    
        public void action(int i) {
            while (true) {
                print(i + " wait lock");
                lock.lock();
                print(i + " has lock");
                try {
                    while (count != i) {
                        print(i + " await");
                        conditions[i].await();
                    }
                    print("===== " + i + " =====");
                    Thread.sleep(500);
                    count = (count + 1) % 3;
                    int j = (i + 1) % 3;
                    print(i + " signal " + j);
                    conditions[j].signal();
                } catch (InterruptedException e) {
                    print(i + " InterruptedException");
                } finally {
                    print(i + " unlock");
                    lock.unlock();
                }
            }
        }
    
        public static void main(String[] args) {
            TestSequentialThreads ts = new TestSequentialThreads();
            new Thread(()->ts.action(0)).start();
            new Thread(()->ts.action(2)).start();
            new Thread(()->ts.action(1)).start();
            new Thread(()->ts.action(1)).start();
            new Thread(()->ts.action(0)).start();
            new Thread(()->ts.action(2)).start();
        }
    
        public static void print(String str) {
            System.out.println(str);
        }
    }
    

      

    如果是使用wait()和notify()的话, 就要写成这样, 这种情况下, 运行时notify()随机通知的线程, 是有可能不满足而跳过的.

    public class DemoThreadWait2 {
        private Object obj = 0;
        private int pos = 1;
    
        public void one(int i) {
            synchronized (obj) {
                if (pos == i) {
                    System.out.println("T-" + i);
                    pos = i % 3 + 1;
                } else {
                    // System.out.println(".");
                }
                obj.notify();
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            DemoThreadWait2 demo = new DemoThreadWait2();
            new Thread(()->{
                while(true) {
                    demo.one(1);
                }
            }).start();
    
            new Thread(()->{
                while(true) {
                    demo.one(2);
                }
            }).start();
    
            new Thread(()->{
                while(true) {
                    demo.one(3);
                }
            }).start();
        }
    }
    

      

  • 相关阅读:
    软件对标分析
    alpha内测版发布
    第一阶段项目评审
    第一阶段意见汇总
    冲刺(二十)
    冲刺(十九)
    冲刺(十八)
    冲刺(十七)
    冲刺(十六)
    冲刺(十五)
  • 原文地址:https://www.cnblogs.com/milton/p/10883429.html
Copyright © 2011-2022 走看看