1. Opening and closing instances
```scala
var zkClient: ZooKeeper = null
try {
  zkClient = new ZooKeeper(getZkUrl(), 2000, new Watcher {
    override def process(watchedEvent: WatchedEvent): Unit = {}
  })
  val data = zkClient.getData(s"/hadoop-ha/" + hdfsNameServiceHost + s"/ActiveBreadCrumb",
    new Watcher {
      override def process(watchedEvent: WatchedEvent): Unit = {}
    }, new Stat())
  val activeNodeInfo = HAZKInfoProtos.ActiveNodeInfo.parseFrom(data)
  activeNodeInfo.getHostname
} catch {
  case ex: Exception =>
    logError(s"ERROR: zookeeper connect failure, can not get the active hdfs namenode according to ${hdfsNameServiceHost}", ex)
    ""
} finally {
  if (zkClient != null) {
    zkClient.close()
  }
}
```
As shown above, zkClient is opened inside the try block, so it must never be closed inside that same try block: if an exception is thrown before the close, the close never runs. Remember this! The zkClient.close() call must go in the finally block. If you skip this, the leaked client keeps failing to connect to the ZooKeeper server and the failure is logged over and over.
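The acquire-before-try, close-in-finally rule can be factored into a reusable loan-pattern helper. A minimal sketch (the `withResource` helper is an illustration, not part of the original code):

```scala
import java.io.Closeable

// Loan pattern: the resource is always closed in finally,
// whether the body returns normally or throws.
def withResource[R <: Closeable, T](open: => R)(body: R => T): T = {
  val resource = open   // acquire before entering the try block
  try {
    body(resource)      // may throw
  } finally {
    resource.close()    // runs on both success and failure
  }
}
```

This mirrors the rule above: acquisition happens outside the try body, and release happens in finally so it cannot be skipped by an exception.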
2. Exception handling
2.1 Every method needs a try/catch, inside or outside
Every method needs a try/catch, either inside its body or around its call site. Otherwise an uncaught exception can hang the program and tie up its resources.
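One way to enforce this at the call site is to wrap each unit of work in scala.util.Try, so a single failing task cannot crash the thread running it. A minimal sketch (`runSafely` is a hypothetical helper, not from the original code):

```scala
import scala.util.{Try, Success, Failure}

// Wrap a unit of work so an exception is logged and swallowed
// instead of escaping and killing the calling thread.
def runSafely[T](taskName: String)(task: => T): Option[T] =
  Try(task) match {
    case Success(result) =>
      Some(result)
    case Failure(e) =>
      // log and continue instead of letting the exception propagate
      println(s"task $taskName failed: ${e.getMessage}")
      None
  }
```

The caller gets an Option and decides how to proceed; the worker thread stays alive either way.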
2.2 Code after an exception is not executed
Note: inside a try block, once a statement throws, the rest of the block is skipped. For example:
```scala
try {
  assert(argsCheck(args))
  if (dbName != null && dbName != "") {
    logInfo(s"use $dbName")
    sparkSession.sql(s"use $dbName")
  }
  processSparkPara
  run
  sparkSession.stop()
  result = true
  // determine if the taskVars json file exists; if not write the file, if it exists overwrite it
  val taskVarsJson = JsonProtocal.taskVarsToJson(TaskVars(systemVars, runtimeVars))
  logInfo(s"write taskVarsJsonStr:$taskVarsJson")
  // if (!isLocal){
  HDFSUtil.writeHDFSFile(taskVarsJsonPath, taskVarsJson)
  // }else{
  //   createLocalFile(taskVarsJsonPath, taskVarsJson)
  // }
} catch {
  case e: Throwable =>
    logError(s"== process failed, please check.", e)
    cause = e.getMessage
    result = false
}
```
If the run method in this example throws, the taskVarsJson file is never written, and everything downstream that depends on it fails. If the taskVarsJson file is required by the rest of the program, its creation must not sit inside the try/catch:
```scala
try {
  assert(argsCheck(args))
  if (dbName != null && dbName != "") {
    logInfo(s"use $dbName")
    sparkSession.sql(s"use $dbName")
  }
  processSparkPara
  run
  sparkSession.stop()
  result = true
} catch {
  case e: Throwable =>
    logError(s"== process failed, please check.", e)
    cause = e.getMessage
    result = false
}
// determine if the taskVars json file exists; if not write the file, if it exists overwrite it
val taskVarsJson = JsonProtocal.taskVarsToJson(TaskVars(systemVars, runtimeVars))
logInfo(s"write taskVarsJsonStr:$taskVarsJson")
// if (!isLocal){
HDFSUtil.writeHDFSFile(taskVarsJsonPath, taskVarsJson)
// }else{
//   createLocalFile(taskVarsJsonPath, taskVarsJson)
// }
```
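The skipping behavior itself can be demonstrated with a few standalone lines (this is an illustrative snippet, independent of the Spark code above):

```scala
// A helper that always throws, so the line after its call is skipped.
def boom(): Int = throw new RuntimeException("boom")

var before = false
var after = false

try {
  before = true   // executes
  boom()          // throws; control jumps straight to catch
  after = true    // never executed
} catch {
  case _: RuntimeException => ()
}
```

After this runs, `before` is true and `after` is still false: every statement after the throwing call was skipped.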
3. Testing guidelines
There are two kinds of tests: the success case and the failure case. Do not test only the success path and leave the failure path untested. A failing program can also hang threads (tying up resources), endlessly log connection failures, and cause similar problems. This has apparently caused other people quite a bit of trouble...
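Concretely, every helper should have at least one success-path and one failure-path assertion. A plain-assert sketch (`parsePort` is a hypothetical helper invented for this example):

```scala
import scala.util.{Try, Success}

// Hypothetical helper under test: parses a TCP port,
// rejecting non-numeric and out-of-range input.
def parsePort(s: String): Try[Int] =
  Try(s.toInt).filter(p => p >= 1 && p <= 65535)

// Success path: a valid port parses.
assert(parsePort("8080") == Success(8080))

// Failure paths: bad input must fail cleanly,
// not hang or return a bogus value.
assert(parsePort("not-a-port").isFailure)
assert(parsePort("70000").isFailure)
```

If only the first assertion existed, the two failure modes would go unnoticed until production.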
4. Extending Scala case class constructors
Original code:

```scala
case class TaskDesc(name: String)
val taskDesc = TaskDesc("hello")
```

Class extension:

```scala
case class TaskDesc(name: String, config: Config)
val taskDesc = TaskDesc("hello", null)
```
Comment: the extension above is not backward compatible; it forces changes to every existing call site. With the design below, the class can be extended without modifying any old code:
Original code:

```scala
case class TaskDesc(name: String)
val taskDesc = TaskDesc("hello")
```

Class extension:

```scala
case class TaskDesc(name: String, config: Config = null)
val taskDesc = TaskDesc("hello")
```
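A variant of the same idea that avoids null: give the new field an `Option` type with a `None` default. Old call sites still compile unchanged, and callers can pattern-match instead of null-checking (the `Config` shape here is invented for illustration):

```scala
// Hypothetical config type for the example.
case class Config(retries: Int)

// New field defaults to None, so pre-extension call sites are untouched.
case class TaskDesc(name: String, config: Option[Config] = None)

val oldStyle = TaskDesc("hello")                  // old call site, unchanged
val newStyle = TaskDesc("hello", Some(Config(3))) // new call site opts in
```

Using `Option` instead of a null default pushes the "is it set?" question into the type, so forgetting to check is a compile-visible pattern-match rather than a runtime NullPointerException.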
5. Files vs. directories, symlink paths vs. real paths
```scala
/**
 * Clean up expired log files older than n days in the platform's
 * local directory and the HDFS directory.
 */
def cleanExpiredLog(): Unit = {
  logInfo("clean expired log start")
  try {
    val lifeCycleConfig = new LifeCycleConfig
    var lifeCycle = lifeCycleConfig.getRunInfoLifeCycle
    if (lifeCycle < 1) lifeCycle = 3
    val sdfDay = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val strDay = sdfDay.format(System.currentTimeMillis() - lifeCycle * 24 * 60 * 60 * 1000)
    val tasksFile = new File(FileUtil.getParentPath + File.separator + TaskPath.taskPath + File.separator)
    val tasksSubFiles = tasksFile.listFiles()
    for (taskFile <- tasksSubFiles) {
      // select the expired folders under the task directory
      val filterFiles = filterExpiredLog(taskFile, strDay)
      // delete the expired logs
      deleteExpiredLog(filterFiles)
    }
  } catch {
    case ex: Exception => logError("clean expired log error, " + ex.getMessage)
  }
  logInfo("clean expired log end")
}

/**
 * Filter the logs that expired before strDay.
 * @param taskFile task directory
 * @param strDay   cut-off date
 * @return expired log directories
 */
private def filterExpiredLog(taskFile: File, strDay: String): Array[File] = {
  val filterFiles = taskFile.listFiles(new FileFilter {
    override def accept(pathname: File): Boolean = {
      if (pathname.getName.compareTo(strDay) <= 0) {
        return true
      } else {
        return false
      }
    }
  })
  filterFiles
}

/**
 * Delete the logs both locally and on HDFS.
 * @param filterFiles expired log directories
 */
private def deleteExpiredLog(filterFiles: Array[File]): Unit = {
  if (filterFiles.size > 0) {
    for (file <- filterFiles) {
      logDebug("need to clean expired log: " + file.getAbsoluteFile)
      val strDayLogPath = file.getAbsolutePath
      val remoteStrDayLogPath = FileUtil.getSpecifiedInterception(":", strDayLogPath)
      // delete the local log directory
      FileUtil.deleteFile(file)
      // delete the log directory on HDFS
      HDFSUtil.deleteHDFSFile(remoteStrDayLogPath)
    }
  }
}
```
Comment: when operating on a file, first check whether it is a plain file or a directory. And since deleteExpiredLog removes both the local log and the HDFS log from the same path string, you must make sure that path is the complete (real) path rather than a symlink:
```scala
def cleanExpiredLog(pathname: String, task: String): Unit = {
  System.out.println("clean expired log start")
  try {
    var lifeCycle = 3
    if (lifeCycle < 1) lifeCycle = 3
    val sdfDay = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val strDay = sdfDay.format(System.currentTimeMillis() - lifeCycle * 24 * 60 * 60 * 1000)
    val tasksFile = new File(pathname + File.separator + task + File.separator)
    // getCanonicalPath resolves the complete, real path,
    // because the path built above may be a symlink
    val tasksCanonicalFile = new File(tasksFile.getCanonicalPath)
    val tasksSubFiles = tasksCanonicalFile.listFiles()
    for (taskFile <- tasksSubFiles) {
      // check for a directory here: listFiles on a plain file
      // returns null, and iterating over null blows up
      if (taskFile.isDirectory) {
        // select the expired folders under the task directory
        val filterFiles = filterExpiredLog(taskFile, strDay)
        // delete the expired logs
        deleteExpiredLog(filterFiles)
      }
    }
  } catch {
    case ex: Exception => System.out.println("clean expired log error, " + ex.getMessage)
  }
  System.out.println("clean expired log end")
}
```
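The symlink-vs-real-path difference is easy to observe directly. A minimal standalone sketch (it creates a temp directory and a symlink next to it; creating symlinks may require extra privileges on some platforms, notably Windows):

```scala
import java.io.File
import java.nio.file.Files

// A real directory and a symlink pointing at it.
val realDir = Files.createTempDirectory("task-logs").toFile
val link = new File(realDir.getParentFile, realDir.getName + "-link")
Files.createSymbolicLink(link.toPath, realDir.toPath)

// getAbsolutePath keeps the symlink as given;
// getCanonicalPath resolves it to the real directory.
println(link.getAbsolutePath)
println(link.getCanonicalPath)

// Guard listFiles with isDirectory: for a plain file it returns null.
val children =
  if (link.isDirectory) link.listFiles() else Array.empty[File]
```

If the HDFS path is derived from the local path string, deriving it from getCanonicalPath rather than the symlink path keeps the two deletions pointed at the same real location.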