AuthoritySlot
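This slot is relatively simple. Take a look at its configuration.
In practice it just checks the resource against an origin whitelist, allowing or denying access to the resource from the configured origins. I won't go into more detail here.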
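To make the whitelist idea concrete, here is a minimal standalone sketch of an origin check. It is not Sentinel's own code: the class name OriginCheckSketch, the Strategy enum, and the passCheck method are hypothetical names introduced only for illustration, and the rule that requests without an origin are let through is an assumption.

```java
import java.util.Arrays;

/** Hypothetical, simplified sketch of an origin whitelist/blacklist check. */
final class OriginCheckSketch {

    enum Strategy { WHITE, BLACK }

    /**
     * @param origin   origin of the current caller (may be empty)
     * @param limitApp comma-separated origins configured in the rule
     * @param strategy whether limitApp is a whitelist or a blacklist
     * @return true if the request may access the resource
     */
    static boolean passCheck(String origin, String limitApp, Strategy strategy) {
        // Assumption: nothing to check when the origin or the rule's origin list is empty.
        if (origin == null || origin.isEmpty() || limitApp == null || limitApp.isEmpty()) {
            return true;
        }
        boolean listed = Arrays.asList(limitApp.split(",")).contains(origin);
        // Whitelist: only listed origins pass. Blacklist: listed origins are rejected.
        return strategy == Strategy.WHITE ? listed : !listed;
    }
}
```

With a whitelist of "appA,appB", a call from appA passes and a call from appC is rejected; switching the same list to a blacklist inverts both results.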
SystemSlot
Let's start with load and CPU usage. Both are read through the MXBeans that ship with the JVM.
    public void run() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();

            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }
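The process CPU usage arithmetic in the listing above is easier to follow with concrete numbers. The sketch below isolates just that formula in a pure function. The class and method names are hypothetical, and the sample figures are made up for illustration.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper isolating the process CPU usage formula from the listing above. */
final class ProcessCpuUsageSketch {

    /**
     * @param cpuTimeDiffNanos CPU time consumed by the process since the last sample, in nanoseconds
     * @param upTimeDiffMs     wall-clock time elapsed since the last sample, in milliseconds
     * @param cpuCores         number of available processors
     * @return fraction of total CPU capacity used by the process, in the [0.0, 1.0] range
     */
    static double processCpuUsage(long cpuTimeDiffNanos, long upTimeDiffMs, int cpuCores) {
        long cpuTimeDiffMs = TimeUnit.NANOSECONDS.toMillis(cpuTimeDiffNanos);
        return (double) cpuTimeDiffMs / upTimeDiffMs / cpuCores;
    }
}
```

For example, a process that burned 500 ms of CPU time during a 1000 ms window on a 2-core machine yields 500 / 1000 / 2 = 0.25. If the system-wide reading were 0.10 in the same window, Math.max(0.25, 0.10) would report 0.25 as the current CPU usage.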
RT, thread count, and inbound QPS are all global controls. StatisticSlot is the step that collects these global metrics, and SystemSlot then only has to check them.
    // Request passed, add thread count and pass count.
    node.increaseThreadNum();

    if (resourceWrapper.getEntryType() == EntryType.IN) {
        // Add count for global inbound entry node for global statistics.
        Constants.ENTRY_NODE.increaseThreadNum();
        Constants.ENTRY_NODE.addPassRequest(count);
    }
LogSlot
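This slot is really just instrumentation (log tracking). For an introduction to EagleEye, see this article. Next I'll walk through the source code that EagleEye uses to implement this instrumentation.
Each logName corresponds to one StatLogger object.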
    public class EagleEyeLogUtil {

        // Log file name
        public static final String FILE_NAME = "sentinel-block.log";

        // One StatLogger per log name
        private static StatLogger statLogger;

        static {
            // Log file path; LogBase has already cached the configuration
            String path = LogBase.getLogBaseDir() + FILE_NAME;

            statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
                .intervalSeconds(1)
                .entryDelimiter('|')
                .keyDelimiter(',')
                .valueDelimiter(',')
                .maxEntryCount(6000)
                .configLogFilePath(path)
                .maxFileSizeMB(300)
                .maxBackupIndex(3)
                .buildSingleton();
        }

        public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, int count) {
            statLogger.stat(resource, exceptionName, ruleLimitApp, origin).count(count);
        }

        public static void main(String[] args) throws InterruptedException {
            // 2021-09-18 23:46:14|1|resource1,exceptionName,ruleLimitApp,origin|1,0
            EagleEyeLogUtil.log("resource1", "exceptionName", "ruleLimitApp", "origin", 1);
            Thread.sleep(1000 * 3);
        }
    }
As the main method above shows, usage is simple. Let's go through it step by step.
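EagleEye source code analysis
First the log configuration is loaded: the log directory, whether to use the PID, the log charset, and the log output type (file | console).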
String path = LogBase.getLogBaseDir() + FILE_NAME;
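The static block of LogBase initializes some default values. These values may come from a user-specified file, a file specified through an environment variable, or a file on the classpath. For the exact logic, follow Properties properties = LogConfigLoader.getProperties(); into the static method load() of LogConfigLoader, which calls ConfigUtil.loadProperties(fileName);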
    public static Properties loadProperties(String fileName) {
        if (StringUtil.isNotBlank(fileName)) {
            if (absolutePathStart(fileName)) {
                // Absolute path
                return loadPropertiesFromAbsoluteFile(fileName);
            } else if (fileName.startsWith(CLASSPATH_FILE_FLAG)) {
                // On the classpath: load through the class loader
                return loadPropertiesFromClasspathFile(fileName);
            } else {
                // Relative path: resolved against the project directory
                return loadPropertiesFromRelativeFile(fileName);
            }
        } else {
            return null;
        }
    }
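The three-way dispatch in loadProperties can be tried out in isolation with a small sketch. The class below is hypothetical and only classifies a file name. It does not load anything. Two assumptions: File#isAbsolute stands in for absolutePathStart, and the "classpath:" prefix stands in for CLASSPATH_FILE_FLAG.

```java
import java.io.File;

/** Hypothetical sketch: classify a config file name the way loadProperties dispatches on it. */
final class ConfigLocationSketch {

    enum Location { ABSOLUTE, CLASSPATH, RELATIVE, NONE }

    // Assumption: the classpath marker is the literal prefix "classpath:".
    static final String CLASSPATH_PREFIX = "classpath:";

    static Location classify(String fileName) {
        if (fileName == null || fileName.trim().isEmpty()) {
            return Location.NONE;          // blank name: nothing to load
        }
        if (new File(fileName).isAbsolute()) {
            return Location.ABSOLUTE;      // read straight from the file system
        }
        if (fileName.startsWith(CLASSPATH_PREFIX)) {
            return Location.CLASSPATH;     // resolve through the class loader
        }
        return Location.RELATIVE;          // resolve against the working directory
    }
}
```

The order of the checks matters in the same way as in the original method: the absolute-path test runs first, the classpath prefix second, and everything else is treated as relative.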
At this point the log output file name has been determined.
Next, look at this code:
    statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
        .intervalSeconds(1)
        .entryDelimiter('|')
        .keyDelimiter(',')
        .valueDelimiter(',')
        .maxEntryCount(6000)
        .configLogFilePath(path)
        .maxFileSizeMB(300)
        .maxBackupIndex(3)
        .buildSingleton();
EagleEye's static initialization sets up selfAppender and calls initEagleEye.
First, createSelfLogger creates the logger that prints EagleEye's own information. EagleEyeRollingFileAppender is covered later.
    static private final EagleEyeAppender createSelfLogger() {
        // eagleeye-self.log
        EagleEyeRollingFileAppender selfAppender = new EagleEyeRollingFileAppender(EAGLEEYE_SELF_LOG_FILE,
            EagleEyeCoreUtils.getSystemPropertyForLong("EAGLEEYE.LOG.SELF.FILESIZE", MAX_SELF_LOG_FILE_SIZE),
            false);
        // Wrap for thread safety
        return new SyncAppender(selfAppender);
    }
The purpose of initEagleEye is to start a daemon thread that periodically cleans up expired log files and reloads the log files.
    private static void initEagleEye() {
        try {
            /**
             * [2021-09-18 20:49:42.966] [INFO] EagleEye started (file:/Users/gaojiayi/Sentinel/sentinel-core/target/classes/),
             * classloader=sun.misc.Launcher$AppClassLoader@18b4aac2
             */
            selfLog("[INFO] EagleEye started (" + CLASS_LOCATION + ")" + ", classloader="
                + EagleEye.class.getClassLoader());
        } catch (Throwable e) {
            selfLog("[INFO] EagleEye started (" + CLASS_LOCATION + ")");
        }

        try {
            // Daemon thread: periodically cleans up log files and reloads them
            EagleEyeLogDaemon.start();
        } catch (Throwable e) {
            selfLog("[ERROR] fail to start EagleEyeLogDaemon", e);
        }
        try {
            // Set up the schedulers: configures rollerThreadPool and writerThreadPool
            StatLogController.start();
        } catch (Throwable e) {
            selfLog("[ERROR] fail to start StatLogController", e);
        }
    }
EagleEyeLogDaemon#run()
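    @Override
    public void run() {
        while (running.get()) {
            // Delete files carrying the deleted suffix, both instrumentation logs and EagleEye's own logs
            cleanupFiles();
            try {
                Thread.sleep(LOG_CHECK_INTERVAL);
            } catch (InterruptedException e) {
            }
            flushAndReload();
        }
    }
cleanupFiles is not covered further here. Now look at flushAndReload, which reloads every log appender, selfAppender included, by calling EagleEyeRollingFileAppender#reload.
Before reload ever runs, though, EagleEyeRollingFileAppender has already called setFile() during initialization. Its purpose is to create a new log file and to initialize the output stream, the number of bytes written so far (outputByteSize), and the latest log output time.
Log writes go through the append method. When the log file is full, append creates a new log file. Next, look at how reload is implemented.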
    @Override
    public void reload() {
        // flush
        flush();
        File logFile = new File(filePath);
        long fileSize = logFile.length();
        boolean fileNotExists = fileSize <= 0 && !logFile.exists();

        if (this.bos == null || fileSize < outputByteSize || fileNotExists) {
            // Re-initialize the file
            doSelfLog("[INFO] Log file rolled over by outside: " + filePath + ", force reload");
            close();
            setFile();
        } else if (fileSize > outputByteSize) {
            this.outputByteSize = fileSize;
            if (!this.multiProcessDetected) {
                this.multiProcessDetected = true;
                if (selfLogEnabled) {
                    doSelfLog("[WARN] Multi-process file write detected: " + filePath);
                }
            }
        } else {
        }
    }
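The branching in reload compares the size on disk with the number of bytes this appender believes it has written. The sketch below extracts that decision into a pure function so the three outcomes can be checked without touching the file system. The class, enum, and parameter names are hypothetical.

```java
/** Hypothetical sketch of the decision made inside reload(). */
final class ReloadDecisionSketch {

    enum Action {
        REOPEN,             // file was rotated or removed from outside: close and call setFile() again
        ADOPT_LARGER_SIZE,  // file grew beyond our own writes: another process is probably appending
        NONE                // sizes agree: nothing to do
    }

    /**
     * @param streamOpen     whether the appender currently holds an open output stream
     * @param fileExists     whether the log file exists on disk
     * @param fileSize       current size of the log file on disk, in bytes
     * @param outputByteSize bytes this appender has accounted for so far
     */
    static Action decide(boolean streamOpen, boolean fileExists, long fileSize, long outputByteSize) {
        boolean fileMissing = fileSize <= 0 && !fileExists;
        if (!streamOpen || fileSize < outputByteSize || fileMissing) {
            return Action.REOPEN;
        }
        if (fileSize > outputByteSize) {
            return Action.ADOPT_LARGER_SIZE;
        }
        return Action.NONE;
    }
}
```

A file that is smaller than outputByteSize can only mean it was truncated or replaced by something else, which is why that case forces a reopen. A file that is larger suggests a second writer, which matches the "Multi-process file write detected" warning in the original method.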
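With EagleEye's class initialization complete, the next step is to build the StatLogger object through the builder pattern. The StatLogger for each loggerName is kept in memory. Here is how that is implemented.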
    static StatLogger createLoggerIfNotExists(StatLoggerBuilder builder) {
        String loggerName = builder.getLoggerName();
        StatLogger statLogger = statLoggers.get(loggerName);
        if (statLogger == null) {
            synchronized (StatLogController.class) {
                if ((statLogger = statLoggers.get(loggerName)) == null) {
                    /**
                     * This creates the statLogger and also registers its EagleEyeRollingFileAppender
                     * (rfAppender) with the EagleEyeLogDaemon daemon thread for maintenance.
                     * One StatLogger corresponds to one EagleEyeAppender.
                     */
                    statLogger = builder.create();
                    statLoggers.put(loggerName, statLogger);

                    writerThreadPool.setMaximumPoolSize(Math.max(1, statLoggers.size()));
                    // Start the rolling schedule for this statLogger
                    scheduleNextRollingTask(statLogger);
                    EagleEye.selfLog("[INFO] created statLogger[" + statLogger.getLoggerName()
                        + "]: " + statLogger.getAppender());
                }
            }
        }
        return statLogger;
    }
Next, look at the implementation of scheduleNextRollingTask. It involves a recursive call: each rolling task schedules the next one when it runs.
    private static void scheduleNextRollingTask(StatLogger statLogger) {
        if (!running.get()) {
            EagleEye.selfLog("[INFO] stopped rolling statLogger[" + statLogger.getLoggerName() + "]");
            return;
        }
        // Both the log write task and the scheduling of the next rolling task are triggered from this task's run method
        StatLogRollingTask rollingTask = new StatLogRollingTask(statLogger);

        // Time of the next rolling
        long rollingTimeMillis = statLogger.getRollingData().getRollingTimeMillis();
        // Delay before execution
        long delayMillis = rollingTimeMillis - System.currentTimeMillis();
        if (delayMillis > 5) {
            rollerThreadPool.schedule(rollingTask, delayMillis, TimeUnit.MILLISECONDS);
        } else if (-delayMillis > statLogger.getIntervalMillis()) {
            EagleEye.selfLog("[WARN] unusual delay of statLogger[" + statLogger.getLoggerName()
                + "], delay=" + (-delayMillis) + "ms, submit now");
            rollerThreadPool.submit(rollingTask);
        } else {
            rollerThreadPool.submit(rollingTask);
        }
    }
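The "recursion" here is not a deep call stack: every task, when it runs, hands the next task to the scheduler and returns. A minimal standalone sketch of that self-rescheduling pattern follows. It uses only java.util.concurrent. The class and method names are hypothetical, and a CountDownLatch stands in for both the running flag and the actual rolling work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a task that schedules its own successor. */
final class SelfReschedulingSketch {

    /** Runs the given number of ticks; returns true if all of them completed within 5 seconds. */
    static boolean runRounds(int rounds, long intervalMillis) {
        ScheduledExecutorService roller = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(rounds);
        try {
            scheduleNext(roller, remaining, intervalMillis);
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            roller.shutdownNow();
        }
    }

    private static void scheduleNext(ScheduledExecutorService roller,
                                     CountDownLatch remaining,
                                     long intervalMillis) {
        if (remaining.getCount() == 0) {
            return; // stop condition, playing the role of the running flag
        }
        roller.schedule(() -> {
            remaining.countDown();                           // stand-in for the rolling work
            scheduleNext(roller, remaining, intervalMillis); // queue the successor, then return
        }, intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Compared with scheduleAtFixedRate, rescheduling by hand lets each round compute its own delay, which is what scheduleNextRollingTask does when it subtracts the current time from the next rolling time.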
StatLogRollingTask#run -> StatLogWriteTask#run
    private static class StatLogRollingTask implements Runnable {

        final StatLogger statLogger;

        StatLogRollingTask(StatLogger statLogger) {
            this.statLogger = statLogger;
        }

        @Override
        public void run() {
            // statLogger.rolling() rolls over to the next rolling data:
            // every second a new StatRollingData is created to hold new log entries
            scheduleWriteTask(statLogger.rolling());
            scheduleNextRollingTask(statLogger);
        }
    }

    // StatLogWriteTask#run (enclosing class declaration omitted)
    @Override
    public void run() {
        final StatRollingData data = statRollingData;
        final StatLogger logger = data.getStatLogger();
        try {
            // Timestamp to print
            final FastDateFormat fmt = new FastDateFormat();
            final StringBuilder buffer = new StringBuilder(256);
            final String timeStr = fmt.formatWithoutMs(data.getTimeSlot());

            final EagleEyeAppender appender = logger.getAppender();
            /**
             * Write the collected entries to the file
             */
            final Set<Entry<StatEntry, StatEntryFunc>> entrySet = data.getStatEntrySet();
            final char entryDelimiter = logger.getEntryDelimiter();
            final char keyDelimiter = logger.getKeyDelimiter();
            final char valueDelimiter = logger.getValueDelimiter();

            // Write the entries one by one
            for (Entry<StatEntry, StatEntryFunc> entry : entrySet) {
                buffer.delete(0, buffer.length());
                StatEntryFunc func = entry.getValue();
                // time|statType|keys|values
                buffer.append(timeStr).append(entryDelimiter);
                buffer.append(func.getStatType()).append(entryDelimiter);
                entry.getKey().appendTo(buffer, keyDelimiter);
                buffer.append(entryDelimiter);
                func.appendTo(buffer, valueDelimiter);
                buffer.append(EagleEyeCoreUtils.NEWLINE);
                appender.append(buffer.toString());
            }

            // flush
            appender.flush();
        } catch (Throwable t) {
            EagleEye.selfLog("[WARN] fail to write statLogger["
                + logger.getLoggerName() + "]", t);
        }
    }
At this point the StatLogger, including its appender, is fully initialized. When a request is logged, what actually happens is that a new StatEntry is created and stored in the statMap of the StatRollingData, and the write scheduler takes care of writing it out.
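The time|statType|keys|values layout produced by the write task can be reproduced with a few lines of plain Java. The sketch below is a hypothetical standalone formatter, not EagleEye's code. It takes the keys and values as arrays and omits the trailing newline.

```java
/** Hypothetical sketch that formats one stat line as time|statType|keys|values. */
final class StatLineFormatSketch {

    static String formatLine(String time, int statType, String[] keys, long[] values,
                             char entryDelimiter, char keyDelimiter, char valueDelimiter) {
        StringBuilder buffer = new StringBuilder(256);
        buffer.append(time).append(entryDelimiter);
        buffer.append(statType).append(entryDelimiter);
        for (int i = 0; i < keys.length; i++) {
            if (i > 0) {
                buffer.append(keyDelimiter);
            }
            buffer.append(keys[i]);
        }
        buffer.append(entryDelimiter);
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                buffer.append(valueDelimiter);
            }
            buffer.append(values[i]);
        }
        return buffer.toString();
    }
}
```

Called with the delimiters configured earlier ('|' between sections, ',' between keys and between values), the keys resource1, exceptionName, ruleLimitApp, origin and the values 1 and 0, it yields the same line as the sample in the main method: 2021-09-18 23:46:14|1|resource1,exceptionName,ruleLimitApp,origin|1,0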
StatEntry#count
    public void count(long count) {
        getFunc(StatEntryFuncFactory.COUNT_SUM).count(count);
    }
This in turn calls StatRollingData#getStatEntryFunc
    StatEntryFunc getStatEntryFunc(
        final StatEntry statEntry, final StatEntryFuncFactory factory) {
        // Only one StatEntryFunc is initialized for StatEntry objects with the same key
        StatEntryFunc func = statMap.get(statEntry);
        if (func == null) {
            StatRollingData clone = null;
            writeLock.lock();
            try {
                int entryCount = statMap.size();
                if (entryCount < statLogger.getMaxEntryCount()) {
                    func = statMap.get(statEntry);
                    if (func == null) {
                        func = factory.create();
                        statMap.put(statEntry, func);
                    }
                } else {
                    // Limit exceeded: schedule a write right away instead of waiting for rollerThreadPool to reach its trigger time
                    Map<StatEntry, StatEntryFunc> cloneStatMap =
                        new HashMap<StatEntry, StatEntryFunc>(statMap);
                    statMap.clear();

                    func = factory.create();
                    statMap.put(statEntry, func);
                    clone = new StatRollingData(statLogger, timeSlot, rollingTimeMillis, cloneStatMap);
                }
            } finally {
                writeLock.unlock();
            }

            if (clone != null) {
                StatLogController.scheduleWriteTask(clone);
            }
        }
        return func;
    }
In addition, EagleEye's own error log is rate limited: Sentinel uses the token bucket algorithm to control how fast error entries are written.
In EagleEye, static private TokenBucket exceptionBucket = new TokenBucket(10, TimeUnit.SECONDS.toMillis(10)); means that at most 10 tokens are made available every 10 seconds.
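The combination of a lock-free fast path, a re-check under the lock, and an early flush once the map is full can be shown with a simplified stand-in. In the sketch below, String keys replace StatEntry, AtomicLong replaces StatEntryFunc, and a Consumer replaces scheduleWriteTask. All of these are assumptions made for illustration. Unlike the original, the sketch re-checks the map before testing the size limit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Hypothetical sketch: a bounded counter map that flushes a snapshot when it overflows. */
final class BoundedStatMapSketch {

    private final Map<String, AtomicLong> statMap = new ConcurrentHashMap<>();
    private final ReentrantLock writeLock = new ReentrantLock();
    private final int maxEntryCount;
    private final Consumer<Map<String, AtomicLong>> flusher;

    BoundedStatMapSketch(int maxEntryCount, Consumer<Map<String, AtomicLong>> flusher) {
        this.maxEntryCount = maxEntryCount;
        this.flusher = flusher;
    }

    AtomicLong counterFor(String key) {
        AtomicLong counter = statMap.get(key);   // fast path, no lock
        if (counter == null) {
            Map<String, AtomicLong> overflow = null;
            writeLock.lock();
            try {
                counter = statMap.get(key);      // re-check under the lock
                if (counter == null) {
                    if (statMap.size() >= maxEntryCount) {
                        // Map is full: snapshot it and start over with an empty map
                        overflow = new HashMap<>(statMap);
                        statMap.clear();
                    }
                    counter = new AtomicLong();
                    statMap.put(key, counter);
                }
            } finally {
                writeLock.unlock();
            }
            if (overflow != null) {
                flusher.accept(overflow);        // hand the snapshot off outside the lock
            }
        }
        return counter;
    }

    int size() {
        return statMap.size();
    }
}
```

With a limit of 2, counting on keys a, b, and a again fills the map. The first count on key c then flushes the snapshot holding a and b and leaves only c in the live map. Handing the snapshot off after releasing the lock keeps slow I/O out of the critical section.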
    public static void selfLog(String log, Throwable e) {
        long now = System.currentTimeMillis();
        if (exceptionBucket.accept(now)) {
            try {
                String timestamp = EagleEyeCoreUtils.formatTime(now);
                StringWriter sw = new StringWriter(4096);
                PrintWriter pw = new PrintWriter(sw, false);
                pw.append('[').append(timestamp).append("] ").append(log).append(EagleEyeCoreUtils.NEWLINE);
                e.printStackTrace(pw);
                pw.println();
                pw.flush();
                selfAppender.append(sw.toString());
            } catch (Throwable t) {
            }
        }
    }
    public boolean accept(long now) {
        long currTokens;
        if (now > nextUpdate) {
            currTokens = tokens.get();
            if (tokens.compareAndSet(currTokens, maxTokens)) {
                // Reset nextUpdate based on the current time
                nextUpdate = System.currentTimeMillis() / 1000 * 1000 + intervalMillis;
            }
        }

        do {
            currTokens = tokens.get();
            // Keep looping while tokens remain but the CAS fails, until one token is taken or none are left
        } while (currTokens > 0 && !tokens.compareAndSet(currTokens, currTokens - 1));

        return currTokens > 0;
    }
References
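The accept logic can be exercised deterministically with a standalone sketch. The class below is hypothetical and follows the same refill-then-take structure, with two assumptions that differ from the code above: nextUpdate is derived from the now argument rather than System.currentTimeMillis(), so tests can control time, and the bucket starts full with nextUpdate at 0.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a fixed-window token bucket driven by a caller-supplied clock. */
final class TokenBucketSketch {

    private final long maxTokens;
    private final long intervalMillis;
    private final AtomicLong tokens;
    private volatile long nextUpdate = 0L; // assumption: the first call always triggers a refill

    TokenBucketSketch(long maxTokens, long intervalMillis) {
        this.maxTokens = maxTokens;
        this.intervalMillis = intervalMillis;
        this.tokens = new AtomicLong(maxTokens);
    }

    boolean accept(long now) {
        if (now > nextUpdate) {
            long current = tokens.get();
            // Only the thread that wins the CAS moves the window forward
            if (tokens.compareAndSet(current, maxTokens)) {
                nextUpdate = now / 1000 * 1000 + intervalMillis;
            }
        }
        long current;
        do {
            current = tokens.get();
            // Retry while tokens remain but another thread changed the count first
        } while (current > 0 && !tokens.compareAndSet(current, current - 1));
        return current > 0;
    }
}
```

With new TokenBucketSketch(10, 10_000), ten calls at now = 1000 are accepted and the eleventh is rejected. A call at now = 11_000 is still rejected because the refill condition is a strict "greater than", and a call at now = 11_001 refills the bucket and is accepted.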
https://www.jianshu.com/p/21e1794dc660
https://www.jianshu.com/p/87bec2187912?utm_campaign=haruki