  • Stream Computing: How JStorm Submits a Topology (Part 2)

    Continued from: Stream Computing: How JStorm Submits a Topology (Part 1)

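    5. The previous part ended at ServiceHandler.submitTopologyWithOpts(). In that method, a TopologyAssignEvent is instantiated, which is equivalent to creating a topology-level job, and the event is then put on TopologyAssign's task queue. The code is as follows: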

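    // Build a topology-level assignment job
    TopologyAssignEvent assignEvent = new TopologyAssignEvent();
    assignEvent.setTopologyId(topologyId);
    assignEvent.setScratch(false);
    assignEvent.setTopologyName(topologyname);
    // Convert the initial status from the submit options (ACTIVE/INACTIVE) into a StormStatus
    assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(
            options.get_initial_status()));

    // Hand the job to TopologyAssign's queue; a background thread picks it up
    TopologyAssign.push(assignEvent);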
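The push() call only enqueues the event and returns; the actual work happens on a background thread that blocks on the queue. The sketch below illustrates that producer/consumer hand-off with a LinkedBlockingQueue. AssignEvent and AssignWorker are hypothetical stand-ins, not JStorm classes: the real TopologyAssign.push() is static, while here the queue is an instance field so the demo can be run repeatedly, and "processing" an event just records its topology ID where the real thread would call mkAssignment().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TopologyAssignEvent (only the topology ID is kept).
final class AssignEvent {
    final String topologyId;

    AssignEvent(String topologyId) {
        this.topologyId = topologyId;
    }
}

// Hypothetical stand-in for TopologyAssign: a Runnable that blocks on its own queue.
final class AssignWorker implements Runnable {
    private final BlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<>();
    final List<String> assigned = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;

    AssignWorker(int expectedEvents) {
        this.done = new CountDownLatch(expectedEvents);
    }

    // Producer side: called from the submit path and returns immediately.
    void push(AssignEvent event) {
        queue.offer(event);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                AssignEvent event = queue.take(); // blocks until an event arrives
                assigned.add(event.topologyId);   // a real assigner would call mkAssignment(event)
                done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // exit cleanly on shutdown
        }
    }

    // Demo helper: submit the IDs, wait until they are processed, then stop the thread.
    static List<String> demo(List<String> topologyIds) {
        AssignWorker worker = new AssignWorker(topologyIds.size());
        Thread thread = new Thread(worker, "topology-assign");
        thread.setDaemon(true);
        thread.start();
        for (String id : topologyIds) {
            worker.push(new AssignEvent(id));
        }
        try {
            worker.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        thread.interrupt();
        return new ArrayList<>(worker.assigned);
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("topo-1", "topo-2")));
    }
}
```

Because a single consumer drains a FIFO queue, jobs are assigned one at a time in submission order, which keeps the assigner free of concurrent writes to the same topology's assignment.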

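    6. TopologyAssign is JStorm's task assigner. It creates and assigns tasks according to the configuration and the relationships between the spouts and bolts in the topology; however, the actual creation and assignment of tasks is not done by TopologyAssign itself but by JStorm's own scheduler. JStorm also lets users plug in a custom scheduler to fit their business needs; I will analyze the JStorm scheduler in a separate article, so it is not covered here. Back to TopologyAssign: the class is a background thread that implements Runnable. It is started together with Nimbus and is mainly responsible for topology assignment, backup, and rebalancing, all of which are, again, carried out through JStorm's scheduler. Its run method takes jobs from its own queue in a blocking manner and then assigns them. The core of the assignment logic is as follows: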

    public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
        String topologyId = event.getTopologyId();
        TopologyAssignContext context = prepareTopologyAssign(event);
        IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
        // Let JStorm's scheduler compute the task assignment.
        // ResourceWorkerSlot abstracts a worker: it wraps the worker and the tasks it runs.
        Set<ResourceWorkerSlot> assignments = scheduler.assignTasks(context);
        Map<String, String> nodeHost = getTopologyNodeHost(
                context.getCluster(), context.getOldAssignment(), assignments);

        Map<Integer, Integer> startTimes = getTaskStartTimes(context,
                nimbusData, topologyId, context.getOldAssignment(), assignments);
        // Location of the jar submitted to the cluster; workers must download the code before running tasks
        String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(),
                topologyId);
        Assignment assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);
        StormClusterState stormClusterState = nimbusData.getStormClusterState();
        // Write the assignment to ZK so that the supervisors are notified
        stormClusterState.set_assignment(topologyId, assignment);
        // Update the tasks' heartbeat start time
        NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);
        // Update the metadata in ZK
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                || context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_MONITOR) {
            NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);
        } else {
            metricsMonitor(event);
        }
        return assignment;
    }
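mkAssignment() does not decide placement itself: it looks up a scheduler by name (schedulers.get(DEFAULT_SCHEDULER_NAME)) and delegates to assignTasks(), which is what makes a custom scheduler possible. The sketch below illustrates that plug-in structure under simplified, hypothetical types: TaskScheduler, RoundRobinScheduler and SchedulerRegistry are not JStorm classes (the real interface is IToplogyScheduler, which takes a TopologyAssignContext), the "default" key is an assumed value, and the round-robin policy is a toy, not JStorm's default scheduling algorithm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified scheduler interface.
interface TaskScheduler {
    // Maps each worker slot ("host:port") to the task IDs it should run.
    Map<String, List<Integer>> assignTasks(List<Integer> taskIds, List<String> slots);
}

// Toy policy: deal tasks out to the slots in round-robin order.
final class RoundRobinScheduler implements TaskScheduler {
    @Override
    public Map<String, List<Integer>> assignTasks(List<Integer> taskIds, List<String> slots) {
        if (slots.isEmpty()) {
            throw new IllegalArgumentException("no free worker slots");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String slot : slots) {
            result.put(slot, new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            result.get(slots.get(i % slots.size())).add(taskIds.get(i));
        }
        return result;
    }
}

// Hypothetical registry mirroring the schedulers.get(DEFAULT_SCHEDULER_NAME) lookup.
final class SchedulerRegistry {
    static final String DEFAULT_SCHEDULER_NAME = "default"; // assumed key
    private final Map<String, TaskScheduler> schedulers = new HashMap<>();

    SchedulerRegistry() {
        schedulers.put(DEFAULT_SCHEDULER_NAME, new RoundRobinScheduler());
    }

    // A user-defined scheduler can replace the default under the same name.
    void register(String name, TaskScheduler scheduler) {
        schedulers.put(name, scheduler);
    }

    Map<String, List<Integer>> assign(List<Integer> taskIds, List<String> slots) {
        return schedulers.get(DEFAULT_SCHEDULER_NAME).assignTasks(taskIds, slots);
    }

    public static void main(String[] args) {
        SchedulerRegistry registry = new SchedulerRegistry();
        System.out.println(registry.assign(List.of(1, 2, 3, 4, 5), List.of("node1:6800", "node2:6800")));
    }
}
```

With five tasks and two slots, node1:6800 receives tasks 1, 3 and 5 and node2:6800 receives 2 and 4. Swapping the policy only requires registering another TaskScheduler; the calling code stays unchanged.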

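    7. Nimbus has now assigned the tasks and written them to ZK, so each supervisor has to claim its own tasks. The supervisor's logic for fetching tasks is encapsulated in SyncSupervisorEvent, which is also a background thread. It repeatedly fetches all assignments from ZK (under JSTORM_ROOT/assignments), saves the ones that belong to it to local disk, downloads the topology code through NimbusClient, and then hands off to the worker-launching thread to run the tasks. The logic is as follows: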

    public void run() {

        RunnableCallback syncCallback = new EventManagerZkPusher(this,
                syncSupEventManager);

        // On first start, actively fetch all assignments under JSTORM_ROOT/assignments in ZK;
        // afterwards, changes are picked up through a ZK watch that invokes syncCallback
        Map<String, Assignment> assignments = Cluster.get_all_assignment(
                stormClusterState, syncCallback);

        // Topologies that have already been downloaded to this machine
        List<String> downloadedTopologyIds = StormConfig
                .get_supervisor_toplogy_list(conf);

        // Pick this supervisor's own assignments out of all assignments
        Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
                stormClusterState, supervisorId, assignments);

        // Persist the local assignments to disk
        localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);

        // Code download locations of the assigned topologies
        Map<String, String> topologyCodes = getTopologyCodeLocations(
                assignments, supervisorId);

        // Download the code to the local machine via NimbusClient
        downloadTopology(topologyCodes, downloadedTopologyIds);

        // Delete downloaded topologies that are no longer needed
        removeUselessTopology(topologyCodes, downloadedTopologyIds);

        // Queue syncProcesses, which is responsible for starting new workers to run the tasks
        processEventManager.add(syncProcesses);
    }
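The heart of the supervisor's sync is a filter: from every assignment in ZK, keep only the worker slots on this supervisor, indexed by port (what getLocalAssign() produces), and treat any downloaded topology with no remaining slot here as removable (what removeUselessTopology() cleans up). The sketch below shows that set logic with hypothetical, simplified types: WorkerSlot and a topologyId-to-slots map stand in for JStorm's Assignment and LocalAssignment, and the duplicate-port check is an illustrative safeguard, not a claim about JStorm's code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical simplified slot: the supervisor and port a topology's worker runs on.
final class WorkerSlot {
    final String supervisorId;
    final int port;

    WorkerSlot(String supervisorId, int port) {
        this.supervisorId = supervisorId;
        this.port = port;
    }
}

final class LocalAssignFilter {
    // Keep only the slots on this supervisor: port -> topologyId (cf. getLocalAssign).
    static Map<Integer, String> localAssign(Map<String, List<WorkerSlot>> assignments, String supervisorId) {
        Map<Integer, String> local = new TreeMap<>();
        for (Map.Entry<String, List<WorkerSlot>> entry : assignments.entrySet()) {
            for (WorkerSlot slot : entry.getValue()) {
                if (slot.supervisorId.equals(supervisorId)) {
                    String previous = local.put(slot.port, entry.getKey());
                    if (previous != null && !previous.equals(entry.getKey())) {
                        throw new IllegalStateException("port " + slot.port + " assigned to two topologies");
                    }
                }
            }
        }
        return local;
    }

    // Downloaded topologies with no slot left on this supervisor (cf. removeUselessTopology).
    static Set<String> uselessTopologies(Set<String> downloaded, Map<Integer, String> local) {
        Set<String> useless = new HashSet<>(downloaded);
        useless.removeAll(local.values());
        return useless;
    }

    public static void main(String[] args) {
        Map<String, List<WorkerSlot>> all = Map.of(
                "topo-a", List.of(new WorkerSlot("sup-1", 6800), new WorkerSlot("sup-2", 6800)),
                "topo-b", List.of(new WorkerSlot("sup-1", 6801)));
        Map<Integer, String> local = localAssign(all, "sup-1");
        System.out.println(local);
        System.out.println(uselessTopologies(Set.of("topo-a", "topo-c"), local));
    }
}
```

For supervisor sup-1 this yields {6800=topo-a, 6801=topo-b}, and topo-c, downloaded earlier but no longer assigned, is reported as removable.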
    
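    8. After SyncSupervisorEvent has selected its own assignments and saved them locally, SyncProcessEvent starts the workers that run the actual jobs. SyncProcessEvent does two things: it starts new workers and kills workers that are no longer needed. Here we only look at starting new workers; the logic is as follows: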

    private void startNewWorkers(Set<Integer> keepPorts,
            Map<Integer, LocalAssignment> localAssignments) throws Exception {
        // Select the jobs newly assigned in this round
        Map<Integer, LocalAssignment> newWorkers = JStormUtils
                .select_keys_pred(keepPorts, localAssignments);

        // Generate an ID for each new worker
        Map<Integer, String> newWorkerIds = new HashMap<Integer, String>();

        for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
            Integer port = entry.getKey();
            LocalAssignment assignment = entry.getValue();

            String workerId = UUID.randomUUID().toString();
            newWorkerIds.put(port, workerId);

            // Save each worker's ID locally
            StormConfig.worker_pids_root(conf, workerId);

            // Launch a new JVM to run the job
            launchWorker(conf, sharedContext,
                    assignment.getTopologyId(), supervisorId, port,
                    workerId, assignment);
        }
        // ... (the rest of the method is omitted in this excerpt)
    }
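SyncProcessEvent behaves like a reconciliation loop: workers whose port still carries the same topology are kept, stale ones are killed, and every assigned port that is not kept gets a new worker with a fresh UUID. Reading select_keys_pred(keepPorts, localAssignments) as "assignments whose port is not in keepPorts" is inferred from the surrounding code, not confirmed from JStorm's source. The sketch below shows only that set logic; WorkerReconciler and its methods are hypothetical, running workers are modeled simply as port -> topologyId (JStorm tracks them by worker ID and heartbeat), and launching the JVM is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

final class WorkerReconciler {
    // Ports whose running worker still matches the local assignment are kept as-is.
    static Set<Integer> keepPorts(Map<Integer, String> running, Map<Integer, String> local) {
        Set<Integer> keep = new TreeSet<>();
        for (Map.Entry<Integer, String> e : running.entrySet()) {
            if (e.getValue().equals(local.get(e.getKey()))) {
                keep.add(e.getKey());
            }
        }
        return keep;
    }

    // Every other running worker is stale and would be killed.
    static Set<Integer> portsToKill(Map<Integer, String> running, Set<Integer> keep) {
        Set<Integer> kill = new TreeSet<>(running.keySet());
        kill.removeAll(keep);
        return kill;
    }

    // Assigned ports without a kept worker need a new one (assumed select_keys_pred semantics).
    static Map<Integer, String> newWorkers(Set<Integer> keep, Map<Integer, String> local) {
        Map<Integer, String> result = new TreeMap<>();
        for (Map.Entry<Integer, String> e : local.entrySet()) {
            if (!keep.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    // One random worker ID per new port, as UUID.randomUUID() does in startNewWorkers.
    static Map<Integer, String> newWorkerIds(Map<Integer, String> newWorkers) {
        Map<Integer, String> ids = new HashMap<>();
        for (Integer port : newWorkers.keySet()) {
            ids.put(port, UUID.randomUUID().toString());
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<Integer, String> running = Map.of(6800, "topo-a", 6801, "topo-old");
        Map<Integer, String> local = Map.of(6800, "topo-a", 6801, "topo-b", 6802, "topo-b");
        Set<Integer> keep = keepPorts(running, local);
        System.out.println("keep=" + keep + " kill=" + portsToKill(running, keep));
        Map<Integer, String> fresh = newWorkers(keep, local);
        System.out.println("start=" + fresh + " ids=" + newWorkerIds(fresh));
    }
}
```

In this example port 6800 is kept, the worker on 6801 (still running the old topology) is killed, and new workers are started on 6801 and 6802 for topo-b.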

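    That is the whole process of submitting a topology in JStorm. These two articles only trace the main line and do not walk through the code logic in detail; I will keep refining them, and the JStorm scheduler will get its own detailed analysis later.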

    Copyright notice: this is the blogger's original article and may not be reproduced without the blogger's permission.

  • Original article: https://www.cnblogs.com/blfshiye/p/4623034.html