zoukankan      html  css  js  c++  java
  • Tiny并行计算框架之复杂示例

    问题来源  非常感谢@doctorwho的问题: 
    假如职业介绍所来了一批生产汽车的工作,假设生产一辆汽车任务是这样的:搭好底盘、拧4个轮胎、安装发动机、安装4个座椅、再装4个车门、最后安装顶棚。之间有的任务是可以并行计算的(比如拧4个轮胎,安装发动机和安装座椅),有的任务有前置任务(比如先装好座椅,才能装车门和顶棚)。让两组包工头组织两种类型的工作:将工人分成两种类型,即可并行计算的放在同一组内,由职业介绍所来控制A组包工头做完的任务交给B组包工头。中间环节的半成品保存到Warehouse中,是这样使用TINY框架来生产汽车么?
    接下来,我就用Tiny并行计算框架来展示一下这个示例,在编写示例的时候,发现了一个BUG,这也充分体现了开源的精神与价值,再次感谢@doctorwho。 
    问题分析 
    doctorwho的问题还是比较复杂的,但是实际上道理是一样的,因此我把问题简化成下面的过程 
    第一步:构建底盘 
    第二步:并行进行安装引擎,座位和轮胎

    第三步:并行进行安装门及车顶 
    由于我和doctorwho都不是造车行家,因此就不用纠结这么造是不是合理了,假设这么做就是合理的。 
    代码实现 
    按我前面说的过程,工人是必须要有的,因此我们首先构建工人: 
    第一步的底盘构建工人 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class StepFirstWorker extends AbstractWorker {
        public StepFirstWorker() throws RemoteException {
            super("first");
        }

        @Override
        protected Warehouse doWork(Work work) throws RemoteException {
            System.out.println(String.format("%s 构建底盘完成.", work.getInputWarehouse().get("carType")));
            Warehouse outputWarehouse = work.getInputWarehouse();
            outputWarehouse.put("baseInfo", "something about baseInfo");
            return outputWarehouse;
        }
    }



    由于第二步工人有好几个类型,因此再搞个第二步抽象工人: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public abstract class StepThirdWorker extends AbstractWorker {
        public StepThirdWorker() throws RemoteException {
            super("third");
        }


        protected boolean acceptMyWork(Work work) {
            String workClass = work.getInputWarehouse().get("class");
            if (workClass != null) {
                return true;
            }
            return false;
        }
        protected Warehouse doMyWork(Work work) throws RemoteException {
            System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));
            System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));
            return work.getInputWarehouse();
        }
    }



    接下来构建第二步的引擎工人: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class StepSecondEngineWorker extends StepSecondWorker {


    public static final String ENGINE = "engine";


    public StepSecondEngineWorker() throws RemoteException {
    super();
    }


    public boolean acceptWork(Work work) {
    return acceptMyWork(work);
    }


    protected Warehouse doWork(Work work) throws RemoteException {
    return super.doMyWork(work);
    }
    }



    第二步的座位工人: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class StepSecondSeatWorker extends StepSecondWorker {


        public static final String SEAT = "seat";


        public StepSecondSeatWorker() throws RemoteException {
            super();
        }
        public boolean acceptWork(Work work) {
           return acceptMyWork(work);
        }
        protected Warehouse doWork(Work work) throws RemoteException {
            return super.doMyWork(work);
        }
    }

    <div>


    </div>



    第二步的轮胎工人: 

    ?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class StepSecondTyreWorker extends StepSecondWorker {
    public static final String TYRE = "tyre";


    public StepSecondTyreWorker() throws RemoteException {
    super();
    }


    public boolean acceptWork(Work work) {
    return acceptMyWork(work);
    }


    protected Warehouse doWork(Work work) throws RemoteException {
    return super.doMyWork(work);
    }
    }



    同理,第三步也是大同小异的。 
    第三步的抽象工人类: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public abstract class StepThirdWorker extends AbstractWorker {
        public StepThirdWorker() throws RemoteException {
            super("third");
        }


        protected boolean acceptMyWork(Work work) {
            String workClass = work.getInputWarehouse().get("class");
            if (workClass != null) {
                return true;
            }
            return false;
        }
        protected Warehouse doMyWork(Work work) throws RemoteException {
            System.out.println(String.format("Base:%s ", work.getInputWarehouse().get("baseInfo")));
            System.out.println(String.format("%s is Ok", work.getInputWarehouse().get("class")));
            return work.getInputWarehouse();
        }
    }



    第三步的车门工人: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class StepThirdDoorWorker extends StepThirdWorker {


        public static final String DOOR = "door";


        public StepThirdDoorWorker() throws RemoteException {
            super();
        }
        public boolean acceptWork(Work work) {
            return acceptMyWork(work);
        }
        @Override
        protected Warehouse doWork(Work work) throws RemoteException {
            return super.doMyWork(work);
        }
    }



    第三步的车顶工人: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class StepThirdRoofWorker extends StepThirdWorker {


        public static final String ROOF = "roof";


        public StepThirdRoofWorker() throws RemoteException {
            super();
        }
        public boolean acceptWork(Work work) {
            return acceptMyWork(work);
        }
        protected Warehouse doWork(Work work) throws RemoteException {
            return super.doMyWork(work);
        }
    }



    以上就把工人都构建好了,我们前面也说过,如果要进行任务分解,是必须要构建任务分解合并器的,这里简单起见,只实现任务分解了。 
    第二部的任务分解: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class SecondWorkSplitter implements WorkSplitter {
        public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
            List<Warehouse> list = new ArrayList<Warehouse>();
            list.add(getWareHouse(work.getInputWarehouse(), "engine"));
            list.add(getWareHouse(work.getInputWarehouse(), "seat"));
            list.add(getWareHouse(work.getInputWarehouse(), "seat"));
            list.add(getWareHouse(work.getInputWarehouse(), "seat"));
            list.add(getWareHouse(work.getInputWarehouse(), "seat"));
            list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
            list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
            list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
            list.add(getWareHouse(work.getInputWarehouse(), "tyre"));
            return list;
        }

        private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
            Warehouse warehouse = new WarehouseDefault();
            warehouse.put("class", stepClass);
            warehouse.putSubWarehouse(inputWarehouse);
            return warehouse;
        }
    }



    从上面可以看到,构建了一个引擎的仓库,4个座位仓库,4个轮胎仓库。呵呵,既然能并行,为啥不让他做得更快些? 
    接下来是第三步的任务分解器: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class ThirdWorkSplitter implements WorkSplitter {
        public List<Warehouse> split(Work work, List<Worker> workers) throws RemoteException {
            List<Warehouse> list = new ArrayList<Warehouse>();
            list.add(getWareHouse(work.getInputWarehouse(), "door"));
            list.add(getWareHouse(work.getInputWarehouse(), "door"));
            list.add(getWareHouse(work.getInputWarehouse(), "door"));
            list.add(getWareHouse(work.getInputWarehouse(), "door"));
            list.add(getWareHouse(work.getInputWarehouse(), "roof"));
            return list;
        }

        private Warehouse getWareHouse(Warehouse inputWarehouse, String stepClass) {
            Warehouse warehouse = new WarehouseDefault();
            warehouse.put("class", stepClass);
            warehouse.putSubWarehouse(inputWarehouse);
            return warehouse;
        }
    }



    从上面可以看到,第三部构建了4个门仓库一个车顶仓库,同样的,可以让4个工人同时装门。 
    上面就把所有的准备工作都做好了,接下来就是测试方法了: 


    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    public class Test {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            JobCenter jobCenter = new JobCenterLocal();


            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepFirstWorker());
            }
            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepSecondTyreWorker());
            }
            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepSecondSeatWorker());
            }
            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepSecondEngineWorker());
            }
            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepThirdDoorWorker());
            }
            for (int i = 0; i < 5; i++) {
                jobCenter.registerWorker(new StepThirdRoofWorker());
            }

            jobCenter.registerForeman(new ForemanSelectOneWorker("first"));
         <span></span> <span></span>jobCenter.registerForeman(new ForemanSelectAllWorker("second",
    new SecondWorkSplitter()));
    jobCenter.registerForeman(new ForemanSelectAllWorker("third",new ThirdWorkSplitter()));


    Warehouse inputWarehouse = new WarehouseDefault();
    inputWarehouse.put("class", "car");
    inputWarehouse.put("carType", "普桑");
    WorkDefault work = new WorkDefault("first", inputWarehouse);
    work.setForemanType("first");
    WorkDefault work2 = new WorkDefault("second");
    work2.setForemanType("second");
    WorkDefault work3 = new WorkDefault("third");
    work3.setForemanType("third");
    work.setNextWork(work2).setNextWork(work3);


    Warehouse warehouse = jobCenter.doWork(work);


    jobCenter.stop();

        }
    }



    呵呵,工人各加了5个,然后注册了三个工头,第一步的工头是随便挑一个工人类型的,第二步和第三步是挑所有工人的,同时还指定了任务分解器。 
    接下来就构建了一个工作,造一个高端大气上档次的普桑汽车,然后告诉职业介绍所说给我造就可以了。 
    下面是造车的过程,我把日志也贴上来了: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    普桑 构建底盘完成.
    -234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数9...
    -234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行开始...
    -234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行开始...
    -234  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行开始...
    -235  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行开始...
    -236  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行开始...
    -237  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行开始...
    Base:something about baseInfo 
    engine is Ok
    Base:something about baseInfo 
    seat is Ok
    -245  [id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-a763f156ffd74b5db285198d2498edcf>运行结束
    -246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行开始...
    -246  [id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-abdb57f0641a4727a9efa744d07cf2d1>运行结束
    Base:something about baseInfo 
    seat is Ok
    -248  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行开始...
    Base:something about baseInfo 
    tyre is Ok
    -250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-f3efba2dc7804c6cbcd5a25f42fdc177>运行结束
    -250  [id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-c2d2fb38ef6c4509b3a39b3e7d5c1d61>运行结束
    Base:something about baseInfo 
    seat is Ok
    -252  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行开始...
    -253  [id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-8c3b9359bcfa4de7b6e0492daab0d73a>运行结束
    Base:something about baseInfo 
    seat is Ok
    Base:something about baseInfo 
    tyre is Ok
    -257  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d624ea0a6df3409c80df6b97ab3c813b>运行结束
    -258  [id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-04db3f945b804500a2bbe2b9aabdce3b>运行结束
    Base:something about baseInfo 
    tyre is Ok
    Base:something about baseInfo 
    tyre is Ok
    -262  [id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-869b573e226046aca8ad30765f1f300c>运行结束
    -264  [id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8] INFO   - 线程<id:4af96b81d14a4954a6b649308d444e4c,type:second-d6f7074f6c4a4b12bd37ec5f5c11aff8>运行结束
    -264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms
    -333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数5...
    -334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行开始...
    -334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行开始...
    -334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行开始...
    -334  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行开始...
    Base:something about baseInfo 
    door is Ok
    Base:something about baseInfo 
    door is Ok
    Base:something about baseInfo 
    -338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-ca58e9b733514c668a224875417c9d26>运行结束
    door is Ok
    -339  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行开始...
    -338  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-17debef817e34c49996a2c38840f3de2>运行结束
    Base:something about baseInfo 
    door is Ok
    Base:something about baseInfo 
    -340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-3f7707cb2a224d3d8844a09271b24a07>运行结束
    roof is Ok
    -340  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-af0e914cce89480987c6184a885770d5>运行结束
    -342  [id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb] INFO   - 线程<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third-cfccfb37ebda4279b7552f7e060b2ddb>运行结束
    -343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms



    从上面的日志可以看出: 
    由于第一步工作是挑一个单干的,因此是没有启用线程组的 
    第二步同时有9个线程干活: 


    1
    2
    3
    -234  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行开始,线程数9...
    ...
    -264  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:4af96b81d14a4954a6b649308d444e4c,type:second>运行结束, 用时:30ms



    第三步同时有5个线程干活: 

    1
    2
    3
    -333  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行开始,线程数5...
    ...
    -343  [RMI TCP Connection(1)-192.168.84.73] INFO   - 线程组<id:9b0de678632d4fb8b87ae9db4b6436f8,type:third>运行结束, 用时:10ms



    总结: 
    Tiny并行计算框架确实是可以方便的解决各种复杂并行计算的问题。

  • 相关阅读:
    mysql实现主从复制
    go get时候 timeout
    linux 修改/etc/profile文件之后 没有效果
    初试 laravel
    php 实现单个大文件(视频)的 断点上传
    UEditor图片左对齐右对齐 要的作用显示之后 保存之后没有效果
    docker 实现 mysql+nginx+php
    redis
    easyPoi框架的excel导入导出
    从生产计划的角度认识精益生产
  • 原文地址:https://www.cnblogs.com/j2eetop/p/4612481.html
Copyright © 2011-2022 走看看