zoukankan      html  css  js  c++  java
  • Java线程间怎么实现同步

    1、Object#wait(), Object#notify()让两个线程依次执行

    /**
     * 类AlternatePrintDemo.java的实现描述:交替打印
     */
    class NumberPrint implements Runnable {
        private int       number;
        public byte       res[];
        public static int count = 5;
    
        public NumberPrint(int number, byte a[]) {
            this.number = number;
            res = a;
        }
    
        public void run() {
            synchronized (res) {
                while (count-- > 0) {
                    try {
                        res.notify();//唤醒等待res资源的线程,把锁交给线程(该同步锁执行完毕自动释放锁)
                        System.out.println(" " + number);
                        res.wait();//释放CPU控制权,释放res的锁,本线程阻塞,等待被唤醒。
                        System.out.println("------线程" + Thread.currentThread().getName() + "获得锁,wait()后的代码继续运行:" + number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    public class AlternatePrintDemo {
        public static void main(String args[]) {
            final byte a[] = { 0 };//以该对象为共享资源
            new Thread(new NumberPrint(1, a), "1").start();
            new Thread(new NumberPrint(2, a), "2").start();
        }
    }

    2、Condition#signal(), Condition#wait()让两个线程依次执行

    /**
     * 
     * 类ConditionDemo.java的实现描述:Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,
     * 为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
     */
    public class ConditionDemo {
        public static void main(String[] args) {
            final Business business = new Business();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    threadExecute(business, "sub");
                }
            }).start();
            threadExecute(business, "main");
        }
    
        public static void threadExecute(Business business, String threadType) {
            for (int i = 0; i < 10; i++) {
                try {
                    if ("main".equals(threadType)) {
                        business.main(i);
                    } else {
                        business.sub(i);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    class Business {
        private boolean   bool      = true;
        private Lock      lock      = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        public /* synchronized */ void main(int loop) throws InterruptedException {
            lock.lock();
            try {
                while (bool) {
                    condition.await();//this.wait();
                }
                System.out.println("main thread seq  loop of " + loop);
                
                bool = true;
                condition.signal();//this.notify();
            } finally {
                lock.unlock();
            }
        }
    
        public /* synchronized */ void sub(int loop) throws InterruptedException {
            lock.lock();
            try {
                while (!bool) {
                    condition.await();//this.wait();
                }
    
                System.out.println("sub thread seq loop of " + loop);
       
                bool = false;
                condition.signal();//this.notify();
            } finally {
                lock.unlock();
            }
        }
    }
    

     

    Lock.Condition同理

    import java.util.concurrent.locks.*;
    
    class BoundedBuffer {
        final Lock      lock     = new ReentrantLock();                          //锁对象
        final Condition notFull  = lock.newCondition();                          //写线程条件 
        final Condition notEmpty = lock.newCondition();                          //读线程条件 
    
        final Object[]  items    = new Object[100];                              //缓存队列
        int             putptr/* 写索引 */, takeptr/* 读索引 */, count/* 队列中存在的数据个数 */;
    
        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length)//如果队列满了 
                    notFull.await();//阻塞写线程
                items[putptr] = x;//赋值 
                if (++putptr == items.length)
                    putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
                ++count;//个数++
                notEmpty.signal();//唤醒读线程
            } finally {
                lock.unlock();
            }
        }
    
        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0)//如果队列为空
                    notEmpty.await();//阻塞读线程
                Object x = items[takeptr];//取值 
                if (++takeptr == items.length)
                    takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
                --count;//个数--
                notFull.signal();//唤醒写线程
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    3、两个线程使用Object#wait(), Object#notify()实现生产消费者模式。

    /**
     * 
     * 类ProducerConsumerDemo.java的实现描述:生产消费者模式
     */
    public class ProducerConsumerDemo {
    
        public static void main(String args[]) {
    
            final Queue<Integer> sharedQ = new LinkedList<>();
    
            Thread producer = new Producer(sharedQ);
            Thread consumer = new Consumer(sharedQ);
    
            producer.start();
            consumer.start();
    
        }
    }
    
    class Producer extends Thread {
        private static final int MAX_COUNT = 10;
        private Queue<Integer>   sharedQ;
    
        public Producer(Queue<Integer> sharedQ) {
            super("Producer");
            this.sharedQ = sharedQ;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < MAX_COUNT; i++) {
                synchronized (sharedQ) {
                    //waiting condition - wait until Queue is not empty
                    while (sharedQ.size() >= 1) {
                        try {
                            System.out.println("Queue is full, waiting");
                            sharedQ.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                    }
                    System.out.println("producing : " + i);
                    sharedQ.add(i);
                    sharedQ.notify();
                }
            }
        }
    }
    
    class Consumer extends Thread {
        private Queue<Integer> sharedQ;
    
        public Consumer(Queue<Integer> sharedQ) {
            super("Consumer");
            this.sharedQ = sharedQ;
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (sharedQ) {
                    //waiting condition - wait until Queue is not empty
                    while (sharedQ.size() == 0) {
                        try {
                            System.out.println("Queue is empty, waiting");
                            sharedQ.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                    }
                    int number = (int) sharedQ.poll();
                    System.out.println("consuming : " + number);
                    sharedQ.notify();
    
                    //termination condition
                    if (number == 3) {
                        break;
                    }
                }
            }
        }
    }
    

    4、CountDownLatch实现类似计数器的功能。

    /**
     * 
     * 类CountDownLatchDemo.java的实现描述:CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能.
     * 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
     */
    public class CountDownLatchDemo {
        public static void main(String[] args) {
            final CountDownLatch latch = new CountDownLatch(2);
    
            new Thread() {
                public void run() {
                    try {
                        System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
                        Thread.sleep(3000);
                        System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            new Thread() {
                public void run() {
                    try {
                        System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
                        Thread.sleep(3000);
                        System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            try {
                System.out.println("等待2个子线程执行完毕...");
                latch.await();
                System.out.println("2个子线程已经执行完毕");
                System.out.println("继续执行主线程");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    5、 CyclicBarrier(回环栅栏)可以实现让一组线程等待至某个状态之后再全部同时执行。

    /**
     * 类CyclicBarrierDemo.java的实现描述:字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
     * 叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,
     * 线程就处于barrier了。
     */
    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            int N = 4;
            //所有线程写入操作完之后,进行额外的其他操作可以为CyclicBarrier提供Runnable参数
            CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程" + Thread.currentThread().getName());
                }
            });
            for (int i = 0; i < N; i++) {
                if (i < N - 1) {
                    new Writer(barrier).start();
                } else {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    new Writer(barrier).start();
                }
            }
    
            System.out.println("CyclicBarrier重用");
    
            for (int i = 0; i < N; i++) {
                new Writer(barrier).start();
            }
        }
    
        static class Writer extends Thread {
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
                try {
                    Thread.sleep(5000); //以睡眠来模拟写入数据操作
                    System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
                    try {
                        cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    }
    

    6、Semaphore用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。

    /**
     * 类SemaphoreDemo.java的实现描述:Semaphore用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。
     * 主要通过控制一组虚拟的“许可”,当需要执行操作时首先申请获取许可,如果还有剩余的许可 并且获取成功,就执行操作;如果剩余许可为0,就阻塞当前线程;
     * 操作执行完成后释放许可,排队的阻塞线程可以被唤醒重新获取许可继续执行。这里提到排队,其实就是利用AQS的队列进行排队。
     */
    public class SemaphoreDemo {
        public static void main(String[] args) {
            // 线程池
            ExecutorService exec = Executors.newCachedThreadPool();
    
            // 只能5个线程同时访问
            final Semaphore semp = new Semaphore(5);
    
            // 模拟20个客户端访问
            for (int index = 0; index < 20; index++) {
                final int NO = index;
                Runnable run = new Runnable() {
                    public void run() {
                        try {
                            // 获取许可
                            semp.acquire();
                            System.out.println("Accessing: " + NO);
                            Thread.sleep((long) (Math.random() * 10000));
                            // 访问完后,释放
                            semp.release();
                        } catch (InterruptedException e) {
                        }
                    }
                };
                exec.execute(run);
            }
    
            // 退出线程池
            exec.shutdown();
        }
    }
  • 相关阅读:
    三数之和
    罗马数字与整数
    Oracle 开启或关闭归档
    Oracle RMAN scripts to delete archivelog
    Oracle check TBS usage
    Oracle kill locked sessions
    场景9 深入RAC运行原理
    场景7 Data Guard
    场景4 Data Warehouse Management 数据仓库
    场景5 Performance Management
  • 原文地址:https://www.cnblogs.com/kaleidoscope/p/9767318.html
Copyright © 2011-2022 走看看