zoukankan      html  css  js  c++  java
  • Storm自定义调度器实现--DirectScheduler

    前言

    最近在研究Storm的任务调度相关的知识,于是就想要试着去改造一下Storm的任务调度,来满足一下现实状况中的一些场景。

    Storm调度的相关术语

    在看Storm的Scheduler代码么之前,得要弄明白几个概念,这样可以帮助大家更好的理解后面的调度过程。
    1、slot。这代表一个Supervisor节点上的一个单位资源。每个slot对应一个port,一个slot只能被一个Worker占用。
    2、Worker,Executor.Task,1个Worker包含1个或多个Executor执行器,每个执行器包含多个Task。
    3、Executor的表现形式为[1-1],[2-2],中括号内的数字代表该Executor中的起始Task id到末尾Task id,1个Worker就相当于在外面加个大括号{[1-1],[2-2]}
    4.Component。Storm中的每个组件就是指一类Spout或1个类型的Bolt,这里指的是名称类型,不包含个数。
    下面是调度器的核心实现。

    代码实现

    import backtype.storm.scheduler.*;
    import clojure.lang.PersistentArrayMap;
    import java.util.*;
    
    /**
     * 直接分配调度器,可以分配组件到指定节点中
     * Created by zhexuan on 15/7/6.
     */
    public class DirectScheduler implements IScheduler{
    
    
    @Override
    public void prepare(Map conf) {
    
    }
    
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        System.out.println("DirectScheduler: begin scheduling");
        // Gets the topology which we want to schedule
        Collection<TopologyDetails> topologyDetailes;
        TopologyDetails topology;
        //作业是否要指定分配的标识
        String assignedFlag;
        Map map;
        Iterator<String> iterator = null;
    
        topologyDetailes = topologies.getTopologies();
        for(TopologyDetails td: topologyDetailes){
            map = td.getConf();
            assignedFlag = (String)map.get("assigned_flag");
    
            //如何找到的拓扑逻辑的分配标为1则代表是要分配的,否则走系统的调度
            if(assignedFlag != null && assignedFlag.equals("1")){
                System.out.println("finding topology named " + td.getName());
                topologyAssign(cluster, td, map);
            }else {
                System.out.println("topology assigned is null");
            }
        }
    
        //其余的任务由系统自带的调度器执行
        new EvenScheduler().schedule(topologies, cluster);
    }
    
    
    /**
     * 拓扑逻辑的调度
     * @param cluster
     * 集群
     * @param topology
     * 具体要调度的拓扑逻辑
     * @param map
     * map配置项
     */
    private void topologyAssign(Cluster cluster, TopologyDetails topology, Map map){
        Set<String> keys;
        PersistentArrayMap designMap;
        Iterator<String> iterator;
    
        iterator = null;
        // make sure the special topology is submitted,
        if (topology != null) {
            designMap = (PersistentArrayMap)map.get("design_map");
            if(designMap != null){
                System.out.println("design map size is " + designMap.size());
                keys = designMap.keySet();
                iterator = keys.iterator();
    
                System.out.println("keys size is " + keys.size());
            }
    
            if(designMap == null || designMap.size() == 0){
                System.out.println("design map is null");
            }
    
            boolean needsScheduling = cluster.needsScheduling(topology);
    
            if (!needsScheduling) {
                System.out.println("Our special topology does not need scheduling.");
            } else {
                System.out.println("Our special topology needs scheduling.");
                // find out all the needs-scheduling components of this topology
                Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);
    
                System.out.println("needs scheduling(component->executor): " + componentToExecutors);
                System.out.println("needs scheduling(executor->components): " + cluster.getNeedsSchedulingExecutorToComponents(topology));
                SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());
                if (currentAssignment != null) {
                    System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());
                } else {
                    System.out.println("current assignments: {}");
                }
    
                String componentName;
                String nodeName;
                if(designMap != null && iterator != null){
                    while (iterator.hasNext()){
                        componentName = iterator.next();
                        nodeName = (String)designMap.get(componentName);
    
                        System.out.println("现在进行调度 组件名称->节点名称:" + componentName + "->" + nodeName);
                        componentAssign(cluster, topology, componentToExecutors, componentName, nodeName);
                    }
                }
            }
        }
    }
    
    /**
     * 组件调度
     * @param cluster
     * 集群的信息
     * @param topology
     * 待调度的拓扑细节信息
     * @param totalExecutors
     * 组件的执行器
     * @param componentName
     * 组件的名称
     * @param supervisorName
     * 节点的名称
     */
    private void componentAssign(Cluster cluster, TopologyDetails topology, Map<String, List<ExecutorDetails>> totalExecutors, String componentName, String supervisorName){
        if (!totalExecutors.containsKey(componentName)) {
            System.out.println("Our special-spout does not need scheduling.");
        } else {
            System.out.println("Our special-spout needs scheduling.");
            List<ExecutorDetails> executors = totalExecutors.get(componentName);
    
            // find out the our "special-supervisor" from the supervisor metadata
            Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
            SupervisorDetails specialSupervisor = null;
            for (SupervisorDetails supervisor : supervisors) {
                Map meta = (Map) supervisor.getSchedulerMeta();
    
                if(meta != null && meta.get("name") != null){
                    System.out.println("supervisor name:" + meta.get("name"));
    
                    if (meta.get("name").equals(supervisorName)) {
                        System.out.println("Supervisor finding");
                        specialSupervisor = supervisor;
                        break;
                    }
                }else {
                    System.out.println("Supervisor meta null");
                }
    
            }
    
            // found the special supervisor
            if (specialSupervisor != null) {
                System.out.println("Found the special-supervisor");
                List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
    
                // 如果目标节点上已经没有空闲的slot,则进行强制释放
                if (availableSlots.isEmpty() && !executors.isEmpty()) {
                    for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
                        cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
                    }
                }
    
                // 重新获取可用的slot
                availableSlots = cluster.getAvailableSlots(specialSupervisor);
    
                // 选取节点上第一个slot,进行分配
                cluster.assign(availableSlots.get(0), topology.getId(), executors);
                System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]");
            } else {
                System.out.println("There is no supervisor find!!!");
            }
        }
    }
    

    }

    说明部分

    Storm自定义实现直接分配调度器,代码修改自Twitter Storm核心贡献者徐明明,此处为链接.

    开发背景

    在准备开发Storm自定义之前,事先已经了解了下现有Storm使用的调度器,默认是DefaultScheduler,调度原理大体如下:
    * 在新的调度开始之前,先扫描一遍集群,如果有未释放掉的slot,则先进行释放
    * 然后优先选择supervisor节点中有空闲的slot,进行分配,以达到最终平均分配资源的目标

    现有scheduler的不足之处

    上述的调度器基本可以满足一般要求,但是针对下面个例还是无法满足:
    * 让spout分配到固定的机器上去,因为所需的数据就在那上面
    * 不想让2个Topology运行在同一机器上,因为这2个Topology都很耗CPU

    DirectScheduler的作用

    DirectScheduler把划分单位缩小到组件级别,1个Spout和1个Bolt可以指定到某个节点上运行,如果没有指定,还是按照系统自带的调度器进行调度.这个配置在Topology提交的Conf配置中可配.

    使用方法

    集群配置

    • 打包此项目,将jar包拷贝到STORM_HOME/lib目录下,在nimbus节点上的Storm包
    • 在nimbus节点的storm.yaml配置中,进行如下的配置:

      storm.scheduler: "storm.DirectScheduler"
    • 然后是在supervisor的节点中进行名称的配置,配置项如下:


      supervisor.scheduler.meta:
      name: "your-supervisor-name"

    在集群这部分的配置就结束了,然后重启nimbus,supervisor节点即可,集群配置只要1次配置即可.

    拓扑逻辑配置

    见下面的代码设置,主要是把组件名和节点名称作为映射值传入

    int numOfParallel;
    TopologyBuilder builder;
    StormTopology stormTopology;
    Config config;
    //待分配的组件名称与节点名称的映射关系
    HashMap<String, String> component2Node;
    
    //任务并行化数设为10个
    numOfParallel = 2;
    
    builder = new TopologyBuilder();
    
    String desSpout = "my_spout";
    String desBolt = "my_bolt";
    
    //设置spout数据源
    builder.setSpout(desSpout, new TestSpout(), numOfParallel);
    
    builder.setBolt(desBolt, new TestBolt(), numOfParallel)
                .shuffleGrouping(desSpout);
    
    config = new Config();
    config.setNumWorkers(numOfParallel);
    config.setMaxSpoutPending(65536);
    config.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 40000);
    config.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 40000);
    
    component2Node = new HashMap<>();
    
    component2Node.put(desSpout, "special-supervisor1");
    component2Node.put(desBolt, "special-supervisor2");
    
    //此标识代表topology需要被调度
    config.put("assigned_flag", "1");
    //具体的组件节点对信息
    config.put("design_map", component2Node);
    
    StormSubmitter.submitTopology("test", config, builder.createTopology());
    

    拓扑逻辑作业具体要被调度时,传入配置参数即可.

    调度器后期优化

    DirectScheduler只是针对原有的调度实现做了1层包装,后期可以进行更深层次的改造,涉及到节点在分配的时候slot的排序等等.

    完整代码地址

    https://github.com/linyiqun/storm-scheduler

  • 相关阅读:
    Kettle 使用入门
    git mac客户端使用提交与同步
    MAC 远程桌面链接 证书或链接无效
    mac下wifi无法连接的问题
    mysql时间段内查询
    mybatis 特殊符号及like的使用
    mac下剪切文件或文件夹
    eclipse下使用git下载和上传项目
    unbutu下搭建FTP服务
    mybatis 的if else
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183908.html
Copyright © 2011-2022 走看看