zoukankan      html  css  js  c++  java
  • java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 解说演示样例 --thinking java4

    package org.rui.thread.block2;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    import org.rui.thread.LiftOff;
    
    /**
     * 生产者-消费者与队列
     * 
     * @author lenovo
     * 
     */
    
    class LiftOffRunner implements Runnable {
    
    	private BlockingQueue<LiftOff> rockets;
    
    	public LiftOffRunner(BlockingQueue<LiftOff> b) {
    		rockets = b;
    	}
    
    	//加入一个任务到队列
    	public void add(LiftOff lo) {
    		//将指定元素插入此队列中(假设马上可行且不会违反容量限制),
    		try {
    			rockets.put(lo);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    	@Override
    	public void run() {
    
    		try {
    			while (!Thread.interrupted()) {
    				// 获取并移除此队列的头部,在元素变得可用之前一直等待(假设有必要)。
    				LiftOff rocket = rockets.take();
    				rocket.run();
    			}
    
    		} catch (InterruptedException e) {
    			System.out.println("中断退出");
    		}
    		System.out.println("x exiting liftOffRunner");
    
    	}
    }
    
    public class TestBlockingQueues {
    	
    	static void getkey() {
    		try {
    			// compensate for windows/linux difference in the
    			// 回车键产生的结果
    			new BufferedReader(new InputStreamReader(System.in)).readLine();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	static void getkey(String message) {
    		System.out.println(message);
    		getkey();
    	}
    
    	static void tets(String msg, BlockingQueue<LiftOff> queue) {
    		System.out.println(msg);
    		LiftOffRunner runner = new LiftOffRunner(queue);
    		
    		//启动一个线程
    		Thread t = new Thread(runner);
    		t.start();
    		
    		for (int i = 0; i < 5; i++) {
    			//加入任务到LiftOffRunner队列中
    			runner.add(new LiftOff(5));
    		}
    		
    		//输入控制台
    		getkey("press 'enter' (" + msg + ")");
    		t.interrupt();
    		System.out.println(" 完了 " + msg + "test");
    
    	}
    
    	public static void main(String[] args) {
    		tets("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());// unlimited																		// size
    		tets("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// fied																		// size
    		tets("SynchronousQueue", new SynchronousQueue<LiftOff>());// size of 1
    
    	}
    
    }
    


    package org.rui.thread.block2;
    
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 吐司BlockingQueue
     * @author lenovo
     *
     */
    
    class Toast {
    	public enum Status {
    		DRY/* 干的 */, BUTTERED/* 涂黄油 */, JAMMED// 果酱
    	}
    
    	private Status status = Status.DRY;
    	private final int id;
    
    	public Toast(int idn) {
    		id = idn;
    	}
    
    	public void butter() {
    		status = Status.BUTTERED;
    	}
    
    	public void jam() {
    		status = Status.JAMMED;
    	}
    
    	public Status getStatus() {
    		return status;
    	}
    
    	public int getId() {
    		return id;
    	}
    
    	public String toString() {
    		return "Toast " + id + ":" + status;
    	}
    }
    
    /**
     * 吐司队列
     * 
     * @author lenovo
     * 
     */
    class ToastQueue extends LinkedBlockingQueue<Toast> {
    }
    
    class Toaster implements Runnable {
    	private ToastQueue toastQueue;
    	private int count = 0;
    	private Random rand = new Random(47);
    
    	public Toaster(ToastQueue tq) {
    		toastQueue = tq;
    	}
    
    	@Override
    	public void run() {
    		try {
    			while (!Thread.interrupted()) {
    				TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
    				// 制作 toast
    				Toast t = new Toast(count++);
    				System.out.println(t);
    				// insert into queue
    				toastQueue.put(t);
    
    			}
    		} catch (InterruptedException e) {
    			System.out.println("Toaster interrupted");
    		}
    		System.out.println("toaster off");
    	}
    }
    
    // apply butter to toast
    class Butterer implements Runnable {
    	private ToastQueue dryQueue, butteredQueue;
    
    	public Butterer(ToastQueue dry, ToastQueue buttered) {
    		dryQueue = dry;
    		butteredQueue = buttered;
    	}
    
    	@Override
    	public void run() {
    		try {
    
    			while (!Thread.interrupted()) {
    				// blocks until next piece of toast is available 块,直到下一块面包
    				Toast t = dryQueue.take();
    				t.butter();
    				System.out.println(t);
    				butteredQueue.put(t);
    			}
    		} catch (InterruptedException e) {
    			System.out.println("涂黄油 interrupted");
    		}
    		System.out.println("涂黄油 off");
    	}
    
    }
    
    // apply jam to buttered toast
    class Jammer implements Runnable {
    	private ToastQueue butteredQueue, finishedQueue;
    
    	public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
    		this.butteredQueue = butteredQueue;
    		this.finishedQueue = finishedQueue;
    	}
    
    	@Override
    	public void run() {
    		try {
    
    			while (!Thread.interrupted()) {
    				// blocks until next piece of toast is available 块,直到下一块面包
    				Toast t = butteredQueue.take();
    				t.jam();
    				System.out.println(t);
    				finishedQueue.put(t);
    
    			}
    		} catch (InterruptedException e) {
    			System.out.println("涂果酱 interrupted");
    		}
    		System.out.println("涂果酱 off");
    	}
    
    }
    
    // ////使用烤面包 consume the toast
    class Eater implements Runnable {
    	private ToastQueue finishedQueue;
    	private int counter = 0;
    
    	public Eater(ToastQueue finished) {
    		finishedQueue = finished;
    	}
    
    	@Override
    	public void run() {
    		try {
    
    			while (!Thread.interrupted()) {
    				Toast t = finishedQueue.take();
    				// verify that the toast is coming in order 确认面包来了
    				// and that all pieces are getting jammed ,全部碎片越来越挤
    				if (t.getId() != counter++
    						|| t.getStatus() != Toast.Status.JAMMED) {
    					System.out.println("===>>>>error" + t);
    					System.exit(1);
    
    				} else {
    					System.out.println("吃!" + t);
    				}
    
    			}
    		} catch (InterruptedException e) {
    			System.out.println("食者 interrupted");
    		}
    		System.out.println(" 食者 off");
    	}
    }
    
    /**
     * main
     * 
     * @author lenovo
     * 
     */
    public class ToastOMatic {
    
    	public static void main(String[] args) throws InterruptedException {
    		ToastQueue dryQueue = new ToastQueue();
    		ToastQueue butteredQueue = new ToastQueue();
    		ToastQueue finishedQueue = new ToastQueue();
    		
    		ExecutorService exec = Executors.newCachedThreadPool();
    		exec.execute(new Toaster(dryQueue));//烤面包
    		exec.execute(new Butterer(dryQueue, butteredQueue));//涂黄油
    		exec.execute(new Jammer(butteredQueue, finishedQueue));//上果酱
    		exec.execute(new Eater(finishedQueue));//吃
    		TimeUnit.SECONDS.sleep(5);
    		exec.shutdownNow();
    
    	}
    }
    /**output:
     Toast 0:DRY
    Toast 0:BUTTERED
    Toast 0:JAMMED
    吃!Toast 0:JAMMED
    Toast 1:DRY
    Toast 1:BUTTERED
    Toast 1:JAMMED
    吃!Toast 1:JAMMED
    Toast 2:DRY
    Toast 2:BUTTERED
    Toast 2:JAMMED
    吃!Toast 2:JAMMED
    ...
    ...
    Toast 10:DRY
    Toast 10:BUTTERED
    Toast 10:JAMMED
    吃!Toast 10:JAMMED
    Toast 11:DRY
    Toast 11:BUTTERED
    Toast 11:JAMMED
    吃!Toast 11:JAMMED
    Toast 12:DRY
    Toast 12:BUTTERED
    Toast 12:JAMMED
    吃!Toast 12:JAMMED
    Toast 13:DRY
    Toast 13:BUTTERED
    Toast 13:JAMMED
    吃!Toast 13:JAMMED
    Toast 14:DRY
    Toast 14:BUTTERED
    Toast 14:JAMMED
    吃!Toast 14:JAMMED
    食者 interrupted
    Toaster interrupted
     食者 off
    涂果酱 interrupted
    涂果酱 off
    涂黄油 interrupted
    涂黄油 off
    toaster off
    
     */
    

    package org.rui.thread.block2;
    
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 任务间使用管道进行输入、输出
     * 
     * @author lenovo
     * 
     */
    class Sender implements Runnable {
    	private Random rand = new Random(47);
    	private PipedWriter out = new PipedWriter();
    
    	public PipedWriter getPipedWriter() {
    		return out;
    	}
    
    	@Override
    	public void run() {
    		try {
    			while (true) {
    				for (char c = 'A'; c <= 'z'; c++) {
    					out.write(c);
    					TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
    
    				}
    			}
    		} catch (IOException e) {
    			System.out.println(e + " sender write Exception");
    		} catch (InterruptedException e) {
    			System.out.println(e + " sender sleep interrupted");
    		}
    
    	}
    
    }
    
    class Receiver implements Runnable {
    
    	private PipedReader in;
    
    	public Receiver(Sender sender) throws IOException {
    		in = new PipedReader(sender.getPipedWriter());
    	}
    
    	@Override
    	public void run() {
    		try {
    			while (true) {
    				// blocks until characters are there
    				System.out.println("Read:" + (char) in.read() + ",");
    
    			}
    		} catch (IOException e) {
    			System.out.println(e+"receiver read execption");
    		}
    
    	}
    
    }
    
    public class PipedIO {
    	// 接收器 Receiver
    	public static void main(String[] args) throws IOException, InterruptedException {
    		Sender sender = new Sender();
    		Receiver receiver = new Receiver(sender);
    		
    		ExecutorService exec=Executors.newCachedThreadPool();
    		exec.execute(sender);
    		exec.execute(receiver);
    		
    		TimeUnit.SECONDS.sleep(4);
    		exec.shutdownNow();
    		
    	}
    }
    
    /**outpt:
    Read:A,
    Read:B,
    Read:C,
    Read:D,
    Read:E,
    Read:F,
    Read:G,
    Read:H,
    Read:I,
    Read:J,
    Read:K,
    Read:L,
    Read:M,
    Read:N,
    Read:O,
    Read:P,
    java.lang.InterruptedException: sleep interrupted sender sleep interrupted
    Read:Q,
    java.io.IOException: Write end deadreceiver read execption
    
     */
    
    
    
    
    
    
    
    
    
    
    
    
    



  • 相关阅读:
    在线考试————随机出题
    HTTP协议
    团队
    做作业
    图书馆管理说明书性能
    关于敏捷开发的学习
    运行环境
    图书馆管理系统说明书
    性能(2)
    作业
  • 原文地址:https://www.cnblogs.com/lxjshuju/p/7066181.html
Copyright © 2011-2022 走看看