zoukankan      html  css  js  c++  java
  • java线程基础巩固---线程生产者消费者的综合实战结合Java8语法

    基于上一次【http://www.cnblogs.com/webor2006/p/8909558.html】学习的多个生产者与多个消费者模型,此次用另外一个案例来进一步巩固线程之间的调度处理,这里还是以之前【http://www.cnblogs.com/webor2006/p/7895410.html】举过的信息采集功能为例:有若干台采集服务器,然后还有一台主机,这台主机需要等待这若干台服务器信息采集完之后再做进一步的处理,一台采集服务器就对应一个线程,如之前写的代码:

    但是假如有成千上万台那怎么办,不可能每台服务器对应一个线程,因为线程是有一个stackSize的上限滴,此时就需要用到线程同步来避勉这个问题了,比如:最大工作线程数就是5个,然后当超过5个的时候其它的工作线程都会进行wait()释放出cpu执行权,待有一个工作线程执行完了则通知wait()的其它线程加入工作队列,大体思路是这样,下面来看一下如何具体执行,这里采用Java8的语法来编写,毕境之前学习过练练手:

    首先创建十个线程,这里采用流的方式来创建,如下:

    好,具体线程的执行代码这里先放一放,先将框架代码编写好,因为需要等待每个线程的执行完成再执行,所以此时需要用到join()方法了,如下:

    所以此时应该用一个集合将所有启动的线程给缓存下来,但是对于Java8中的Stream.forEach()方法是一个teminal operation,执行完流操作就结束了,所以这里定义一个集合变量准备来缓存一下:

    然后接下来就可以进行join()总的操作了,如下:

    好了~~框架搭建完毕!接下来就得将重心放在线程的具体执行这块了,如下:

    接着线程执行前先要来判断是否工作线程数已经超过5个了,如果超过那其它所有线程都得wait()了,而如何判断工作线程总数呢?这里需要用一定的技巧,这里就不卖关子了,贴出来细细体会:

    public class CaptureService {
    
        private static final int MAX_WORKER = 5;
        private static final LinkedList<CaptureService.Control> CONTROLS = new LinkedList<>();//代表总的工作线程
    
        public static void main(String[] args) {
    
            List<Thread> worker = new ArrayList<>();
    
            Arrays.asList("M1", "M2", "M3", "M4", "M5", "M6", "M7", "M8", "M9", "M10").stream()
                    .map(CaptureService::createCaptureThread)
                    .forEach(t -> {
                        t.start();//启动每一个线程
                        worker.add(t);
                    });
            worker.stream().forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Optional.of("All of capture work finished").ifPresent(System.out::println);
        }
    
        private static Thread createCaptureThread(String name) {
            return new Thread(() -> {
                Optional.of("The worker [" + Thread.currentThread().getName() + "] BEGIN capture data").ifPresent(System.out::println);
                synchronized (CONTROLS) {
                    while (CONTROLS.size() > MAX_WORKER) {
                        try {
                            Optional.of("The worker [" + Thread.currentThread().getName() + "] WAIT..").ifPresent(System.out::println);
                            CONTROLS.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                    CONTROLS.addLast(new CaptureService.Control());
                }
            }, name);
        }
    
        private static class Control {
            //nothing to do,just a tag
        }
    }

    而如果不满足wait()条件的那当然就得正常工作喽,如下:

    而当一个线程任务执行完,则需要通知其它还在wait()住的线程叫他们赶紧加入工作队列中,如下:

    另外还得注意咱们是按先进先出的原则来添加tag的,如下:

    好,至此代码写完,看一下运行结果:

  • 相关阅读:
    Max Sum of Max-K-sub-sequence(单调队列)
    Matrix Swapping II(求矩阵最大面积,dp)
    重温世界杯(贪心)
    Pie(求最小身高差,dp)
    Matrix(多线程dp)
    Python 实现自动导入缺失的库
    分布式系统session一致性解决方案
    数据结构 【链表】
    【数字图像处理】gamma变换
    【数字图像处理】顶帽变换和底帽变换
  • 原文地址:https://www.cnblogs.com/webor2006/p/9021816.html
Copyright © 2011-2022 走看看