zoukankan      html  css  js  c++  java
  • Tiny并行计算框架之实现机理

    这一篇呢,主要介绍其实现机理。  当然,秉承偶的一向的观点,让新手也能看得懂。 
    首先看工作的接口: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    public interface Work extends Serializable {
        /**
         * 返回工作类型,每个工作都有一个工作类型,包工头及工人只能处理同样类型的工作
         *
         * @return
         */
        String getType();

        String getId();

        /**
         * 返回后续步骤的工作,如果有,说明是复合工作,如果没有,说明是简单工作
         *
         * @return
         */
        Work getNextWork();

        /**
         * 设置后续步骤工作
         *
         * @param nextWork 后续工作
         * @return 返回后续工作
         */
        Work setNextWork(Work nextWork);

        /**
         * 是否需要序列化
         *
         * @return true表示工作永不丢失,false表示容器关闭即丢失
         */
        boolean isNeedSerialize();

        /**
         * 设置是否需要进行序列化,如果要用到MQ,则需要设置为需要序列化
         *
         * @param needSerialize true表示工作永不丢失,false表示容器关闭即丢失
         */
        void setNeedSerialize(boolean needSerialize);

        /**
         * 返回输入仓库
         *
         * @return
         */
        Warehouse getInputWarehouse();


        /**
         * 设置输入仓库
         *
         * @param inputWarehouse
         */
        void setInputWarehouse(Warehouse inputWarehouse);

        /**
         * 设置工作状态
         *
         * @param workStatus
         */
        void setWorkStatus(WorkStatus workStatus);

        /**
         * 获取工作状态
         *
         * @return
         */
        WorkStatus getWorkStatus();
    }



    是不是很简单,它的实现也是同样简单: 

    1
    2
    3
    4
    5
    6
    7
    8
    public class WorkDefault implements Work {
        private String id;
        private String type;
        private Warehouse inputWarehouse;
        private boolean needSerialize = false;
        private Work nextStepWork = null;
        private WorkStatus workStatus = WorkStatus.WAITING;
    }



    基本上就是上述属性的set,get方法。 
    工人的接口如下: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /**
    * 工人,用于干具体的工作
    * Created by luoguo on 14-1-8.
    */
    public interface Worker extends ParallelObject {
        /**
         * 执行工作
         *
         * @return
         */
        Warehouse work(Work work) throws RemoteException;

        /**
         * 是否接受工作
         * 即使是同样类型的工人,有可能对工作也挑三捡四,这里给了工人一定的灵活性
         *
         * @param work
         * @return true表示接受,false表示不接受
         */
        boolean acceptWork(Work work) throws RemoteException;

          /**
         * 返回类型
         *
         * @return
         */
        String getType() throws RemoteException;

    }



    是不是也很简单? 
    下面看看工头: 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * 包工头
    * 包工头用于带着一组工人并完成对应的任务
    * Created by luoguo on 14-1-8.
    */
    public interface Foreman extends ParallelObject {
        /**
         * 返回执行哪种类型的工作任务
         *
         * @return
         */
        String getType() throws RemoteException;

        /**
         * 开始干活以完成工作
         */
        Warehouse work(Work work, List<Worker> workerList) throws IOException;

        /**
         * 设置工作合并器
         *
         * @param workCombiner
         */
        void setWorkCombiner(WorkCombiner workCombiner);

        /**
         * 设置工作分解器
         *
         * @param workSplitter
         */
        void setWorkSplitter(WorkSplitter workSplitter);

    }



    下面看看职业介绍所,呵呵,这个就复杂些了(方法多了) 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    /**
    * 职介所
    * 职介所是分布式处理的核心场所,所有工作相关的元素都要通过职介所进行关联
    * Created by luoguo on 14-1-8.
    */
    public interface JobCenter {
        String WORK_QUEUE = "WorkQueue";
        String FOREMAN = "Foreman";
        String WORKER = "Worker";

        RmiServer getRmiServer();

        void setRmiServer(RmiServer rmiServer);

        /**
         * 注册工人
         *
         * @param worker
         */
        void registerWorker(Worker worker) throws RemoteException;

        /**
         * 返回工作队列对象
         *
         * @return
         */
        WorkQueue getWorkQueue();

        /**
         * 注消工人
         *
         * @param worker
         */
        void unregisterWorker(Worker worker) throws RemoteException;

        /**
         * 注册一份工作,工作情况不需要马上关注。因此也就不用等待,马上返回可以进行其它处理
         * 如果有返回结果,可以通过异步方式,异步方式可以用后续工作的方式来指定
         *
         * @param work
         */
        void registerWork(Work work) throws IOException;


        /**
         * 取消工作,在工作没有分配出去之前,可以从职介所注消工作,如果工作已经分配出去,则无法注消
         *
         * @param work
         */
        void unregisterWork(Work work) throws RemoteException;

        /**
         * 返回指定工作的工作状态
         *
         * @param work
         * @return
         */
        WorkStatus getWorkStatus(Work work) throws RemoteException;

        /**
         * 执行一项工作,期望同步得到结果或异常
         * 如果没有合适的工人或包工头进行处理,马上会抛出异常
         *
         * @param work
         */
        Warehouse doWork(Work work) throws IOException;

        /**
         * 注册包工头
         *
         * @param foreman
         */
        void registerForeman(Foreman foreman) throws RemoteException;

        /**
         * 注销包工头
         *
         * @param foreman
         */
        void unregisterForeMan(Foreman foreman) throws RemoteException;

        /**
         * 返回具有某种类型的空闲且愿意接受工作的工人列表
         *
         * @return
         */
        List<Worker> getIdleWorkerList(Work work);


        /**
         * 返回所有的工作列表
         *
         * @return
         */
        List<Work> getWorkList() throws RemoteException;

        /**
         * 返回某种类型的某种状态的工作列表
         *
         * @return
         */
        List<Work> getWorkList(String type, WorkStatus workStatus) throws RemoteException;


        /**
         * 返回组织某种工作的的空闲工头列表
         *
         * @param type
         * @return
         */
        List<Foreman> getIdleForeman(String type);

        /**
         * 自动进行匹配,如果有匹配成功的,则予以触发执行
         */
        void autoMatch() throws IOException;

        /**
         * 职业介绍所关门
         *
         * @throws RemoteException
         */
        void stop() throws RemoteException;
    }





    讲过了四个重要接口,现在说说实现思路:   工人、包工头都是无状态的。这有个好处是不管来多少工作,都可以进行处理;缺点是没有办法进行后续干预。 
    综合来说,我还是觉得无状态的比有状态的更好。 
    因为一开始我的实现是有状态的,甚至可以让包工头取消一个工作,包工头再让工人取消工作,但是这样会带来异常复杂的分布式状态维护。但是改成用无状态的模式,就方便多了。 
    在职业介绍所,有两个重要的方法,一个是doWork,即立即执行一个工作,如果找不到合适的工头和工人,就会抛出异常;另外一个是autoMatch触发职业介绍所进行一次自动匹配。之所以没有做在职业介绍所内部开启线程是为了给外部提供更方便的控制。 
    所以,其总体设计思想就是开个职业介绍所,工人或工头、工作就可以注册进来,由职业介绍所来撮合成合适的虚拟小组来达成任务。是不是很容易理解? 
    下面就是所有的接口与实现类了: 

    155218_thit_1245989.jpg (16.92 KB, 下载次数: 0)

    下载附件

    2015-5-27 22:14 上传

     

      
    对于做并行开发的人员来说: 
    职业介绍所,工人,工头都不用开发,框架自带的已经足够用了。开发人员只要开发工人和工作分解合并器即可。 
    工人继承AbstractWorker之后,只有一个方法实现即可。工作分解一个方法,工作合并一个方法,其它的都交给Tiny并行计算框架吧。 
    总结: 
    此框架对并行计算的参与者进行了分解,分解为职业介绍所、工头、工人及工作,职业介绍所是中心,职业介绍所停止运行,将无法进行并行计算。工头、工人是可以在任意计算机的,工作是可以在任意节点添加的,由于框架提供了工作序列化功能,因此只要设置工作需要序列化是true,此工作将一直存在,直到被完成,利用此特性可以方便的实现简单的MQ。同时为方便业务实现,工人有抽象类,建议直接继承抽象类实现要关业务方法即可。 
    注意:目前在执行过程中工人注销或停止服务,会导致整个工作执行失败,下次继续进行执行,后续拟改成考虑用其它工人继续工作。 
    今天比较忙,写得比较匆忙,如果有不清楚的可以在下面提问。

  • 相关阅读:
    2017.2.16 开涛shiro教程-第十七章-OAuth2集成(二)客户端
    7.编写Java应用程序。首先,定义一个Print类,它有一个方法void output(int x),如果x的值是1,在控制台打印出大写的英文字母表;如果x的值是2,在 控制台打印出小写的英文字母表。其次,再定义一个主类——TestClass,在主类 的main方法中创建Print类的对象,使用这个对象调用方法output ()来打印出大 小写英文字母表。
    6.编写一个Java应用程序,该应用程序包括2个类:Print类和主类E。Print 类里有一个方法output()功能是输出100 ~ 999之间的所有水仙花数(各位数字的 立方和等于这个三位数本身,如: 371 = 33 + 73 + 13。)在主类E的main方法中来 测试类Print。
    5.编写Java应用程序。首先,定义描述学生的类——Student,包括学号(int)、 姓名(String)、年龄(int)等属性;二个方法:Student(int stuNo,String name,int age) 用于对对象的初始化,outPut()用于输出学生信息。其次,再定义一个主类—— TestClass,在主类的main方法中创建多个Student类的对象,使用这些对象来测 试St
    17.从键盘上输入一个正整数n,请按照以下五行杨辉三角形的显示方式, 输出杨辉三角形的前n行。请采用循环控制语句来实现。 (三角形腰上的数为1,其他位置的数为其上一行相邻两个数之和。) 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1
    16.按要求编写Java应用程序。 编写一个名为Test的主类,类中只有一个主方法; 在主方法中定义一个大小为50的一维整型数组,数组名为x,数组中存放着{1, 3,5,…,99}输出这个数组中的所有元素,每输出十个换一行;在主方法中定义一 个大小为10*10的二维字符型数组,数组名为y,正反对角线上存的是‘*’,其余 位置存的是‘#’;输出这个数组中的所有元素。
    15.找出如下数组中最大的元素和最小的元素, a[][]={{3,2,6},{6,8,2,10},{5},{12,3,23}}
    已知2个一维数组:a[]={3,4,5,6,7},b[]={1,2,3,4,5,6,7};把数组a与数组b 对应的元素乘积再赋值给数组b,如:b[2]=a[2]*b[2];最后输出数组b的元素。
    2.采用字符的移位方式实现字符文本加密解密。
    1. 用自己的算法实现startsWith和endsWith功能。
  • 原文地址:https://www.cnblogs.com/j2eetop/p/4612477.html
Copyright © 2011-2022 走看看