zoukankan      html  css  js  c++  java
  • 流式计算-Jstorm提交Topology过程(上)

    Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt、bolt和bolt之间的关系。它能够被提交到Jstorm集群。

    本文以Jstorm自带的SequenceTopology简介一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及详细业务,

    1、 SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.SetBuilder(TopologyBuilder builder, Map conf)。该方法主要依据配置文件,使用TopologyBuilder构造Topology的spout和bolt。以及spout和bolt之间的关系

    2、TopologyBuilder构造好Topology之后,通过Jstorm Client的StormSubmitter.submitTopology(streamName, conf,builder.createTopology())提交Topology到Jstorm集群,

    3、在StormSubmitter.submitTopology方法中。首先会对配置项进行检查、然后将Topology自己的配置项和Jstorm的配置项组装成一个大的Map。之后上传用户在命令行提交的Jar包,然后通过NimbusClient 的submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) 方法将Topology提交到Jstorm集群,其核心代码例如以下:

    if (!Utils.isValidConf(stormConf)) {
    			throw new IllegalArgumentException(
    					"Storm conf is not valid. Must be json-serializable");
    		}
    		stormConf = new HashMap(stormConf);
    		stormConf.putAll(Utils.readCommandLineOpts());
    		Map conf = Utils.readStormConfig();
    		conf.putAll(stormConf);
    		putUserInfo(conf, stormConf);
    		String serConf = JSON.toJSONString(stormConf);
    		if (localNimbus != null) {
    			localNimbus.submitTopology(name, null, serConf, topology);
    		} else {
    			NimbusClient client = NimbusClient.getConfiguredClient(conf);
    			if (topologyNameExists(conf, name)) {//检查名字是否反复,Jstorm要求每一个topology名称必须唯一
    					throw new RuntimeException("Topology with name `" + name
    							+ "` already exists on cluster");
    				}
    			submitJar(conf);//上传Jar包到ZK			
    			client.getClient().submitTopologyWithOpts(name, path,
    								serConf, topology, opts);//通过Thrift将topology提交到集群
    					

    4、NimbusClient提交之后,NimbusSever通过com.alibaba.jstorm.daemon.nimbus.ServiceHandler.submitTopologyWithOpts(String topologyname, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)处理接收到的topology。其详细逻辑例如以下(代码已经精简)

    public void submitTopologyWithOpts(String topologyname,
    			String uploadedJarLocation, String jsonConf,
    			StormTopology topology, SubmitOptions options)
    			throws AlreadyAliveException, InvalidTopologyException,
    			TopologyAssignException, TException {
    		//首先检查topology是否已经存在
    		checkTopologyActive(data, topologyname, false);		
    		//生成topology的唯一标识
    		int counter = data.getSubmittedCount().incrementAndGet();
    		String topologyId = topologyname + "-" + counter + "-"
    				+ TimeUtils.current_time_secs();
    		try {
    			//反序列化topology配置项
    			Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils
    					.from_json(jsonConf);
    			if (serializedConf == null) {
    				LOG.warn("Failed to serialized Configuration");
    				throw new InvalidTopologyException(
    						"Failed to serilaze topology configuration");
    			}
    			//将topology的名称和ID添加到配置项中
    			serializedConf.put(Config.TOPOLOGY_ID, topologyId);
    			serializedConf.put(Config.TOPOLOGY_NAME, topologyname);			
    			Map<Object, Object> stormConf;
    			stormConf = NimbusUtils.normalizeConf(conf, serializedConf,
    					topology);
    			Map<Object, Object> totalStormConf = new HashMap<Object, Object>(
    					conf);
    			totalStormConf.putAll(stormConf);
    			StormTopology normalizedTopology = NimbusUtils.normalizeTopology(
    					stormConf, topology);
    
    			// this validates the structure of the topology
    			Common.validate_basic(normalizedTopology, totalStormConf,
    					topologyId);
    			// don't need generate real topology, so skip Common.system_topology
    			// Common.system_topology(totalStormConf, topology);
    			StormClusterState stormClusterState = data.getStormClusterState();
    			// 创建topology在ZK上的文件夹
    			setupStormCode(conf, topologyId, uploadedJarLocation, stormConf,
    					normalizedTopology);
    
    			// 为每个spout或者bolt生成Task,并在ZK上创建对应的task文件夹<span style="font-family: Arial, Helvetica, sans-serif;">/ZK/tasks/topoologyId/xxx</span>
    			setupZkTaskInfo(conf, topologyId, stormClusterState);
    			// 进行任务分配
    			TopologyAssignEvent assignEvent = new TopologyAssignEvent();
    			assignEvent.setTopologyId(topologyId);
    			assignEvent.setScratch(false);
    			assignEvent.setTopologyName(topologyname);
    			assignEvent.setOldStatus(Thrift
    					.topologyInitialStatusToStormStatus(options
    							.get_initial_status()));
    
    			TopologyAssign.push(assignEvent);
    			LOG.info("Submit for " + topologyname + " with conf "
    					+ serializedConf);
    
    			boolean isSuccess = assignEvent.waitFinish();
    			if (isSuccess == true) {
    				LOG.info("Finish submit for " + topologyname);
    			} 


  • 相关阅读:
    mysql数据库安装与配置
    redis主从配置+sentinel哨兵模式
    Oracle 本地验证和密码文件
    Oracle 12c hub和leaf的转换
    oracle 12c CPU资源隔离
    oracle12 listagg 与 wm_concat行列转换
    Oracle 12c rac搭建
    ClassLoader.loadClass()与Class.forName()的区别《 转》
    docker 安装mysql8.0
    spring boot @EnableWebMvc禁用springMvc自动配置原理。
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/5184749.html
Copyright © 2011-2022 走看看