zoukankan      html  css  js  c++  java
  • 改造xxl-job的客户端日志文件生成体系

    为什么要改造XXL-JOB原有的日志文件生成体系

       xxl-job原本自己的客户端日志文件生成策略是:一个日志记录就生成一个文件,也就是当数据库存在一条日志logId,对应的客户端就会生成一个文件,由于定时任务跑批很多,并且有些任务间隔时间很短,比如几秒触发一次,这样的结果就是客户端会生成大量的文件,但是每个文件的内容其实不多,但大量的单独文件相比会占用更多的磁盘,造成磁盘资源紧张,同时对文件系统性能存在影响,长久以来,就会触发资源报警,所以,如果不想经常去清理日志文件的话,那么将零碎的文件通过某种方式进行整合就显得迫切需要了。

    本文篇幅较长,代码涉及较多啦~

    改造后的日志文件生成策略

    基本描述

       减少日志文件个数,其实就是要将分散的日志文件进行合并,同时建立外部索引文件维护各自当次日志Id所对应的起始日志内容。在读取的时候先通过读取索引文件,进而再读取真实的日志内容。

       下面是日志文件描述分析图:

    • 对于一个执行器,在执行器下的所有的定时任务每天只生成一个logId_jobId_index.log的索引文件,用来维护日志Id与任务Id的对应关系;

    • 对于一个定时任务,每天只生成一个jobId.log的日志内容文件,里面保存当天所有的日志内容;

    • 对于日志索引文件jobId_index.log,用来维护日志内容中的索引,便于知道jobId.log中对应行数是属于哪个logId的,便于查找。

       大致思路就是如此,可以预期,日志文件大大的减少,磁盘占用应该会得到改善.

       笔者改造的这个功能已经在线上跑了快接近一年了,目前暂未出现问题,如有需要可结合自身业务进行相应的调整与改造,以及认真测试,以防未知错误。

    代码实操

    本文改造是基于XXL-JOB 1.8.2 版本改造,其他版本暂未实验

       打开代码目录xxl-job-core模块,主要涉及以下几个文件的改动:

    • XxlJobFileAppender.java
    • XxlJobLogger.java
    • JobThread.java
    • ExecutorBizImpl.java
    • LRUCacheUtil.java

    XxlJobFileAppender.java

       代码中部分原有未涉及改动的方法此处不再粘贴。

    package com.xxl.job.core.log;
    
    import com.xxl.job.core.biz.model.LogResult;
    import com.xxl.job.core.util.LRUCacheUtil;
    import org.apache.commons.io.FilenameUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;
    
    import java.io.*;
    import java.text.DecimalFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;
    
    /**
     * store trigger log in each log-file
     * @author xuxueli 2016-3-12 19:25:12
     */
    public class XxlJobFileAppender {
    	private static Logger logger = LoggerFactory.getLogger(XxlJobFileAppender.class);
    
    	// for JobThread (support log for child thread of job handler)
    	//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    	public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<String>();
    	// for logId,记录日志Id
    	public static final InheritableThreadLocal<Integer> contextHolderLogId = new InheritableThreadLocal<>();
    	// for JobId,定时任务的Id
    	public static final InheritableThreadLocal<Integer> contextHolderJobId = new InheritableThreadLocal<>();
    
    	// 使用一个缓存map集合来存取索引偏移量信息
    	public static final LRUCacheUtil<Integer, Map<String ,Long>> indexOffsetCacheMap = new LRUCacheUtil<>(80);
    
    	private static final String DATE_FOMATE = "yyyy-MM-dd";
    
    	private static final String UTF_8 = "utf-8";
    
    	// 文件名后缀
    	private static final String FILE_SUFFIX = ".log";
    
    	private static final String INDEX_SUFFIX = "_index";
    
    	private static final String LOGID_JOBID_INDEX_SUFFIX = "logId_jobId_index";
    
    	private static final String jobLogIndexKey = "jobLogIndexOffset";
    
    	private static final String indexOffsetKey = "indexOffset";
    
    	/**
    	 * log base path
    	 *
    	 * strut like:
    	 * 	---/
    	 * 	---/gluesource/
    	 * 	---/gluesource/10_1514171108000.js
    	 * 	---/gluesource/10_1514171108000.js
    	 * 	---/2017-12-25/
    	 * 	---/2017-12-25/639.log
    	 * 	---/2017-12-25/821.log
    	 *
    	 */
    	private static String logBasePath = "/data/applogs/xxl-job/jobhandler";
    	private static String glueSrcPath = logBasePath.concat("/gluesource");
    	public static void initLogPath(String logPath){
    		// init
    		if (logPath!=null && logPath.trim().length()>0) {
    			logBasePath = logPath;
    		}
    		// mk base dir
    		File logPathDir = new File(logBasePath);
    		if (!logPathDir.exists()) {
    			logPathDir.mkdirs();
    		}
    		logBasePath = logPathDir.getPath();
    
    		// mk glue dir
    		File glueBaseDir = new File(logPathDir, "gluesource");
    		if (!glueBaseDir.exists()) {
    			glueBaseDir.mkdirs();
    		}
    		glueSrcPath = glueBaseDir.getPath();
    	}
    	public static String getLogPath() {
    		return logBasePath;
    	}
    	public static String getGlueSrcPath() {
    		return glueSrcPath;
    	}
    
    	/**
    	 * 重写生成日志目录和日志文件名的方法:
    	 * log filename,like "logPath/yyyy-MM-dd/jobId.log"
    	 * @param triggerDate
    	 * @param jobId
    	 * @return
    	 */
    	public static String makeLogFileNameByJobId(Date triggerDate, int jobId) {
    
    		// filePath/yyyy-MM-dd
    		// avoid concurrent problem, can not be static
    		SimpleDateFormat sdf = new SimpleDateFormat(DATE_FOMATE);
    		File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
    		if (!logFilePath.exists()) {
    			logFilePath.mkdir();
    		}
    		// 生成日志索引文件
    		String logIndexFileName = logFilePath.getPath()
    				.concat("/")
    				.concat(String.valueOf(jobId))
    				.concat(INDEX_SUFFIX)
    				.concat(FILE_SUFFIX);
    		File logIndexFilePath = new File(logIndexFileName);
    		if (!logIndexFilePath.exists()) {
    			try {
    				logIndexFilePath.createNewFile();
    				logger.debug("生成日志索引文件,文件路径:{}", logIndexFilePath);
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		// 在yyyy-MM-dd文件夹下生成当天的logId对应的jobId的全局索引,减少对后管的修改
    		String logIdJobIdIndexFileName = logFilePath.getPath()
    				.concat("/")
    				.concat(LOGID_JOBID_INDEX_SUFFIX)
    				.concat(FILE_SUFFIX);
    		File logIdJobIdIndexFileNamePath = new File(logIdJobIdIndexFileName);
    		if (!logIdJobIdIndexFileNamePath.exists()) {
    			try {
    				logIdJobIdIndexFileNamePath.createNewFile();
    				logger.debug("生成logId与jobId的索引文件,文件路径:{}", logIdJobIdIndexFileNamePath);
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		// filePath/yyyy-MM-dd/jobId.log log日志文件
    		String logFileName = logFilePath.getPath()
    				.concat("/")
    				.concat(String.valueOf(jobId))
    				.concat(FILE_SUFFIX);
    		return logFileName;
    	}
    
    
    	/**
    	 * 后管平台读取详细日志查看,生成文件名
    	 * admin read log, generate logFileName bu logId
    	 * @param triggerDate
    	 * @param logId
    	 * @return
    	 */
    	public static String makeFileNameForReadLog(Date triggerDate, int logId) {
    		// filePath/yyyy-MM-dd
    		SimpleDateFormat sdf = new SimpleDateFormat(DATE_FOMATE);
    		File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
    		if (!logFilePath.exists()) {
    			logFilePath.mkdir();
    		}
    		String logIdJobIdFileName = logFilePath.getPath().concat("/")
    				.concat(LOGID_JOBID_INDEX_SUFFIX)
    				.concat(FILE_SUFFIX);
    		// find logId->jobId mapping
    		// 获取索引映射
    		String infoLine = readIndex(logIdJobIdFileName, logId);
    		String[] arr = infoLine.split("->");
    		int jobId = 0;
    		try {
    			jobId = Integer.parseInt(arr[1]);
    		} catch (Exception e) {
    			logger.error("makeFileNameForReadLog StringArrayException,{},{}", e.getMessage(), e);
    			throw new RuntimeException("StringArrayException");
    		}
    		String logFileName = logFilePath.getPath().concat("/")
    				.concat(String.valueOf(jobId)).concat(FILE_SUFFIX);
    		return logFileName;
    	}
    
    	/**
    	 * 向日志文件中追加内容,向索引文件中追加索引
    	 * append log
    	 * @param logFileName
    	 * @param appendLog
    	 */
    	public static void appendLogAndIndex(String logFileName, String appendLog) {
    
    		// log file
    		if (logFileName == null || logFileName.trim().length() == 0) {
    			return;
    		}
    		File logFile = new File(logFileName);
    
    		if (!logFile.exists()) {
    			try {
    				logFile.createNewFile();
    			} catch (Exception e) {
    				logger.error(e.getMessage(), e);
    				return;
    			}
    		}
    		// start append, count line num
    		long startLineNum = countFileLineNum(logFileName);
    		logger.debug("开始追加日志文件,开始行数:{}", startLineNum);
    		// log
    		if (appendLog == null) {
    			appendLog = "";
    		}
    		appendLog += "
    ";
    
    		// append file content
    		try {
    			FileOutputStream fos = null;
    			try {
    				fos = new FileOutputStream(logFile, true);
    				fos.write(appendLog.getBytes("utf-8"));
    				fos.flush();
    			} finally {
    				if (fos != null) {
    					try {
    						fos.close();
    					} catch (IOException e) {
    						logger.error(e.getMessage(), e);
    					}
    				}
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(), e);
    		}
    		// end append, count line num,再次计数
    		long endLineNum = countFileLineNum(logFileName);
    		Long lengthTmp = endLineNum - startLineNum;
    		int length = 0;
    		try {
    			length = lengthTmp.intValue();
    		} catch (Exception e) {
    			logger.error("Long to int Exception", e);
    		}
    		logger.debug("结束追加日志文件,结束行数:{}, 长度:{}", endLineNum, length);
    		Map<String, Long> indexOffsetMap = new HashMap<>();
    		appendIndexLog(logFileName, startLineNum, length, indexOffsetMap);
    		appendLogIdJobIdFile(logFileName, indexOffsetMap);
    	}
    
    
    	/**
    	 * 创建日志Id与JobId的映射关系
    	 * @param logFileName
    	 * @param indexOffsetMap
    	 */
    	public static void appendLogIdJobIdFile(String logFileName, Map indexOffsetMap) {
    		// 获取ThreadLocal中变量保存的值
    		int logId = XxlJobFileAppender.contextHolderLogId.get();
    		int jobId = XxlJobFileAppender.contextHolderJobId.get();
    		File file = new File(logFileName);
    		// 获取父级目录,寻找同文件夹下的索引文件
    		String parentDirName = file.getParent();
    		// logId_jobId_index fileName
    		String logIdJobIdIndexFileName = parentDirName.concat("/")
    				.concat(LOGID_JOBID_INDEX_SUFFIX)
    				.concat(FILE_SUFFIX);
    		// 从缓存中获取logId
    		boolean jobLogIndexOffsetExist = indexOffsetCacheMap.exists(logId);
    		Long jobLogIndexOffset = null;
    		if (jobLogIndexOffsetExist) {
    			jobLogIndexOffset = indexOffsetCacheMap.get(logId).get(jobLogIndexKey);
    		}
    		if (jobLogIndexOffset == null) {
    			// 为空则添加
    			StringBuffer stringBuffer = new StringBuffer();
    			stringBuffer.append(logId).append("->").append(jobId).append("
    ");
    			Long currentPoint = getAfterAppendIndexLog(logIdJobIdIndexFileName, stringBuffer.toString());
    			indexOffsetMap.put(jobLogIndexKey, currentPoint);
    			indexOffsetCacheMap.save(logId, indexOffsetMap);
    		}
    		// 不为空说明缓存已经存在,不做其他处理
    	}
    
    	/**
    	 * 追加索引文件内容,返回偏移量
    	 * @param fileName
    	 * @param content
    	 * @return
    	 */
    	private static Long getAfterAppendIndexLog(String fileName, String content) {
    		RandomAccessFile raf = null;
    		Long point = null;
    		try {
    			raf = new RandomAccessFile(fileName, "rw");
    			long end = raf.length();
    			// 因为是追加内容,所以将指针放入到文件的末尾
    			raf.seek(end);
    			raf.writeBytes(content);
    			// 获取当前的指针偏移量
    			/**
    			 * 偏移量放入到缓存变量中:注意,此处获取偏移量,是获取开始的地方的偏移量,不能再追加内容后再获取,否则会获取到结束
    			 * 时的偏移量
    			 */
    			point = end;
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			try {
    				raf.close();
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		return point;
    	}
    
    	/**
    	 * 追加索引日志,like "345->(577,10)"
    	 * @param logFileName
    	 * @param from
    	 * @param length
    	 * @param indexOffsetMap
    	 */
    	public static void appendIndexLog(String logFileName, Long from, int length, Map indexOffsetMap) {
    		int strLength = logFileName.length();
    		// 通过截取获取索引文件名
    		String prefixFilePath = logFileName.substring(0, strLength - 4);
    		String logIndexFilePath = prefixFilePath.concat(INDEX_SUFFIX).concat(FILE_SUFFIX);
    		File logIndexFile = new File(logIndexFilePath);
    		if (!logIndexFile.exists()) {
    			try {
    				logIndexFile.createNewFile();
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    				return;
    			}
    		}
    		int logId = XxlJobFileAppender.contextHolderLogId.get();
    
    		StringBuffer stringBuffer = new StringBuffer();
    		// 判断是添加还是修改
    		boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
    		Long indexOffset = null;
    		if (indexOffsetExist) {
    			indexOffset = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
    		}
    		if (indexOffset == null) {
    			// append
    			String lengthStr = getFormatNum(length);
    			stringBuffer.append(logId).append("->(")
    					.append(from).append(",").append(lengthStr).append(")
    ");
    			// 添加新的索引,记录偏移量
    			Long currentIndexPoint = getAfterAppendIndexLog(logIndexFilePath, stringBuffer.toString());
    			indexOffsetMap.put(indexOffsetKey, currentIndexPoint);
    		} else {
    			String infoLine = getIndexLineIsExist(logIndexFilePath, logId);
    			// 修改索引文件内容
    			int startTmp = infoLine.indexOf("(");
    			int endTmp = infoLine.indexOf(")");
    			String[] lengthTmp = infoLine.substring(startTmp + 1, endTmp).split(",");
    			int lengthTmpInt = 0;
    			try {
    				lengthTmpInt = Integer.parseInt(lengthTmp[1]);
    				from = Long.valueOf(lengthTmp[0]);
    			} catch (Exception e) {
    				logger.error("appendIndexLog StringArrayException,{},{}", e.getMessage(), e);
    				throw new RuntimeException("StringArrayException");
    			}
    			int modifyLength = length + lengthTmpInt;
    			String lengthStr2 = getFormatNum(modifyLength);
    			stringBuffer.append(logId).append("->(")
    					.append(from).append(",").append(lengthStr2).append(")
    ");
    			modifyIndexFileContent(logIndexFilePath, infoLine, stringBuffer.toString());
    		}
    	}
    
    	/**
    	 * handle getFormatNum
    	 * like 5 to 005
    	 * @return
    	 */
    	private static String getFormatNum(int num) {
    		DecimalFormat df = new DecimalFormat("000");
    		String str1 = df.format(num);
    		return str1;
    	}
    
    	/**
    	 * 查询索引是否存在
    	 * @param filePath
    	 * @param logId
    	 * @return
    	 */
    	private static String getIndexLineIsExist(String filePath, int logId) {
    		// 读取索引问价判断是否存在,日志每生成一行就会调用一次,所以索引文件需要将同一个logId对应的进行合并
    		String prefix = logId + "->";
    		Pattern pattern = Pattern.compile(prefix + ".*?");
    		String indexInfoLine = "";
    		RandomAccessFile raf = null;
    		try {
    			raf = new RandomAccessFile(filePath, "rw");
    			String tmpLine = null;
    			// 偏移量
    			boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
    			Long cachePoint = null;
    			if (indexOffsetExist) {
    				cachePoint = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
    			}
    			if (null == cachePoint) {
    				cachePoint = Long.valueOf(0);
    			}
    			raf.seek(cachePoint);
    			while ((tmpLine = raf.readLine()) != null) {
    				final long point = raf.getFilePointer();
    				boolean matchFlag = pattern.matcher(tmpLine).find();
    				if (matchFlag) {
    					indexInfoLine = tmpLine;
    					break;
    				}
    				cachePoint = point;
    			}
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			try {
    				raf.close();
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		return indexInfoLine;
    	}
    
    	/**
    	 * 在后管页面上需要查询执行日志的时候,获取索引信息,
    	 * 此处不能与上个通用,因为读取时没有map存取偏移量信息,相对隔离
    	 * @param filePath
    	 * @param logId
    	 * @return
    	 */
    	private static String readIndex(String filePath, int logId) {
    		filePath = FilenameUtils.normalize(filePath);
    		String prefix = logId + "->";
    		Pattern pattern = Pattern.compile(prefix + ".*?");
    		String indexInfoLine = "";
    		BufferedReader bufferedReader = null;
    		try {
    			bufferedReader = new BufferedReader(new FileReader(filePath));
    			String tmpLine = null;
    			while ((tmpLine = bufferedReader.readLine()) != null) {
    				boolean matchFlag = pattern.matcher(tmpLine).find();
    				if (matchFlag) {
    					indexInfoLine = tmpLine;
    					break;
    				}
    			}
    			bufferedReader.close();
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			if (bufferedReader != null) {
    				try {
    					bufferedReader.close();
    				} catch (IOException e) {
    					logger.error(e.getMessage(), e);
    				}
    			}
    		}
    		return indexInfoLine;
    	}
    
    	/**
    	 * 修改 logIndexFile 内容
    	 * @param indexFileName
    	 * @param oldContent
    	 * @param newContent
    	 * @return
    	 */
    	private static boolean modifyIndexFileContent(String indexFileName, String oldContent, String newContent) {
    		RandomAccessFile raf = null;
    		int logId = contextHolderLogId.get();
    		try {
    			raf = new RandomAccessFile(indexFileName, "rw");
    			String tmpLine = null;
    			// 偏移量
    			boolean indexOffsetExist = indexOffsetCacheMap.exists(logId);
    			Long cachePoint = null;
    			if (indexOffsetExist) {
    				cachePoint = indexOffsetCacheMap.get(logId).get(indexOffsetKey);
    			}
    			if (null == cachePoint) {
    				cachePoint = Long.valueOf(0);
    			}
    			raf.seek(cachePoint);
    			while ((tmpLine = raf.readLine()) != null) {
    				final long point = raf.getFilePointer();
    				if (tmpLine.contains(oldContent)) {
    					String str = tmpLine.replace(oldContent, newContent);
    					raf.seek(cachePoint);
    					raf.writeBytes(str);
    				}
    				cachePoint = point;
    			}
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			try {
    				raf.close();
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		return true;
    	}
    
    	/**
    	 * 统计文件内容行数
    	 * @param logFileName
    	 * @return
    	 */
    	private static long countFileLineNum(String logFileName) {
    		File file = new File(logFileName);
    		if (file.exists()) {
    			try {
    				FileReader fileReader = new FileReader(file);
    				LineNumberReader lineNumberReader = new LineNumberReader(fileReader);
    				lineNumberReader.skip(Long.MAX_VALUE);
    				// getLineNumber() 从0开始计数,所以加1
    				long totalLines = lineNumberReader.getLineNumber() + 1;
    				fileReader.close();
    				lineNumberReader.close();
    				return totalLines;
    			} catch (IOException e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    		return 0;
    	}
    
    
    	/**
    	 * 重写读取日志:1. 读取logIndexFile;2.logFile
    	 * @param logFileName
    	 * @param logId
    	 * @param fromLineNum
    	 * @return
    	 */
    	public static LogResult readLogByIndex(String logFileName, int logId, int fromLineNum) {
    		int strLength = logFileName.length();
    		// 获取文件名前缀出去.log
    		String prefixFilePath = logFileName.substring(0, strLength-4);
    		String logIndexFilePath = prefixFilePath.concat(INDEX_SUFFIX).concat(FILE_SUFFIX);
    		// valid logIndex file
    		if (StringUtils.isEmpty(logIndexFilePath)) {
    			return new LogResult(fromLineNum, 0, "readLogByIndex fail, logIndexFile not found", true);
    		}
    		logIndexFilePath = FilenameUtils.normalize(logIndexFilePath);
    		File logIndexFile = new File(logIndexFilePath);
    		if (!logIndexFile.exists()) {
    			return new LogResult(fromLineNum, 0, "readLogByIndex fail, logIndexFile not exists", true);
    		}
    		// valid log file
    		if (StringUtils.isEmpty(logFileName)) {
    			return new LogResult(fromLineNum, 0, "readLogByIndex fail, logFile not found", true);
    		}
    		logFileName = FilenameUtils.normalize(logFileName);
    		File logFile = new File(logFileName);
    		if (!logFile.exists()) {
    			return new LogResult(fromLineNum, 0, "readLogByIndex fail, logFile not exists", true);
    		}
    
    		// read logIndexFile
    		String indexInfo = readIndex(logIndexFilePath, logId);
    		int startNum = 0;
    		int endNum = 0;
    		if (!StringUtils.isEmpty(indexInfo)) {
    			int startTmp = indexInfo.indexOf("(");
    			int endTmp = indexInfo.indexOf(")");
    			String[] fromAndTo = indexInfo.substring(startTmp + 1, endTmp).split(",");
    			try {
    				startNum = Integer.parseInt(fromAndTo[0]);
    				endNum = Integer.parseInt(fromAndTo[1]) + startNum;
    			} catch (Exception e) {
    				logger.error("readLogByIndex StringArrayException,{},{}", e.getMessage(), e);
    				throw new RuntimeException("StringArrayException");
    			}
    		}
    
    		// read File
    		StringBuffer logContentBuffer = new StringBuffer();
    		int toLineNum = 0;
    		LineNumberReader reader = null;
    		try {
    			reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), UTF_8));
    			String line = null;
    
    			while ((line = reader.readLine()) != null) {
    				// [from, to], start as fromNum(logIndexFile)
    				toLineNum = reader.getLineNumber();
    				if (toLineNum >= startNum && toLineNum < endNum) {
    					logContentBuffer.append(line).append("
    ");
    				}
    				// break when read over
    				if (toLineNum >= endNum) {
    					break;
    				}
    			}
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    		} finally {
    			if (reader != null) {
    				try {
    					reader.close();
    				} catch (IOException e) {
    					logger.error(e.getMessage(), e);
    				}
    			}
    		}
    		LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
    		return logResult;
    
    	}
    }
    

    XxlJobLogger.java

    package com.xxl.job.core.log;
    
    import com.xxl.job.core.util.DateUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.helpers.FormattingTuple;
    import org.slf4j.helpers.MessageFormatter;
    
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Date;
    
    /**
     * Created by xuxueli on 17/4/28.
     */
    public class XxlJobLogger {
        private static Logger logger = LoggerFactory.getLogger("xxl-job logger");
    
        /**
         * append log
         *
         * @param callInfo
         * @param appendLog
         */
        private static void logDetail(StackTraceElement callInfo, String appendLog) {
    
    
            /*// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log";
            StackTraceElement[] stackTraceElements = new Throwable().getStackTrace();
            StackTraceElement callInfo = stackTraceElements[1];*/
    
            StringBuffer stringBuffer = new StringBuffer();
            stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ")
                .append("["+ callInfo.getClassName() + "#" + callInfo.getMethodName() +"]").append("-")
                .append("["+ callInfo.getLineNumber() +"]").append("-")
                .append("["+ Thread.currentThread().getName() +"]").append(" ")
                .append(appendLog!=null?appendLog:"");
            String formatAppendLog = stringBuffer.toString();
    
            // appendlog
            String logFileName = XxlJobFileAppender.contextHolder.get();
            if (logFileName!=null && logFileName.trim().length()>0) {
                // XxlJobFileAppender.appendLog(logFileName, formatAppendLog);
    			  // 此处修改方法调用
                // modify appendLogAndIndex for addIndexLogInfo
                XxlJobFileAppender.appendLogAndIndex(logFileName, formatAppendLog);
            } else {
                logger.info(">>>>>>>>>>> {}", formatAppendLog);
            }
        }
    }
    

    JobThread.java

    @Override
    	public void run() {
    	......
    		// execute
    		while(!toStop){
    			running = false;
    			idleTimes++;
    
                TriggerParam triggerParam = null;
                ReturnT<String> executeResult = null;
                try {
    				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
    				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
    				if (triggerParam!=null) {
    					running = true;
    					idleTimes = 0;
    					triggerLogIdSet.remove(triggerParam.getLogId());
    
    					// log filename, like "logPath/yyyy-MM-dd/9999.log"
    					// String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
    
    					// modify 将生成的日志文件重新命名,以jobId命名
    					String logFileName = XxlJobFileAppender.makeLogFileNameByJobId(new Date(triggerParam.getLogDateTim()), triggerParam.getJobId());
    					XxlJobFileAppender.contextHolderJobId.set(triggerParam.getJobId());
    					// 此处根据xxl-job版本号做修改
    					XxlJobFileAppender.contextHolderLogId.set(Integer.parseInt(String.valueOf(triggerParam.getLogId())));
    
    					XxlJobFileAppender.contextHolder.set(logFileName);
    					ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
    
    					......
    	}
    

    ExecutorBizImpl.java

    package com.xxl.job.core.biz.impl;
    
    import com.xxl.job.core.biz.ExecutorBiz;
    import com.xxl.job.core.biz.model.LogResult;
    import com.xxl.job.core.biz.model.ReturnT;
    import com.xxl.job.core.biz.model.TriggerParam;
    import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
    import com.xxl.job.core.executor.XxlJobExecutor;
    import com.xxl.job.core.glue.GlueFactory;
    import com.xxl.job.core.glue.GlueTypeEnum;
    import com.xxl.job.core.handler.IJobHandler;
    import com.xxl.job.core.handler.impl.GlueJobHandler;
    import com.xxl.job.core.handler.impl.ScriptJobHandler;
    import com.xxl.job.core.log.XxlJobFileAppender;
    import com.xxl.job.core.thread.JobThread;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    
    /**
     * Created by xuxueli on 17/3/1.
     */
    public class ExecutorBizImpl implements ExecutorBiz {
        private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
    
    
        /**
         * 重写读取日志方法
         * @param logDateTim
         * @param logId
         * @param fromLineNum
         * @return
         */
        @Override
        public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum) {
            // log filename: logPath/yyyy-MM-dd/9999.log
            String logFileName = XxlJobFileAppender.makeFileNameForReadLog(new Date(logDateTim), (int)logId);
    
            LogResult logResult = XxlJobFileAppender.readLogByIndex(logFileName, Integer.parseInt(String.valueOf(logId)), fromLineNum);
            return new ReturnT<LogResult>(logResult);
        }
    }
    

    LRUCacheUtil.java

       通过LinkedHashMap实现一个缓存容器

    package com.xxl.job.core.util;
    
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    /**
     * @Author: liangxuanhao
     * @Description: 使用LinkedHashMap实现一个固定大小的缓存器
     * @Date:
     */
    public class LRUCacheUtil<K, V> extends LinkedHashMap<K, V> {
    
        // 最大缓存容量
        private static final int CACHE_MAX_SIZE = 100;
    
        private int limit;
    
        public LRUCacheUtil() {
            this(CACHE_MAX_SIZE);
        }
    
        public LRUCacheUtil(int cacheSize) {
            // true表示更新到末尾
            super(cacheSize, 0.75f, true);
            this.limit = cacheSize;
        }
    
    	/**
    	 * 加锁同步,防止多线程时出现多线程安全问题
    	 */
        public synchronized V save(K key, V val) {
            return put(key, val);
        }
    
        public V getOne(K key) {
            return get(key);
        }
    
        public boolean exists(K key) {
            return containsKey(key);
        }
    
        /**
         * 判断是否超限
         * @param elsest
         * @return 超限返回true,否则返回false
         */
        @Override
        protected boolean removeEldestEntry(Map.Entry elsest) {
            // 在put或者putAll方法后调用,超出容量限制,按照LRU最近最少未使用进行删除
            return size() > limit;
        }
    
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<K, V> entry : entrySet()) {
                sb.append(String.format("%s:%s ", entry.getKey(), entry.getValue()));
            }
            return sb.toString();
        }
    }
    

    结果

       执行效果如图所示:达到预期期望,且效果良好。

  • 相关阅读:
    Java8新特性
    为什么要使用ORM技术?和 JDBC 有何不一样?
    HTTP Status 500
    重装Oracle时出现SID已存在问题的解决办法
    数据库模式显示的Swing表格
    自然连接和等值连接
    雷林鹏分享:Java 循环结构
    雷林鹏分享:Java 运算符
    雷林鹏分享:Java 修饰符
    雷林鹏分享:Java 变量类型
  • 原文地址:https://www.cnblogs.com/xuanhaoo/p/14356352.html
Copyright © 2011-2022 走看看