zoukankan      html  css  js  c++  java
  • java多线程之生存者与消费者(Java编程思想)

    1.通过wait() 与 Notify实现

    package Produce;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class Meal {
    
        private final int orderNum;
    
        public Meal(int orderNum) {
            this.orderNum = orderNum;
        }
    
        @Override
        public String toString() {
            return "Meal " + orderNum;
        }
    }
    
    class WaitPerson implements Runnable {
        private Restaurant restaurant;
    
        public WaitPerson(Restaurant r) {
            restaurant = r;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    synchronized (this) {
                        while (restaurant.meal == null)
                            wait(); // 等待chef生产meal
                    }
                    System.out.println("Waitperson got " + restaurant.meal);
                    synchronized (restaurant.chef) {
                        restaurant.meal = null;
                        restaurant.chef.notifyAll();// 通知chef继续生产
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("WaitPerson interrupted");
            }
        }
    }
    
    class Chef implements Runnable {
    
        private Restaurant restaurant;
        private int count = 0;
    
        public Chef(Restaurant r) {
            restaurant = r;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    synchronized (this) {
                        while (restaurant.meal != null)
                            wait();// 等待meal被拿走
                    }
                    if (++count == 10) {
                        System.out.println("Out of food,closing");
                        restaurant.exec.shutdownNow();//向每个线程发送Interrupt
                         return;//如果没有直接return 将多执行了下面的 Order up ,所以一般情况下都要直接return
                    }
    
                    System.out.println("Order up! ");
                    synchronized (restaurant.waitPerson) {
                        // 对notifyAll()的调用必须先获得waitPerson的锁
                        restaurant.meal = new Meal(count);
                        restaurant.waitPerson.notifyAll();
                    }
                    TimeUnit.MILLISECONDS.sleep(500);//休眠一下是为了给shutdownNow留出时间
                }
            } catch (InterruptedException e) {
                System.out.println("Chef interrupted");
            }
        }
    }
    
    public class Restaurant {
        Meal meal;
        ExecutorService exec = Executors.newCachedThreadPool();
        WaitPerson waitPerson = new WaitPerson(this);
        Chef chef = new Chef(this);
    
        public Restaurant() {
            exec.execute(chef);
            exec.execute(waitPerson);
        }
    
        public static void main(String[] args) {
            new Restaurant();
        }
    }

    2.使用java.util.concurrent.locks.Condition进行同步操作

    Lock和Condition对象只有在更加困难的多线程问题中才是必需的。

    package Produce;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    class Car {
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
        private boolean waxOn = false;
    
        public void waxed() {
            lock.lock();
            try {
                waxOn = true;// Ready to buff
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public void buffed() {
            lock.lock();
            try {
                waxOn = false;// Ready for another coat of wax
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public void waitForWaxing() throws InterruptedException {
            lock.lock();
            try {
                while (waxOn == false)
                    condition.await();
            } finally {
                lock.unlock();
            }
        }
    
        public void waitForBuffing() throws InterruptedException {
            lock.lock();
            try {
                while (waxOn == true)
                    condition.await();
            } finally {
                lock.unlock();
            }
        }
    }
    
    class WaxOn implements Runnable {
        private Car car;
    
        public WaxOn(Car c) {
            car = c;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) 
    //            while (true) //与使用这行效果一样
                {
                    System.out.println("Wax On!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.waxed();
                    car.waitForBuffing();
                }
            } catch (InterruptedException e) {
                System.out.println("Exiting via interrupt");
            }
            System.out.println("Ending Wax On task");
        }
    }
    
    class WaxOff implements Runnable {
        private Car car;
    
        public WaxOff(Car c) {
            car = c;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) 
    //                while (true) 
                        
                {
                    car.waitForWaxing();
                    System.out.println("Wax Off!");
                    TimeUnit.MILLISECONDS.sleep(200);
                    car.buffed();
                }
            } catch (InterruptedException e) {
                System.out.println("Exiting via interrupt");
            }
            System.out.println("Ending Wax Off task");
        }
    }
    
    public class WaxOMatic2 {
        public static void main(String[] args) throws InterruptedException {
            Car car = new Car();
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new WaxOff(car));
            exec.execute(new WaxOn(car));
            TimeUnit.SECONDS.sleep(5);
            exec.shutdownNow();
        }
    }

     3.通过同步队列实现

    package Produce;
    
    public class LiftOff implements Runnable {
        protected int countDown = 10;
        private static int taskCount = 0;
        private final int id = taskCount++;
    
        public LiftOff() {
        }
    
        public LiftOff(int countDown) {
            this.countDown = countDown;
        }
    
        public String status() {
            return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!")
                    + ")";
        }
    
        @Override
        public void run() {
            while (countDown-- > 0) {
                System.out.println(status());
                Thread.yield();//线程的礼让
            }
        }
    
    }

    package Produce;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.SynchronousQueue;
    
    
    
    class LiftOffRunner implements Runnable {
        private BlockingQueue<LiftOff> rockets;
    
        public LiftOffRunner(BlockingQueue<LiftOff> queue) {
            rockets = queue;
        }
    
        public void add(LiftOff lo) {
            try {
                rockets.put(lo);
            } catch (InterruptedException e) {
                System.out.println("Interrupted during put()");
            }
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    LiftOff rocket = rockets.take();
                    rocket.run(); 
                }
            } catch (InterruptedException e) {
                System.out.println("Waking from take()");
            }
            System.out.println("Exiting LiftOffRunner");
        }
    
    }
    
    public class TestBlockingQueues {
        static void getkey() {
            try {
                // Compensate for Windows/Linux difference in the
                // length of the result produced by the Enter key:
                new BufferedReader(new InputStreamReader(System.in)).readLine();
            } catch (java.io.IOException e) {
                throw new RuntimeException(e);
            }
        }
    
        static void getkey(String message) {
            System.out.println(message);
            getkey();
        }
    
        static void test(String msg, BlockingQueue<LiftOff> queue) {
            System.out.println(msg);
            LiftOffRunner runner = new LiftOffRunner(queue);
            Thread t = new Thread(runner);
            t.start();
            for (int i = 0; i < 5; i++) {
                runner.add(new LiftOff(5));
            }
            getkey("Press'Enter'(" + msg + ")");
            t.interrupt();
            System.out.println("Finished " + msg + " test");
        }
    
        public static void main(String[] args) {
            test("LinkedBlockingQueue", new LinkedBlockingDeque<LiftOff>());// 不限制长度
            test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// 限制长度,当队列满时存数据将阻塞,当队列空时取数据也阻塞
            test("SynchronousQueue", new SynchronousQueue<LiftOff>());
        }
    
    }

    4.典型Toast的队列实例

    package Produce;
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    
    class Toast {
        public enum Status {
            DRY, BUTTERED, JAMMED
        }
    
        private Status status = Status.DRY;
        private final int id;
    
        public Toast(int idn) {
            id = idn;
        }
    
        public void butter() {
            status = Status.BUTTERED;
        }
    
        public void jam() {
            status = Status.JAMMED;
        }
    
        public Status getStatus() {
            return status;
        }
    
        public int getId() {
            return id;
        }
    
        public String toString() {
            return "Toast " + id + ": " + status;
        }
    }
    
    class ToastQueue extends LinkedBlockingDeque<Toast> {
    }
    
    class Toaster implements Runnable {
        private ToastQueue toastQueue;
        private int count = 0;
        private Random rand = new Random(47);
    
        public Toaster(ToastQueue tq) {
            toastQueue = tq;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
                    Toast t = new Toast(count++);
                    System.out.println(t);
                    toastQueue.put(t);
                }
            } catch (InterruptedException e) {
                System.out.println("Toaster interrupted");
            }
            System.out.println("Toaster off");
        }
    }
    
    class Butterer implements Runnable {
        private ToastQueue dryQueue, butteredQueue;
    
        public Butterer(ToastQueue dry, ToastQueue buttered) {
            dryQueue = dry;
            butteredQueue = buttered;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    // Blocks until next piece of toast is available
                    Toast t = dryQueue.take();
                    t.butter();
                    System.out.println(t);
                    butteredQueue.put(t);
                }
            } catch (InterruptedException e) {
                System.out.println("Butterer interrupted");
            }
            System.out.println("Butterer off");
        }
    }
    
    class Jammer implements Runnable {
        private ToastQueue butteredQueue, finishedQueue;
    
        public Jammer(ToastQueue buttered, ToastQueue finished) {
            butteredQueue = buttered;
            finishedQueue = finished;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Toast t = butteredQueue.take();
                    t.jam();
                    System.out.println(t);
                    finishedQueue.put(t);
                }
            } catch (InterruptedException e) {
                System.out.println("Jammer interrupted");
            }
            System.out.println("Jammer off");
        }
    }
    
    class Eater implements Runnable {
        private ToastQueue finishedqQueue;
        private int counter = 0;
    
        public Eater(ToastQueue finished) {
            finishedqQueue = finished;
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Toast t = finishedqQueue.take();
                    if (t.getId() != counter++
                            || t.getStatus() != Toast.Status.JAMMED) {
                        System.out.println(">>>> Error: " + t);
                        System.exit(1);
                    } else {
                        System.out.println("Chomp! " + t);
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Eater interrupted");
            }
            System.out.println("Eater off");
        }
    }
    
    public class ToastOMatic {
        public static void main(String[] args) throws Exception {
            ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue();
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new Toaster(dryQueue));
            exec.execute(new Butterer(dryQueue, butteredQueue));
            exec.execute(new Jammer(butteredQueue, finishedQueue));
            exec.execute(new Eater(finishedQueue));
            TimeUnit.SECONDS.sleep(5);
            exec.shutdownNow();
        }
    
    }

     5.输入输出管道,功能类似生产者消费者

    package Produce;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    class Sender implements Runnable {
        private Random rand = new Random(47);
        private PipedWriter out = new PipedWriter();
    
        public PipedWriter getpPipedWriter() {
            return out;
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    for (char c = 'A'; c <= 'z'; c++) {
                        out.write(c);
                        TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                    }
                }
            } catch (IOException e) {
                System.out.println(e + " Sender write exception");
            } catch (InterruptedException e) {
                System.out.println(e + " Sender sleep interrupted");
            }
        }
    }
    
    class Receiver implements Runnable {
        private PipedReader in;
    
        public Receiver(Sender sender) throws IOException {
            in = new PipedReader(sender.getpPipedWriter());
        }
    
        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("Read:" + (char) in.read() + ". ");
                }
            } catch (IOException e) {
                System.out.println(e + " Receiver read exception");
            }
        }
    }
    
    public class PipedIO {
        public static void main(String[] args) throws Exception {
            Sender sender = new Sender();
            Receiver receiver = new Receiver(sender);
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(sender);
            exec.execute(receiver);
            TimeUnit.SECONDS.sleep(4);
            exec.shutdownNow();
        }
    }
  • 相关阅读:
    opentsdb安装部署
    python发送邮件(html)例子
    python查库写库例子
    获取rds的cpu和内存使用情况
    数据库损坏的情况下如何获取到dbid
    grafana安装升级部署
    Specified key was too long
    mysql动态执行sql批量删除数据
    kafka删除topics
    cratedb导入json文件
  • 原文地址:https://www.cnblogs.com/zhuawang/p/3776987.html
Copyright © 2011-2022 走看看