zoukankan      html  css  js  c++  java
  • Java多线程之生产者与消费者(Java编程思想)

    1.通过wait() 与 notifyAll() 实现

    package Produce;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /** A meal identified by the order number it was created with. */
    class Meal {

        private final int orderNum;

        /**
         * @param orderNum the number of the order this meal belongs to
         */
        public Meal(int orderNum) {
            this.orderNum = orderNum;
        }

        /** Returns the text {@code "Meal "} followed by the order number. */
        @Override
        public String toString() {
            return "Meal ".concat(Integer.toString(orderNum));
        }
    }
    
    /**
     * Consumer side of the restaurant example. Each cycle waits on this object's own
     * monitor until {@code restaurant.meal} is non-null, prints the meal, then clears it
     * while holding the chef's monitor and notifies the chef.
     */
    class WaitPerson implements Runnable {
        private Restaurant restaurant;

        public WaitPerson(Restaurant r) {
            this.restaurant = r;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        return;
                    }
                    awaitMeal();
                    System.out.println("Waitperson got " + restaurant.meal);
                    clearMealAndNotifyChef();
                }
            } catch (InterruptedException e) {
                System.out.println("WaitPerson interrupted");
            }
        }

        /** Blocks on this object's monitor until the chef has produced a meal. */
        private void awaitMeal() throws InterruptedException {
            synchronized (this) {
                while (restaurant.meal == null) {
                    wait();
                }
            }
        }

        /** Removes the meal and tells the chef to continue producing. */
        private void clearMealAndNotifyChef() {
            Chef chef = restaurant.chef;
            synchronized (chef) {
                restaurant.meal = null;
                chef.notifyAll();
            }
        }
    }
    
    /**
     * Producer side of the restaurant example. Each cycle waits on this object's own
     * monitor until {@code restaurant.meal} is null, then publishes a new {@link Meal}
     * while holding the wait person's monitor. When the internal counter reaches 10 the
     * chef shuts the restaurant's executor down and stops, so meals 1 through 9 are
     * produced.
     */
    class Chef implements Runnable {

        private Restaurant restaurant;
        private int count = 0;

        public Chef(Restaurant r) {
            this.restaurant = r;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        return;
                    }
                    awaitMealTaken();

                    count++;
                    if (count == 10) {
                        System.out.println("Out of food,closing");
                        // shutdownNow() interrupts every thread of the executor.
                        restaurant.exec.shutdownNow();
                        // Return right away; otherwise one more "Order up! " would be
                        // printed below before the interrupt takes effect.
                        return;
                    }

                    System.out.println("Order up! ");
                    serveMeal();
                    // Pause between orders; sleep() is also a point where a pending
                    // interrupt surfaces as InterruptedException.
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            } catch (InterruptedException e) {
                System.out.println("Chef interrupted");
            }
        }

        /** Blocks on this object's monitor until the current meal has been taken away. */
        private void awaitMealTaken() throws InterruptedException {
            synchronized (this) {
                while (restaurant.meal != null) {
                    wait();
                }
            }
        }

        /** Publishes a new meal and wakes the wait person. */
        private void serveMeal() {
            WaitPerson waiter = restaurant.waitPerson;
            // notifyAll() may only be called while holding the target object's lock.
            synchronized (waiter) {
                restaurant.meal = new Meal(count);
                waiter.notifyAll();
            }
        }
    }
    
    /**
     * Shared state for the chef / wait person example: the single meal slot, the executor
     * that runs both tasks, and the two tasks themselves. Constructing an instance submits
     * both tasks to the executor.
     */
    public class Restaurant {
        Meal meal;
        ExecutorService exec = Executors.newCachedThreadPool();
        WaitPerson waitPerson = new WaitPerson(this);
        Chef chef = new Chef(this);

        /** Submits the chef first, then the wait person, to the executor. */
        public Restaurant() {
            Runnable[] tasks = { chef, waitPerson };
            for (Runnable task : tasks) {
                exec.execute(task);
            }
        }

        /** Entry point: creating the restaurant is enough to start the simulation. */
        public static void main(String[] args) {
            new Restaurant();
        }
    }

    2.使用java.util.concurrent.locks.Condition进行同步操作

    Lock和Condition对象只有在更加困难的多线程问题中才是必需的。

    package Produce;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * A car whose wax state is guarded by an explicit {@link Lock} and a single
     * {@link Condition}. Every state change signals all waiters; every wait re-checks the
     * state in a loop after waking.
     */
    class Car {
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        private boolean waxOn = false;

        /** Marks the car as waxed (ready to buff) and wakes all waiters. */
        public void waxed() {
            changeState(true);
        }

        /** Marks the car as buffed (ready for another coat of wax) and wakes all waiters. */
        public void buffed() {
            changeState(false);
        }

        /**
         * Blocks until the car has wax on it.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void waitForWaxing() throws InterruptedException {
            awaitWhileStateIs(false);
        }

        /**
         * Blocks until the wax has been buffed off.
         *
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void waitForBuffing() throws InterruptedException {
            awaitWhileStateIs(true);
        }

        /** Sets the wax flag under the lock and signals every thread waiting on the condition. */
        private void changeState(boolean newState) {
            lock.lock();
            try {
                waxOn = newState;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /** Waits on the condition for as long as the wax flag equals {@code blockingState}. */
        private void awaitWhileStateIs(boolean blockingState) throws InterruptedException {
            lock.lock();
            try {
                while (waxOn == blockingState) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    
    /**
     * Task that repeatedly applies wax: prints a message, pauses 200 ms, marks the car as
     * waxed, then waits until the car has been buffed.
     */
    class WaxOn implements Runnable {
        private Car car;

        public WaxOn(Car c) {
            this.car = c;
        }

        @Override
        public void run() {
            try {
                // Original author's note: looping on a plain `true` condition gives the
                // same result here.
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    System.out.println("Wax On!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.waxed();
                    car.waitForBuffing();
                }
            } catch (InterruptedException e) {
                System.out.println("Exiting via interrupt");
            }
            System.out.println("Ending Wax On task");
        }
    }
    
    /**
     * Task that repeatedly buffs the car: waits until wax has been applied, prints a
     * message, pauses 200 ms, then marks the car as buffed.
     */
    class WaxOff implements Runnable {
        private Car car;

        public WaxOff(Car c) {
            this.car = c;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    car.waitForWaxing();
                    System.out.println("Wax Off!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.buffed();
                }
            } catch (InterruptedException e) {
                System.out.println("Exiting via interrupt");
            }
            System.out.println("Ending Wax Off task");
        }
    }
    
    /** Driver that runs a WaxOff and a WaxOn task on one shared car for five seconds. */
    public class WaxOMatic2 {
        /**
         * Starts both tasks, lets them run for five seconds, then interrupts them through
         * {@code shutdownNow()}.
         *
         * @throws InterruptedException if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            Car sharedCar = new Car();
            ExecutorService pool = Executors.newCachedThreadPool();

            WaxOff buffer = new WaxOff(sharedCar);
            pool.execute(buffer);
            WaxOn waxer = new WaxOn(sharedCar);
            pool.execute(waxer);

            TimeUnit.SECONDS.sleep(5);
            pool.shutdownNow();
        }
    }

     3.通过同步队列实现

    package Produce;
    
    /**
     * A countdown task. Each instance takes its id from a static counter at construction
     * time and, when run, prints its status once per remaining count.
     */
    public class LiftOff implements Runnable {
        protected int countDown = 10;
        private static int taskCount = 0;
        private final int id = taskCount++;

        /** Creates a task that counts down from the default of 10. */
        public LiftOff() {
        }

        /**
         * @param countDown the value to start counting down from
         */
        public LiftOff(int countDown) {
            this.countDown = countDown;
        }

        /**
         * @return {@code "#<id>(<n>)"} while the count is positive, otherwise
         *         {@code "#<id>(Liftoff!)"}
         */
        public String status() {
            String remaining;
            if (countDown > 0) {
                remaining = Integer.toString(countDown);
            } else {
                remaining = "Liftoff!";
            }
            return "#" + id + "(" + remaining + ")";
        }

        /** Decrements the count and prints the status until the count is used up. */
        @Override
        public void run() {
            for (;;) {
                int before = countDown;
                countDown = before - 1; // the decrement also happens on the final, failing check
                if (before <= 0) {
                    break;
                }
                System.out.println(status());
                Thread.yield(); // hint that other threads may run
            }
        }

    }

    package Produce;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    
    
    /**
     * Consumer that takes {@code LiftOff} tasks from a blocking queue and runs each one on
     * its own thread until that thread is interrupted.
     */
    class LiftOffRunner implements Runnable {
        private BlockingQueue<LiftOff> rockets;

        /**
         * @param queue the queue that tasks are taken from and added to
         */
        public LiftOffRunner(BlockingQueue<LiftOff> queue) {
            rockets = queue;
        }

        /**
         * Puts a task on the queue, blocking while the queue has no room.
         *
         * <p>If the calling thread is interrupted while blocked, the task is not added, a
         * message is printed, and the thread's interrupt status is restored so the caller
         * can still observe the interruption.
         *
         * @param lo the task to enqueue
         */
        public void add(LiftOff lo) {
            try {
                rockets.put(lo);
            } catch (InterruptedException e) {
                System.out.println("Interrupted during put()");
                // Catching InterruptedException clears the flag; set it again rather than
                // silently losing the interruption request.
                Thread.currentThread().interrupt();
            }
        }

        /** Takes and runs tasks until interrupted; an interrupt during take() ends the loop. */
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    LiftOff rocket = rockets.take();
                    rocket.run(); // runs on this thread, not a new one
                }
            } catch (InterruptedException e) {
                System.out.println("Waking from take()");
            }
            System.out.println("Exiting LiftOffRunner");
        }

    }
    
    /**
     * Runs the same producer/consumer exercise against three {@link BlockingQueue}
     * implementations, pausing for the Enter key between them.
     */
    public class TestBlockingQueues {
        /**
         * One reader shared by every {@link #getkey()} call. Creating a new buffered reader
         * per call can discard input that an earlier reader had already buffered.
         */
        private static final BufferedReader STDIN =
                new BufferedReader(new InputStreamReader(System.in));

        /**
         * Blocks until one line has been read from standard input.
         *
         * @throws RuntimeException wrapping any {@code IOException} raised by the read
         */
        static void getkey() {
            try {
                // Compensate for Windows/Linux difference in the
                // length of the result produced by the Enter key:
                STDIN.readLine();
            } catch (java.io.IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Prints {@code message}, then waits for a line of input.
         *
         * @param message the prompt to print first
         */
        static void getkey(String message) {
            System.out.println(message);
            getkey();
        }

        /**
         * Starts a {@code LiftOffRunner} thread on {@code queue}, feeds it five
         * {@code LiftOff(5)} tasks, waits for Enter, then interrupts the runner thread.
         *
         * @param msg   label printed before and after the run
         * @param queue the queue implementation under test
         */
        static void test(String msg, BlockingQueue<LiftOff> queue) {
            System.out.println(msg);
            LiftOffRunner runner = new LiftOffRunner(queue);
            Thread t = new Thread(runner);
            t.start();
            for (int i = 0; i < 5; i++) {
                runner.add(new LiftOff(5));
            }
            getkey("Press'Enter'(" + msg + ")");
            t.interrupt();
            System.out.println("Finished " + msg + " test");
        }

        public static void main(String[] args) {
            // Unbounded queue. The label says LinkedBlockingQueue, so use that class
            // rather than LinkedBlockingDeque.
            test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
            // Bounded queue: put blocks while full, take blocks while empty.
            test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));
            test("SynchronousQueue", new SynchronousQueue<LiftOff>());
        }

    }

    4.典型Toast的队列实例

    package Produce;
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    
    /** A slice of toast that starts out dry and can then be buttered and jammed. */
    class Toast {
        /** Processing stages of a slice of toast. */
        public enum Status {
            DRY, BUTTERED, JAMMED
        }

        private Status status = Status.DRY;
        private final int id;

        /**
         * @param idn the identifier of this slice
         */
        public Toast(int idn) {
            this.id = idn;
        }

        /** Sets the status to {@code BUTTERED}. */
        public void butter() {
            this.status = Status.BUTTERED;
        }

        /** Sets the status to {@code JAMMED}. */
        public void jam() {
            this.status = Status.JAMMED;
        }

        public Status getStatus() {
            return this.status;
        }

        public int getId() {
            return this.id;
        }

        /** Returns {@code "Toast <id>: <status>"}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Toast ");
            text.append(id).append(": ").append(status);
            return text.toString();
        }
    }
    
    /** Named queue type for {@code Toast}; adds nothing to {@link LinkedBlockingDeque}. */
    class ToastQueue
            extends LinkedBlockingDeque<Toast> {
    }
    
    /**
     * First stage of the pipeline: after a pause of 100 to 599 ms, creates the next
     * numbered slice of toast, prints it and puts it on the output queue.
     */
    class Toaster implements Runnable {
        private ToastQueue toastQueue;
        private int count = 0;
        private Random rand = new Random(47);

        public Toaster(ToastQueue tq) {
            this.toastQueue = tq;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    int pauseMillis = 100 + rand.nextInt(500);
                    TimeUnit.MILLISECONDS.sleep(pauseMillis);
                    Toast slice = new Toast(count);
                    count++;
                    System.out.println(slice);
                    toastQueue.put(slice);
                }
            } catch (InterruptedException e) {
                System.out.println("Toaster interrupted");
            }
            System.out.println("Toaster off");
        }
    }
    
    /** Second stage: takes dry toast, butters it, prints it and passes it on. */
    class Butterer implements Runnable {
        private ToastQueue dryQueue, butteredQueue;

        public Butterer(ToastQueue dry, ToastQueue buttered) {
            this.dryQueue = dry;
            this.butteredQueue = buttered;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    // take() blocks until the next slice is available.
                    Toast slice = dryQueue.take();
                    slice.butter();
                    System.out.println(slice);
                    butteredQueue.put(slice);
                }
            } catch (InterruptedException e) {
                System.out.println("Butterer interrupted");
            }
            System.out.println("Butterer off");
        }
    }
    
    /** Third stage: takes buttered toast, adds jam, prints it and passes it on. */
    class Jammer implements Runnable {
        private ToastQueue butteredQueue, finishedQueue;

        public Jammer(ToastQueue buttered, ToastQueue finished) {
            this.butteredQueue = buttered;
            this.finishedQueue = finished;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    // take() blocks until a buttered slice is available.
                    Toast slice = butteredQueue.take();
                    slice.jam();
                    System.out.println(slice);
                    finishedQueue.put(slice);
                }
            } catch (InterruptedException e) {
                System.out.println("Jammer interrupted");
            }
            System.out.println("Jammer off");
        }
    }
    
    /**
     * Final stage: takes finished toast and checks that slices arrive in id order
     * (0, 1, 2, ...) and are jammed. Any violation prints an error and exits the JVM with
     * status 1.
     */
    class Eater implements Runnable {
        private ToastQueue finishedqQueue;
        private int counter = 0;

        public Eater(ToastQueue finished) {
            this.finishedqQueue = finished;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    if (Thread.interrupted()) {
                        break;
                    }
                    Toast slice = finishedqQueue.take();
                    // The expected id advances for every slice taken, valid or not.
                    int expectedId = counter++;
                    boolean asExpected = slice.getId() == expectedId
                            && slice.getStatus() == Toast.Status.JAMMED;
                    if (asExpected) {
                        System.out.println("Chomp! " + slice);
                    } else {
                        System.out.println(">>>> Error: " + slice);
                        System.exit(1);
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Eater interrupted");
            }
            System.out.println("Eater off");
        }
    }
    
    /**
     * Driver that wires the four pipeline stages together through three queues and lets
     * them run for five seconds.
     */
    public class ToastOMatic {
        /**
         * Starts the toaster, butterer, jammer and eater, sleeps five seconds, then
         * interrupts all of them through {@code shutdownNow()}.
         */
        public static void main(String[] args) throws Exception {
            ToastQueue dryQueue = new ToastQueue();
            ToastQueue butteredQueue = new ToastQueue();
            ToastQueue finishedQueue = new ToastQueue();

            ExecutorService pool = Executors.newCachedThreadPool();
            pool.execute(new Toaster(dryQueue));
            pool.execute(new Butterer(dryQueue, butteredQueue));
            pool.execute(new Jammer(butteredQueue, finishedQueue));
            pool.execute(new Eater(finishedQueue));

            TimeUnit.SECONDS.sleep(5);
            pool.shutdownNow();
        }

    }

     5.输入输出管道,功能类似生产者消费者

    package Produce;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Writes the characters {@code 'A'} through {@code 'z'} over and over into a
     * {@link PipedWriter}, pausing 0 to 499 ms after each character. Stops when the write
     * fails or the thread is interrupted while sleeping.
     */
    class Sender implements Runnable {
        private Random rand = new Random(47);
        private PipedWriter out = new PipedWriter();

        /** @return the writer end of the pipe, for a reader to connect to */
        public PipedWriter getpPipedWriter() {
            return this.out;
        }

        @Override
        public void run() {
            try {
                char next = 'A';
                for (;;) {
                    out.write(next);
                    int pauseMillis = rand.nextInt(500);
                    TimeUnit.MILLISECONDS.sleep(pauseMillis);
                    // Wrap back to 'A' after 'z'; otherwise move to the next character.
                    next = (next == 'z') ? 'A' : (char) (next + 1);
                }
            } catch (IOException e) {
                System.out.println(e + " Sender write exception");
            } catch (InterruptedException e) {
                System.out.println(e + " Sender sleep interrupted");
            }
        }
    }
    
    /**
     * Reads characters from a pipe connected to a {@code Sender} and prints each one.
     * Stops at end of stream or when the read throws an {@link IOException}.
     */
    class Receiver implements Runnable {
        private PipedReader in;

        /**
         * @param sender the sender whose piped writer this reader connects to
         * @throws IOException if the reader cannot be connected to the sender's writer
         */
        public Receiver(Sender sender) throws IOException {
            in = new PipedReader(sender.getpPipedWriter());
        }

        @Override
        public void run() {
            try {
                int c;
                // read() returns -1 at end of stream; stop there instead of printing
                // (char) -1 in an endless loop.
                while ((c = in.read()) != -1) {
                    System.out.println("Read:" + (char) c + ". ");
                }
            } catch (IOException e) {
                System.out.println(e + " Receiver read exception");
            }
        }
    }
    
    /** Driver that connects a Sender to a Receiver through a pipe and runs both for four seconds. */
    public class PipedIO {
        /**
         * Starts the sender and the receiver, sleeps four seconds, then interrupts both
         * through {@code shutdownNow()}.
         */
        public static void main(String[] args) throws Exception {
            Sender producer = new Sender();
            Receiver consumer = new Receiver(producer);

            ExecutorService pool = Executors.newCachedThreadPool();
            pool.execute(producer);
            pool.execute(consumer);

            TimeUnit.SECONDS.sleep(4);
            pool.shutdownNow();
        }
    }
  • 相关阅读:
    LeetCode 242. Valid Anagram (验证变位词)
    LeetCode 205. Isomorphic Strings (同构字符串)
    LeetCode 204. Count Primes (质数的个数)
    LeetCode 202. Happy Number (快乐数字)
    LeetCode 170. Two Sum III
    LeetCode 136. Single Number (落单的数)
    LeetCode 697. Degree of an Array (数组的度)
    LeetCode 695. Max Area of Island (岛的最大区域)
    Spark中的键值对操作
    各种排序算法总结
  • 原文地址:https://www.cnblogs.com/zhuawang/p/3776987.html
Copyright © 2011-2022 走看看