  • Spark: delaying SparkContext initialization

    In some applications it is desirable to first run a standalone Java program on the driver and only afterwards initialize the SparkContext, so that the cluster operates on the Java program's return value. This avoids creating the SparkContext (and thus allocating cluster resources) too early and leaving those resources idle for a long time.
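    A minimal sketch of this pattern (the class name, the placeholder loop, and the sleep interval are illustrative, not part of the original example):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DelayedInitSketch {
    	public static void main(String[] args) throws InterruptedException {
    		// Phase 1: long-running, driver-local work; no cluster resources are held yet.
    		long localResult = 0;
    		for (int i = 0; i < 10; i++) { // stand-in for the real single-JVM computation
    			localResult += i;
    			Thread.sleep(1000);
    		}

    		// Phase 2: only now create the SparkContext, i.e. only now ask YARN for executors.
    		SparkConf sparkConf = new SparkConf().setAppName("DelayedInitSketch");
    		JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    		// ... distributed work that uses localResult goes here ...
    		jsc.stop();
    	}
    }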

    Two YARN parameters are involved here:

      <property>
        <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
        <value>6000000</value>
      </property>
      <property>
        <name>yarn.resourcemanager.am.max-retries</name>
        <value>10</value>
      </property>

    YARN periodically scans all ApplicationMasters. If an ApplicationMaster has not reported a heartbeat within a certain interval (configurable via yarn.am.liveness-monitor.expiry-interval-ms, default 10 minutes), it is considered dead, and all containers running under it are marked as failed. The ResourceManager does not re-run those containers itself; it only informs the corresponding AM through the heartbeat mechanism, and the AM decides whether to re-run them, requesting resources from the RM again if necessary. The AM itself is relaunched on another node; the administrator can limit the number of attempts per ApplicationMaster via yarn.resourcemanager.am.max-retries (default 1).
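    The driver-only phase has to finish within these limits. The sketch below (the class name and the threshold check are illustrative; it only reads the two settings back from yarn-site.xml) shows one way to verify this before starting:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class AmTimeoutCheck {
    	public static void main(String[] args) {
    		// YarnConfiguration loads yarn-site.xml from the classpath; the defaults used
    		// here are the ones mentioned above (10 minutes, 1 attempt).
    		YarnConfiguration yarnConf = new YarnConfiguration();
    		long amExpiryMs = yarnConf.getLong(
    				"yarn.am.liveness-monitor.expiry-interval-ms", 600000L);
    		int amMaxRetries = yarnConf.getInt("yarn.resourcemanager.am.max-retries", 1);

    		long plannedLocalWorkMs = 30L * 60 * 1000; // the 30-minute driver-only phase
    		System.out.println("AM expiry: " + amExpiryMs + " ms, max retries: " + amMaxRetries);
    		if (plannedLocalWorkMs >= amExpiryMs) {
    			System.err.println("Driver-only phase exceeds the AM expiry interval;"
    					+ " raise the YARN and Spark wait settings first.");
    		}
    	}
    }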

    Two Spark parameters are also needed. Spark does not read them from an XML file; set them in spark-defaults.conf or pass them with --conf on spark-submit:

    spark.yarn.am.waitTime                  6000000
    spark.yarn.applicationMaster.waitTries  200

    Cluster management

    Spark On YARN

    Property name                               Default   Meaning
    spark.yarn.scheduler.heartbeat.interval-ms  5000      Interval (ms) at which the Spark ApplicationMaster sends heartbeats to the YARN ResourceManager
    spark.yarn.am.waitTime                      100000    Wait time (ms) at startup
    spark.yarn.applicationMaster.waitTries      10        Number of times the RM waits for the Spark ApplicationMaster to start, i.e. the number of attempts at SparkContext initialization; beyond this value, startup fails
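    A small sketch (hypothetical class name) of how the submitted values can be read back in the driver before the SparkContext is created, assuming they were passed via --conf or spark-defaults.conf at submit time; constructing a SparkConf does not allocate any cluster resources:

    import org.apache.spark.SparkConf;

    public class ShowYarnWaitSettings {
    	public static void main(String[] args) {
    		SparkConf conf = new SparkConf();
    		// Fall back to the defaults from the table above if nothing was passed.
    		System.out.println("spark.yarn.am.waitTime = "
    				+ conf.get("spark.yarn.am.waitTime", "100000"));
    		System.out.println("spark.yarn.applicationMaster.waitTries = "
    				+ conf.get("spark.yarn.applicationMaster.waitTries", "10"));
    	}
    }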

    Below is a test case: the driver first prints messages on its own for 30 minutes, and only then initializes the SparkContext.

    import iie.udps.common.hcatalog.SerHCatInputFormat;
    import iie.udps.common.hcatalog.SerHCatOutputFormat;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hive.hcatalog.data.DefaultHCatRecord;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
    import org.apache.spark.SerializableWritable;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.Function2;
    import scala.Tuple2;
    
    /**
     * First prints data on the driver (standalone) for 30 minutes, then initializes the
     * SparkContext to switch to cluster mode: reads Hive table data with Spark + HCatalog,
     * implements a GroupByAge count, writes the result to a Hive output table, and writes
     * XML status information to an HDFS file.
     *
     * spark-submit --class iie.udps.example.spark.SparkTest --master yarn-cluster
     * --num-executors 2 --executor-memory 1g --executor-cores 1 --driver-memory 1g
     * --conf spark.yarn.applicationMaster.waitTries=200 --conf spark.yarn.am.waitTime=1800000
     * --jars /home/xdf/udps-sdk-0.3.jar,/home/xdf/udps-sdk-0.3.jar /home/xdf/sparktest.jar
     * -c /user/hdfs/TestStdin2.xml
     */
    public class SparkTest {
    
    	@SuppressWarnings("rawtypes")
    	public static void main(String[] args) throws Exception {
    		if (args.length < 2) {
    			System.err.println("Usage: <-c> <stdin.xml>");
    			System.exit(1);
    		}
    		
    		String stdinXml = args[1];
    		OperatorParamXml operXML = new OperatorParamXml();
    		List<java.util.Map> stdinList = operXML.parseStdinXml(stdinXml); // parameter list parsed from stdin.xml
    
    		// Read the input parameters
    		String inputDBName = stdinList.get(0).get("inputDBName").toString();
    		String inputTabName = stdinList.get(0).get("inputTabName").toString();
    		String outputDBName = stdinList.get(0).get("outputDBName").toString();
    		String outputTabName = stdinList.get(0).get("outputTabName").toString();
    		String tempHdfsBasePath = stdinList.get(0).get("tempHdfsBasePath")
    				.toString();
    		String jobinstanceid = stdinList.get(0).get("jobinstanceid").toString();
    		
    		System.out.println(inputDBName+": "+ inputTabName +": "+outputDBName+": "+ outputTabName
    				+": "+ tempHdfsBasePath+": "+ jobinstanceid);
    
    		long begin = System.currentTimeMillis(); 
    		int count = 600; // 600 iterations x 3 s sleep = 30 minutes of driver-only output
    		for (int i = 0; i < count; i++) {
    			System.out.println("aaaaaaaaaaaaaaa"+i);
    			Thread.sleep(3000);
    		}
    		long end = System.currentTimeMillis();   
            System.out.println("FileOutputStream执行耗时:" + (end - begin) + "ms");   
    		
    		
    		if (inputDBName.isEmpty() || inputTabName.isEmpty()
    				|| outputDBName.isEmpty() || outputTabName.isEmpty()
    				|| tempHdfsBasePath.isEmpty() || jobinstanceid.isEmpty()) {
    
    			// Set the error output parameters
    			java.util.Map<String, String> stderrMap = new HashMap<String, String>();
    			String errorMessage = "Some operating parameters are empty!";
    			String errotCode = "80001";
    			stderrMap.put("errorMessage", errorMessage);
    			stderrMap.put("errotCode", errotCode);
    			stderrMap.put("jobinstanceid", jobinstanceid);
    			String fileName = "";
    			if (tempHdfsBasePath.endsWith("/")) {
    				fileName = tempHdfsBasePath + "stderr.xml";
    			} else {
    				fileName = tempHdfsBasePath + "/stderr.xml";
    			}
    			
    			// Write the error output file (stderr.xml)
    			operXML.genStderrXml(fileName, stderrMap);
    		} else {			
    			// Get the schema of the input table (the output table uses the same structure)
    			HCatSchema schema = operXML
    					.getHCatSchema(inputDBName, inputTabName);
    
    			// The first thing a Spark program does is create a JavaSparkContext, which tells Spark how to connect to the cluster
    			SparkConf sparkConf = new SparkConf().setAppName("SparkExample");
    			
    			JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    			
    			// Read the Hive table data, process it as an RDD, and return the result
    			JavaRDD<SerializableWritable<HCatRecord>> LastRDD = getProcessedData(
    					jsc, inputDBName, inputTabName, schema);
    			
    			// Store the processed data into the Hive output table
    			storeToTable(LastRDD, outputDBName, outputTabName);
    
    			jsc.stop();
    
    			// Set the normal output parameters
    			java.util.Map<String, String> stdoutMap = new HashMap<String, String>();
    			stdoutMap.put("outputDBName", outputDBName);
    			stdoutMap.put("outputTabName", outputTabName);
    			stdoutMap.put("jobinstanceid", jobinstanceid);
    			String fileName = "";
    			if (tempHdfsBasePath.endsWith("/")) {
    				fileName = tempHdfsBasePath + "stdout.xml";
    			} else {
    				fileName = tempHdfsBasePath + "/stdout.xml";
    			}
    			
    			// Write the normal output file (stdout.xml)
    			operXML.genStdoutXml(fileName, stdoutMap);
    		}
    		System.out.println(inputDBName+": "+ inputTabName +": "+outputDBName+": "+ outputTabName
    				+": "+ tempHdfsBasePath+": "+ jobinstanceid);
    		System.exit(0);
    	}
    
    	/**
    	 * 
    	 * @param jsc
    	 * @param dbName
    	 * @param inputTable
    	 * @param fieldPosition
    	 * @return
    	 * @throws IOException
    	 */
    	@SuppressWarnings("rawtypes")
    	public static JavaRDD<SerializableWritable<HCatRecord>> getProcessedData(
    			JavaSparkContext jsc, String dbName, String inputTable,
    			final HCatSchema schema) throws IOException {
    		// Read the Hive table data
    		Configuration inputConf = new Configuration();
    		Job job = Job.getInstance(inputConf);
    		SerHCatInputFormat.setInput(job.getConfiguration(), dbName, inputTable);
    		JavaPairRDD<WritableComparable, SerializableWritable> rdd = jsc
    				.newAPIHadoopRDD(job.getConfiguration(),
    						SerHCatInputFormat.class, WritableComparable.class,
    						SerializableWritable.class);
    
    		// Map each table record to an (age, 1) pair
    		JavaPairRDD<Integer, Integer> pairs = rdd
    				.mapToPair(new PairFunction<Tuple2<WritableComparable, SerializableWritable>, Integer, Integer>() {
    					private static final long serialVersionUID = 1L;
    
    					@SuppressWarnings("unchecked")
    					@Override
    					public Tuple2<Integer, Integer> call(
    							Tuple2<WritableComparable, SerializableWritable> value)
    							throws Exception {
    						HCatRecord record = (HCatRecord) value._2.value();
    						return new Tuple2((Integer) record.get(1), 1);
    					}
    				});
    
    		JavaPairRDD<Integer, Integer> counts = pairs
    				.reduceByKey(new Function2<Integer, Integer, Integer>() {
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Integer call(Integer i1, Integer i2) {
    						return i1 + i2;
    					}
    				});
    
    		JavaRDD<SerializableWritable<HCatRecord>> messageRDD = counts
    				.map(new Function<Tuple2<Integer, Integer>, SerializableWritable<HCatRecord>>() {
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public SerializableWritable<HCatRecord> call(
    							Tuple2<Integer, Integer> arg0) throws Exception {
    						HCatRecord record = new DefaultHCatRecord(2);
    						record.set(0, arg0._1);
    						record.set(1, arg0._2);
    						return new SerializableWritable<HCatRecord>(record);
    					}
    				});
    		// Return the processed data
    		return messageRDD;
    	}
    
    	/**
    	 * Store the processed data into the output table
    	 * 
    	 * @param rdd
    	 * @param dbName
    	 * @param tblName
    	 */
    	@SuppressWarnings("rawtypes")
    	public static void storeToTable(
    			JavaRDD<SerializableWritable<HCatRecord>> rdd, String dbName,
    			String tblName) {
    		Job outputJob = null;
    		try {
    			outputJob = Job.getInstance();
    			outputJob.setJobName("SparkExample");
    			outputJob.setOutputFormatClass(SerHCatOutputFormat.class);
    			outputJob.setOutputKeyClass(WritableComparable.class);
    			outputJob.setOutputValueClass(SerializableWritable.class);
    			SerHCatOutputFormat.setOutput(outputJob,
    					OutputJobInfo.create(dbName, tblName, null));
    			HCatSchema schema = SerHCatOutputFormat
    					.getTableSchemaWithPart(outputJob.getConfiguration());
    			SerHCatOutputFormat.setSchema(outputJob, schema);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    
    		// Save the RDD into the target table
    		rdd.mapToPair(
    				new PairFunction<SerializableWritable<HCatRecord>, WritableComparable, SerializableWritable<HCatRecord>>() {
    					private static final long serialVersionUID = -4658431554556766962L;
    
    					public Tuple2<WritableComparable, SerializableWritable<HCatRecord>> call(
    							SerializableWritable<HCatRecord> record)
    							throws Exception {
    						return new Tuple2<WritableComparable, SerializableWritable<HCatRecord>>(
    								NullWritable.get(), record);
    					}
    				}).saveAsNewAPIHadoopDataset(outputJob.getConfiguration());
    
    	}
    	
    
    }
     
    
    

    Input table data (id, age):

    hive> select * from test_in; 
    OK
    1	20
    2	20
    3	21
    4	20
    5	21
    6	20
    7	21
    8	19
    9	19
    10	21

    Output table data (age, count):

    hive> select * from test_out;
    OK
    19	2
    21	4
    20	4
    

     

  • Original source: https://www.cnblogs.com/xiaodf/p/5027171.html