工人线程(Worker Thread)会逐个取回工作并进行处理;当所有工作全部完成后,工人线程会等待新的工作到来。
5个工人线程(WorkerThread)从传送带(Channel)取数据,3个传送工人线程(TransportThread)将数据放入传送带。
/**
 * Bounded FIFO hand-off between the threads that submit {@link Request}s and a fixed
 * pool of {@link WorkerThread}s that consume them.
 *
 * <p>Requests are kept in a circular array of {@code MAX_REQUEST} slots. {@link #put}
 * blocks while the array is full and {@link #take} blocks while it is empty. Both
 * methods synchronize on this instance and coordinate through {@code wait} /
 * {@code notifyAll}.
 */
public class Channel {

    /** Number of slots in the circular request buffer. */
    private static final int MAX_REQUEST = 100;

    /** Circular buffer; valid entries are the {@code count} slots starting at {@code head}. */
    private final Request[] requestQueue;

    /** Index of the next request to hand out. */
    private int head;

    /** Index of the next free slot. */
    private int tail;

    /** Number of requests currently buffered. */
    private int count;

    private final WorkerThread[] workerPool;

    /**
     * Creates the channel and its worker threads. The workers are only created here;
     * they do not run until {@link #startWorker()} is called.
     *
     * @param workers number of worker threads to create
     */
    public Channel(int workers) {
        this.requestQueue = new Request[MAX_REQUEST];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
        this.workerPool = new WorkerThread[workers];
        this.init();
    }

    /** Fills the pool with named, not-yet-started workers bound to this channel. */
    private void init() {
        for (int i = 0; i < workerPool.length; i++) {
            workerPool[i] = new WorkerThread("【工人:" + i + "】", this);
        }
    }

    /**
     * Starts every worker thread in the pool.
     */
    public void startWorker() {
        for (WorkerThread worker : workerPool) {
            worker.start();
        }
    }

    /**
     * Appends a request to the buffer, blocking while the buffer is full.
     *
     * <p>This method does not abort when the calling thread is interrupted: it keeps
     * waiting for a free slot and re-asserts the thread's interrupt status before
     * returning, so the caller can still observe the interruption.
     *
     * @param request the request to enqueue
     */
    public synchronized void put(Request request) {
        boolean interrupted = false;
        while (count >= requestQueue.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                // Remember the interrupt instead of swallowing it. Re-asserting it right
                // here would make the next wait() throw immediately and spin.
                interrupted = true;
            }
        }
        requestQueue[tail] = request;
        tail = (tail + 1) % requestQueue.length;
        count++;
        this.notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and returns the oldest buffered request, blocking while the buffer is empty.
     *
     * <p>Like {@link #put}, this method waits through interrupts and restores the
     * calling thread's interrupt status before returning.
     *
     * @return the request that has been buffered the longest
     */
    public synchronized Request take() {
        boolean interrupted = false;
        while (count <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        Request request = requestQueue[head];
        // Clear the slot so the buffer does not keep a handed-out request reachable.
        requestQueue[head] = null;
        head = (head + 1) % requestQueue.length;
        count--;
        this.notifyAll();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return request;
    }
}
/**
 * Immutable unit of work: records the name it was created with and a sequence number.
 */
public class Request {

    private final String name;
    private final int number;

    /**
     * @param name   label stored with the request
     * @param number sequence number stored with the request
     */
    public Request(final String name, final int number) {
        this.name = name;
        this.number = number;
    }

    /**
     * Prints one line to standard output consisting of the current thread's name,
     * the text {@code " executed "} and this request's string form.
     */
    public void execute() {
        final String executorName = Thread.currentThread().getName();
        System.out.println(executorName + " executed " + this);
    }

    /** Returns the request rendered as {@code " Request=> No.<number> Name.<name>"}. */
    @Override
    public String toString() {
        return new StringBuilder(" Request=> No.")
                .append(number)
                .append(" Name.")
                .append(name)
                .toString();
    }
}
public class TransportThread extends Thread { private final Channel channel; private static final Random random = new Random(System.currentTimeMillis()); public TransportThread(String name, Channel channel) { super(name); this.channel = channel; } @Override public void run() { try { for (int i = 0; true; i++) { Request request = new Request(getName(), i); //向channel中放入request对象(谁放的,编号是多少) this.channel.put(request); Thread.sleep(random.nextInt(1_000)); } } catch (Exception e) { } } }
public class WorkerThread extends Thread { private final Channel channel; private static final Random random = new Random(System.currentTimeMillis()); public WorkerThread(String name, Channel channel) { super(name); this.channel = channel; } @Override public void run() { while (true) { //取出request,执行里面的方法 channel.take().execute(); try { Thread.sleep(random.nextInt(1_000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
/**
 * Demo entry point: one channel with five workers, fed by three transport threads.
 */
public class Test {

    /**
     * Creates the channel, starts its workers, then starts the transport threads
     * "Mark", "Jack" and "Irish" in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Channel channel = new Channel(5);
        channel.startWorker();

        final String[] transporterNames = {"Mark", "Jack", "Irish"};
        for (String transporterName : transporterNames) {
            TransportThread transporter = new TransportThread(transporterName, channel);
            transporter.start();
        }
    }
}