基本思路
一种客户较多使用的方式,这里用python代码的方式来进行实现; 通过调参,可以更加灵活地实现异常点判断。
下面示例:周期为周(7天),取每7天的同一周几的同一时刻的值进行比较,计算最大值和最小值,超出一定阈值,判断为异常点,并对该异常点进行平滑处理。
示例代码 可以直接运行
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
import datetime
def paint(dfs=[], labels=[], title='暂无'):
assert len(dfs) == len(labels)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 显示中文标签
plt.rcParams['axes.unicode_minus'] = False
plt.figure(figsize=(16, 8))
for i in range(0, len(dfs)):
plt.plot(dfs[i]['data_value'], label=labels[i])
plt.legend(loc='best')
plt.title(title + str(datetime.datetime.now()))
plt.show()
def paint_array(dfs=[], labels=[], title='输出字典测试'):
assert len(dfs) == len(labels)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 显示中文标签
plt.rcParams['axes.unicode_minus'] = False
plt.figure(figsize=(16, 8))
for i in range(0, len(dfs)):
plt.plot(dfs[i], label=labels[i])
plt.legend(loc='best')
plt.title(title + str(datetime.datetime.now()))
plt.show()
def same_timestamp_period(data):
input_ids = [165]
output_ids = [182]
output_multiples = [1]
for input_id, output_id, output_multiple in zip(input_ids, output_ids, output_multiples):
cycle_period, cycle_times, max_threshold, min_threshold, influence = 8, 8, 1.2, 0.8, 0.5
"""
取每周期内同一时刻的值
cycle_period表示比较的时间段范围的天的总分段数, 7天 * 每天的24/48/96分段数
cycle_times 表示相比的cycle_period的次数, 默认8周
max_threshold: 最大值的倍数阈值, 高于该阈值判定为异常点
min_threshold: 最小值的倍数阈值, 低于该阈值判定为异常点
influence : 异常点的折算值, 避免异常值之后的所有的点都无法再判断为异常点
output_multiples: 放大异常值结果的倍数, 默认为1
"""
y = data[input_id]['data_value']
window = cycle_period * cycle_times
if len(y) < window:
return
signals = np.zeros(len(y))
filtered_y = np.array(y)
max_cycle_filter = [0] * len(y)
min_cycle_filter = [0] * len(y)
for i in range(window, len(y)):
# 计算上个周期的最大值和最小值的结果
max_cycle_filter[i - 1] = np.max([filtered_y[i] for i in range(i - window, i, cycle_period)])
min_cycle_filter[i - 1] = np.min([filtered_y[i] for i in range(i - window, i, cycle_period)])
if filtered_y[i] > max_cycle_filter[i - 1] * max_threshold:
signals[i] = 1
filtered_y[i] = filtered_y[i] * influence + filtered_y[i - 1] * (1.0 - influence)
elif filtered_y[i] < min_cycle_filter[i - 1] * min_threshold:
signals[i] = -1
filtered_y[i] = filtered_y[i] * influence + filtered_y[i - 1] * (1.0 - influence)
series_dict = dict(signals=np.asarray(signals * output_multiple),
maxFilters=np.asarray(max_cycle_filter),
minFilters=np.asarray(min_cycle_filter)
)
data[output_id] = data[input_id].copy()
data[output_id]['data_value'] = np.asarray(series_dict['signals'])
############################## 绘图
paint_array(dfs=[signals, np.asarray(y)],
labels=['signals', 'y'])
if __name__ == '__main__':
idx = pd.date_range('2020-01-01', periods=190, freq='D')
# 构造一个周期为4*2的余弦函数
cos_arr = np.arange(len(idx)) * np.pi / 4
for i in range(0, len(cos_arr)):
# 这里加一, 将余弦函数整体上移1个单位, 避免出现负值, 出现负值,会导致大量误判,
cos_arr[i] = math.cos(cos_arr[i]) + 1 + 1
# 人为制造4个异常点
cos_arr[72] -= 1
cos_arr[94] -= 2
# cos_arr[45] -= 2
cos_arr[132] += 4
cos_arr[182] += 4
data_value = pd.Series(cos_arr, index=idx)
df = pd.DataFrame({
'data_time': idx, # 时间列
'data_value': data_value # 数据列
})
data = {}
data[165] = df
data[182] = pd.DataFrame()
same_timestamp_period(data)
# paint(dfs=[data[157], data[161]], labels=['origin', 'signal'], title='origin模拟以天为周期的业务数据,signal模拟脉冲信号')
"""
以上涉及的场景数据暂无负值,如有负值请改造min_threshold,特判最小值小于0时改为乘于一个大于1的数值
"""