一、如何存储

二、目录结构

三、代码调用逻辑关系

四、实现代码
1、data_optimization
1、取出最近时间段内符合条件的数据
def get_data_slice(self, lastest_data_key, optimization_interval):
    '''
    Return the points of a redis list that were saved within the last
    ``optimization_interval`` seconds.

    :param lastest_data_key: redis list key whose elements are JSON-encoded
        ``[service_data, save_timestamp]`` pairs
    :param optimization_interval: window size in seconds, e.g. 600 means
        "get the latest 10 minutes of real data from redis"
    :return: list of ``[service_data, save_timestamp]`` pairs inside the
        window, in list order; elements that are not 2-item pairs are skipped
    '''
    # The element at index 0 is skipped: process_and_save seeds every
    # StatusData_* list with a [None, timestamp] bookkeeping marker.
    all_real_data = self.redis_conn_obj.lrange(lastest_data_key, 1, -1)

    # Take one snapshot of "now" so every point is judged against the same
    # window boundary instead of one that drifts while the loop runs.
    now = time.time()

    data_set = []  # points that fall inside the window
    for item in all_real_data:
        # json.loads accepts bytes as well as str, so this works whether or
        # not the redis client decodes responses.
        data = json.loads(item)
        if len(data) != 2:
            continue
        _service_data, last_save_time = data
        if now - last_save_time <= optimization_interval:
            data_set.append(data)
    return data_set
2、优化筛选出来的数据
def process_and_save(self):
    '''
    Validate the report held in ``self.data`` and store it into redis.

    For every series configured in ``settings.STATUS_DATA_OPTIMIZATION``
    (each value unpacks to ``optimize_interval, max_data_point``), the redis
    list ``StatusData_<client_id>_<service_name>_<series>`` is updated:

    * on first use the list is seeded with a ``[None, timestamp]`` marker,
      used only to record when the series was last written;
    * an interval of 0 means unoptimized data: the report is appended as-is;
    * otherwise, once ``optimize_interval`` seconds have passed since the
      last saved point, the points of that window are read from the
      ``StatusData_<client_id>_<service_name>_latest`` list via
      ``get_data_slice``, aggregated with ``get_optimized_data`` and appended
      with ``save_optimized_data``;
    * finally, if the list has reached ``max_data_point`` entries, its oldest
      element is removed.

    :return: None
    :raises ValueError: if ``self.data['status']`` is not 0 (invalid report)
    '''
    print("\033[42;1m---service data-----------------------\033[0m")
    if self.data['status'] != 0:
        print("report data is invalid::", self.data)
        raise ValueError("report data is invalid: %r" % (self.data,))

    latest_data_key_in_redis = "StatusData_%s_%s_latest" % (self.client_id, self.service_name)
    for key, data_series_val in settings.STATUS_DATA_OPTIMIZATION.items():
        data_series_optimize_interval, max_data_point = data_series_val
        data_series_key_in_redis = "StatusData_%s_%s_%s" % (
            self.client_id, self.service_name, key)

        last_point_from_redis = self.redis_conn_obj.lrange(data_series_key_in_redis, -1, -1)
        if not last_point_from_redis:
            # First report for this series: seed it with a [None, timestamp]
            # marker so the time of the last save can always be read back.
            self.redis_conn_obj.rpush(data_series_key_in_redis,
                                      json.dumps([None, time.time()]))

        if data_series_optimize_interval == 0:
            # Unoptimized series: store the raw report directly.
            self.redis_conn_obj.rpush(data_series_key_in_redis,
                                      json.dumps([self.data, time.time()]))
        else:
            # Re-read the tail: it may be the marker pushed just above.
            # json.loads accepts bytes or str, whatever the client returns.
            last_point = self.redis_conn_obj.lrange(data_series_key_in_redis, -1, -1)[0]
            _last_point_data, last_point_save_time = json.loads(last_point)
            if time.time() - last_point_save_time >= data_series_optimize_interval:
                print("calulating data for key:\033[31;1m%s\033[0m" % data_series_key_in_redis)
                # Points saved within the last `interval` seconds.
                data_set = self.get_data_slice(latest_data_key_in_redis,
                                               data_series_optimize_interval)
                print('--------------------------len dataset :', len(data_set))
                if data_set:
                    # Hand the window to get_optimized_data for aggregation.
                    optimized_data = self.get_optimized_data(data_series_key_in_redis, data_set)
                    if optimized_data:
                        self.save_optimized_data(data_series_key_in_redis, optimized_data)

        # Keep the series bounded by the size configured in settings.
        # NOTE(review): lpop drops the oldest element, which is the
        # [None, timestamp] marker the first time this triggers; one pop per
        # report also cannot shrink a list that is already over the limit.
        if self.redis_conn_obj.llen(data_series_key_in_redis) >= max_data_point:
            self.redis_conn_obj.lpop(data_series_key_in_redis)  # remove the oldest point
3、把数据存储到redis
def save_optimized_data(self, data_series_key_in_redis, optimized_data):
    '''
    Append one optimized data point to the tail of a redis list.

    The point is stored as the JSON array ``[optimized_data, timestamp]``,
    where ``timestamp`` is ``time.time()`` at the moment of saving.

    :param data_series_key_in_redis: redis list key of the data series
    :param optimized_data: JSON-serialisable optimized values to store
    :return: None
    '''
    point = [optimized_data, time.time()]
    payload = json.dumps(point)
    self.redis_conn_obj.rpush(data_series_key_in_redis, payload)
4、存储临时数据并计算平均值、最大值、最小值和中位数
1 def get_optimized_data(self,data_set_key, raw_service_data): 2 ''' 3 calculate out avg,max,min,mid value from raw service data set 4 :param data_set_key: where the optimized data needed to save to in redis db 5 :param raw_service_data: raw service data data list 6 :return: 7 ''' 8 #index_init =[avg,max,min,mid] 9 print("get_optimized_data:",raw_service_data[0] ) 10 service_data_keys = raw_service_data[0][0].keys() #[iowait, idle,system...] 11 first_service_data_point = raw_service_data[0][0] # use this to build up a new empty dic 12 #print("--->",service_data_keys) 13 optimized_dic = {} #set a empty dic, will save optimized data later 14 if 'data' not in service_data_keys: #means this dic has no subdic, works for service like cpu,memory 15 for key in service_data_keys: 16 optimized_dic[key] = [] 17 #optimized_dic = optimized_dic.fromkeys(first_service_data_point,[]) 18 tmp_data_dic = copy.deepcopy(optimized_dic) #为了临时存最近n分钟的数据 ,把它们按照每个指标 都 搞成一个一个列表 ,来存最近N分钟的数据 19 print("tmp data dic:",tmp_data_dic) 20 for service_data_item,last_save_time in raw_service_data: #loop 最近n分钟的数据 21 #print(service_data_item) 22 for service_index,v in service_data_item.items(): #loop 每个数据点的指标service_index=iowait , v=33 23 #print(service_index,v) 24 try: 25 tmp_data_dic[service_index].append(round(float(v),2)) #把这个点的当前这个指标 的值 添加到临时dict中 26 except ValueError as e: 27 pass 28 #print(service_data_item,last_save_time) 29 #算临时字典里每个指标数据的平均值,最大值。。。,然后存到 optimized_dic 里 30 for service_k,v_list in tmp_data_dic.items(): 31 print(service_k, v_list) 32 avg_res = self.get_average(v_list) 33 max_res = self.get_max(v_list) 34 min_res = self.get_min(v_list) 35 mid_res = self.get_mid(v_list) 36 optimized_dic[service_k]= [avg_res,max_res,min_res,mid_res] 37 print(service_k, optimized_dic[service_k]) 38 39 else: # has sub dic inside key 'data', works for a service has multiple independent items, like many ethernet,disks... 
40 #print("**************>>>",first_service_data_point ) 41 for service_item_key,v_dic in first_service_data_point['data'].items(): 42 #service_item_key 相当于lo,eth0,... , v_dic ={ t_in:333,t_out:3353} 43 optimized_dic[service_item_key] = {} 44 for k2,v2 in v_dic.items(): 45 optimized_dic[service_item_key][k2] = [] #{etho0:{t_in:[],t_out:[]}} 46 47 tmp_data_dic = copy.deepcopy(optimized_dic) 48 if tmp_data_dic: #some times this tmp_data_dic might be empty due to client report err 49 print('tmp data dic:', tmp_data_dic) 50 for service_data_item,last_save_time in raw_service_data:#loop最近n分钟数据 51 for service_index,val_dic in service_data_item['data'].items(): 52 #print(service_index,val_dic) 53 #service_index这个值 相当于eth0,eth1... 54 for service_item_sub_key, val in val_dic.items(): 55 #上面这个service_item_sub_key相当于t_in,t_out 56 #if service_index == 'lo': 57 #print(service_index,service_item_sub_key,val) 58 tmp_data_dic[service_index][service_item_sub_key].append(round(float(val),2)) 59 #上面的service_index变量相当于 eth0... 60 for service_k,v_dic in tmp_data_dic.items(): 61 for service_sub_k,v_list in v_dic.items(): 62 print(service_k, service_sub_k, v_list) 63 avg_res = self.get_average(v_list) 64 max_res = self.get_max(v_list) 65 min_res = self.get_min(v_list) 66 mid_res = self.get_mid(v_list) 67 optimized_dic[service_k][service_sub_k] = [avg_res,max_res,min_res,mid_res] 68 print(service_k, service_sub_k, optimized_dic[service_k][service_sub_k]) 69 70 else: 71 print("