A PySpark-Based Network Service Anomaly Detection System (Part 4): Syncing Data Between MySQL and Spark SQL, and Predicting Anomalies with k-means

An anomaly detection system built on Django REST framework and Spark. MySQL and Redis serve as the data stores, Celery handles the task queue, and Spark SQL and Spark MLlib provide the analysis services; k-means and random forest algorithms analyze the network service data. The data is split into a full dataset and a normal dataset: a scheduled job runs daily and extracts normal samples from the full dataset for the algorithms to train models on.
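The tasks below decorate functions with @celery_app.task. A minimal sketch of such a Celery app, assuming Redis doubles as the broker; the module name and URLs are illustrative, not taken from the original repo:

    from celery import Celery

    # Hypothetical app definition; the original project defines its own
    # celery_app with its real broker/backend URLs.
    celery_app = Celery(
        'network_anomaly_detection',
        broker='redis://127.0.0.1:6379/0',
        backend='redis://127.0.0.1:6379/1',
    )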

Using Celery to bulk-import normal samples (for a specified time window) into the database

    from uuid import uuid4
    # celery_app, CatNormalResource and RsError are project-level imports


    def add_normal_cat_data(data):
        """
        Build model instances and yield them in batches of 1000
        :param data:
        :return:
        """
        tmp_cat_normal_models = []

        for cat_data in data:
            response_time = cat_data.get('response_time') or 0
            request_count = cat_data.get('request_count') or 1
            fail_count = cat_data.get('fail_count') or 1
            cat_data['id'] = str(uuid4())
            # Keep only "normal" samples: fast responses with a low failure
            # rate; float() avoids integer division on Python 2
            if response_time < 1.2 and fail_count / float(request_count) < 0.2:
                cat_obj = CatNormalResource(
                    **cat_data
                )
                tmp_cat_normal_models.append(cat_obj)

            if len(tmp_cat_normal_models) >= 1000:
                yield tmp_cat_normal_models
                tmp_cat_normal_models = []

        yield tmp_cat_normal_models


    @celery_app.task
    def insert_normal_cat_data(data):
        """
        Run asynchronously; bulk-insert each batch of 1000 rows
        :param data:
        :return:
        """
        try:
            for batch in add_normal_cat_data(data):
                CatNormalResource.objects.bulk_create(batch)
        except Exception as e:
            print(e)
            raise RsError('Failed to insert data into the database')
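For reference, a hedged sketch of what the CatNormalResource model might look like; the field set is inferred from the dict keys used in the task above and the Spark filters further down, and is not the project's actual models.py:

    from django.db import models


    class CatNormalResource(models.Model):
        # id is assigned str(uuid4()) by the import task above
        id = models.CharField(max_length=36, primary_key=True)
        appid = models.IntegerField()
        response_time = models.FloatField()
        request_count = models.IntegerField(default=1)
        fail_count = models.IntegerField(default=0)
        create_time = models.BigIntegerField()  # millisecond timestamp
        update_time = models.BigIntegerField()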

A crontab-scheduled job automatically imports the previous day's normal samples every day

    import time
    import datetime

    from django.core.cache import cache  # Redis-backed per the project settings


    def get_current_timestamp():
        """
        Get the current timestamp in milliseconds
        :return:
        """
        return int(time.time()) * 1000


    def convert_datetime_to_timestamp(dtime):
        """
        Convert a datetime to a millisecond timestamp
        :param dtime:
        :return:
        """
        timestamp = time.mktime(dtime.timetuple())
        return int(timestamp) * 1000


    def get_cache_cat_data(start_time, end_time, force=False):
        """
        Get the cat data for the given time window, caching the result
        :param start_time:
        :param end_time:
        :param force: bypass the cache and re-fetch when True
        :return:
        """
        key = 'GET_CAT_RES_DATA_{0}_TO_{1}'.format(
            start_time, end_time
        )
        content = cache.get(key)
        if force or not content:
            content = get_cat_res_data(start_time, end_time)
            if content:
                cache.set(key, content, timeout=CACHE_TIMEOUT_DEFAULT)

        return content
    # add_normal_cat_data and insert_normal_cat_data are the same functions
    # shown in the previous block

    def insert_normal_cat_job():
        """
        Scheduled job: import the previous day's normal data
        :return:
        """
        logger.info('insert_normal_cat_job ....')
        dt_time = datetime.datetime.now() + datetime.timedelta(days=-1)
        start_time = convert_datetime_to_timestamp(dt_time)
        end_time = get_current_timestamp()
        data = get_cache_cat_data(start_time, end_time)
        insert_normal_cat_data.delay(data)
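A crontab line needs a standalone entry point that boots Django before touching the ORM. A minimal sketch, assuming a script named run_cat_job.py and a settings module called project.settings (both hypothetical):

    # e.g. crontab: 30 0 * * * python /srv/network_anomaly_detection/run_cat_job.py
    import os

    import django

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')  # hypothetical
    django.setup()

    from jobs import insert_normal_cat_job  # hypothetical module path

    if __name__ == '__main__':
        insert_normal_cat_job()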

Spark SQL reads the data for the specified time window, and k-means predicts anomalies in the new data

    import json

    from numpy import array
    from pyspark.mllib.clustering import KMeans


    class SparkAnomaly(object):
        def __init__(self, appid, start_time, end_time):
            self.appid = appid
            self.start_time = start_time
            self.end_time = end_time
            self.spark_sql = SparkSql()
            self.cat_res = self.spark_sql.load_table_dataframe('cat_resource')
            self.cat_normal_res = self.spark_sql.load_table_dataframe(
                'cat_normal_resource'
            )
            self.filter_str = (
                "appid = {0} "
                "and create_time >= {1} "
                "and update_time <= {2}"
            ).format(
                self.appid, self.start_time, self.end_time,
            )
            self.model_filter_str = "appid = {0}".format(self.appid)

        def get_kmeans_model(self):
            """
            Train the k-means clustering model on the normal samples
            :return:
            """
            df = self.cat_normal_res.filter(self.model_filter_str)
            # Columns 4-6 hold the numeric feature columns
            parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))

            # Build the clustering model
            clusters = KMeans.train(
                parsed_data_rdd, 3,
                maxIterations=10,
                initializationMode="random"
            )

            return clusters

        def get_kmeans_predict(self):
            """
            Get the prediction result for the appid over the time window
            :return:
            """
            df = self.cat_res.filter(self.filter_str)
            parsed_data_rdd = df.rdd.map(lambda x: array([x[4], x[5], x[6]]))
            clusters = self.get_kmeans_model()
            predict_result = clusters.predict(parsed_data_rdd)
            return predict_result.collect()


    def get_kmeans_result(appid, start_time, end_time):
        """
        Get (and persist) the k-means result for the appid and time window
        :param appid:
        :param start_time:
        :param end_time:
        :return:
        """
        cat_result_obj = CatResultData.objects.filter(
            appid=appid,
            start_time=start_time,
            end_time=end_time,
            algorithm_name="kmeans"
        ).first()
        if not cat_result_obj:
            arg_result = SparkAnomaly(appid, start_time, end_time)
            content = arg_result.get_kmeans_predict()
            cat_result_obj = CatResultData.objects.create(
                appid=appid,
                start_time=start_time,
                end_time=end_time,
                algorithm_name="kmeans",
                # serialize so the json.loads below can parse it back
                result_data=json.dumps(content)
            )
        ser_data = CatResultDataSerializer(cat_result_obj).data
        ser_data['result_data'] = json.loads(ser_data['result_data'])
        return ser_data
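The SparkSql helper above is what actually bridges MySQL and Spark SQL. A minimal sketch of how load_table_dataframe could be implemented over JDBC; the class body, URL, and credentials are assumptions, not the repo's code:

    from pyspark.sql import SparkSession


    class SparkSql(object):
        def __init__(self):
            self.spark = SparkSession.builder \
                .appName('network_anomaly_detection') \
                .getOrCreate()

        def load_table_dataframe(self, table_name):
            """Load one MySQL table into a Spark DataFrame over JDBC."""
            return self.spark.read.format('jdbc').options(
                url='jdbc:mysql://127.0.0.1:3306/anomaly_db',  # hypothetical DSN
                driver='com.mysql.jdbc.Driver',
                dbtable=table_name,
                user='spark',        # hypothetical credentials
                password='secret',
            ).load()

Note that a sketch like this assumes the MySQL JDBC connector JAR is on Spark's classpath (e.g. passed via --jars to spark-submit).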

The code above is only part of the system; for the full code, see my GitHub: https://github.com/a342058040/network_anomaly_detection

Original post: https://www.cnblogs.com/FG123/p/7209294.html