一对一
public static void main(String[] args) { LinkedBlockingQueue list = new LinkedBlockingQueue(); List<Integer> fileList = Collections.synchronizedList(new ArrayList<>()); for (int i = 0, size = 10000; i < size; i++) { int num = new Random().nextInt(); fileList.add(num); } ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); //生产 ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!"); if (!executor.isShutdown()) { r.run(); } } } }); //消费 ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!"); if (!executor.isShutdown()) { r.run(); } } } }); downExecutor.execute(new Runnable() { @Override public void run() { lock.lock(); try { while (true) { if (fileList.size() > 0) { if (list.size() < 1) { Integer first = fileList.get(0); list.add(first); fileList.remove(0); System.out.println("生产了:" + first); condition.signalAll(); } else { System.out.println("fileList:" + fileList.size() + "list:" + list.size()); condition.await(); } } else { System.out.println("生产执行完毕~~~~~"); downExecutor.shutdownNow(); break; } } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }); upExecutor.execute(new Runnable() { @Override public void run() { lock.lock(); try { while (true) { if (list.size() > 0) { int num = (int) list.remove(); System.out.println("消费了:" + num); condition.signalAll(); } else { if (fileList.size() > 0) { System.out.println("list:" + list.size()); condition.await(); } else { System.out.println("消费执行完毕~~~~~"); upExecutor.shutdownNow(); break; } } } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); }
多对多1
public static void main(String[] args) { LinkedBlockingQueue list = new LinkedBlockingQueue(); List<Integer> fileList = Collections.synchronizedList(new ArrayList<>()); for (int i = 0, size = 1000; i < size; i++) { int num = new Random().nextInt(); fileList.add(num); } ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); //生产 ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!"); if (!executor.isShutdown()) { r.run(); } } } }); //消费 ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!"); if (!executor.isShutdown()) { r.run(); } } } }); downExecutor.execute(new Runnable() { @Override public void run() { try { while (true) { if (fileList.size() > 0) { if (list.size() < 1) { Integer first = fileList.get(0); list.add(first); fileList.remove(0); System.out.println("生产了:" + first); } } else { System.out.println("生产执行完毕~~~~~fileList:"+fileList.size()); downExecutor.shutdownNow(); break; } } } catch (Exception e) { e.printStackTrace(); } } }); upExecutor.execute(new Runnable() { @Override public void run() { try { while (true) { if (list.size() > 0) { int num = (int) list.remove(); System.out.println("消费了:" + num); } else { if (fileList.size() == 0) { System.out.println("消费执行完毕~~~~~list:"+list.size()); upExecutor.shutdownNow(); break; } } } } catch (Exception e) { e.printStackTrace(); } } }); }
多对多2
public static void main(String[] args) { LinkedBlockingQueue list = new LinkedBlockingQueue(); List<Integer> fileList = Collections.synchronizedList(new ArrayList<>()); for (int i = 0, size = 1000; i < size; i++) { int num = new Random().nextInt(); fileList.add(num); } ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); //生产 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!"); if (!executor.isShutdown()) { r.run(); } } } }); while (true) { if (fileList.size() > 0) { poolExecutor.execute(new Runnable() { @Override public void run() { if (fileList.size() > 0) { Integer first = fileList.get(0); list.add(first); fileList.remove(0); System.out.println(Thread.currentThread().getName()+"生产了:" + first); } } }); } else { System.out.println("生产执行完毕~~~~~fileList:" + fileList.size()); break; } } while (true) { if (list.size() > 0) { poolExecutor.execute(new Runnable() { @Override public void run() { if (list.size() > 0) { int num = (int) list.remove(); System.out.println(Thread.currentThread().getName()+"消费了:" + num); } } }); } else { System.out.println("消费执行完毕~~~~~list:" + list.size()); poolExecutor.shutdown(); break; } } }
一学就会,一用就懵系列,多线程还是要多练习~