#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# FILE: monitor.py
#
# USAGE: ---
#
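# DESCRIPTION:
# markit 外部参考数据(reference data)预计早上9:00左右接入;监控接入情况,并将结果写入系统配置表
# Monitors the Markit external reference-data feed (expected to arrive around 09:00)
# and writes the monitoring results to the system config tables (dcs_collect_batch, dcs_file_record).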
# OPTIONS: ---
# REQUIREMENTS: ---
# BUGS: ---
# NOTES: ---
# AUTHOR: zl
# COMPANY: CCDC
# VERSION: 1.0
# CREATED: 2020.09.21
# REVISION: 0.1
# REVISION: ---
# SRC_TABLE: ---
# TGT_TABLE: ---
import cx_Oracle
from xml.dom.minidom import parse
import xml.dom.minidom
import datetime
from core.dataoracle import *
import sys
from conf.config import *
import time
from core.getconfig import get_config
import pandas as pd
import numpy as np
import paramiko
from core.downfile import *
import os
import logging
from core.dataoracle import SQL
from urllib.parse import quote
#from markit.parse.dataoracle import OracleConnect
from conf import settings # 导入自定义的logging配置
import re
# Load the project's custom logging configuration before any logger below is created.
settings.load_my_logging_cfg() # load the custom logging config at the program entry point
logger = logging.getLogger(__name__) # module-level logger instance
# Language / character set for the Oracle client session (AL32UTF8 = UTF-8).
# NOTE(review): presumably this must be set before the first Oracle connection is opened
# for it to take effect — confirm that no connection is created while the imports above run.
os.environ['NLS_LANG']='SIMPLIFIED CHINESE_CHINA.AL32UTF8'
class alarm:
    """Write Markit feed monitoring results to the DCS system config tables.

    Rows are inserted into ``dcs_collect_batch`` (one per collection batch) and
    ``dcs_file_record`` (one per file) through ``SQL().tooracle``. Batch ids are
    read back with ``OracleConnect.fetchall``.
    """

    # Column order of a dcs_collect_batch row (data_hitory_no appends 'status_msg').
    _BATCH_COLUMNS = ('task_type', 'task_id', 'collect_user', 'collect_time', 'collect_long',
                      'parse_long', 'file_byte', 'file_count', 'success_count', 'fail_count',
                      'status', 'create_time', 'file_url')
    # Column order of a dcs_file_record row.
    _RECORD_COLUMNS = ('batch_id', 'host', 'path', 'file_name', 'file_time', 'dcs_flag',
                       'create_time')
    # Task keys become table names / file_url values by appending them to this prefix.
    _TABLE_PREFIX = 'dcs_reserved_markit_'
    # Latest FILE batch id recorded for a given file_url ({0} is the table name).
    _BATCH_ID_SQL = ("select max(dcs_id) from dcs_collect_batch a "
                     "where a.task_type='FILE' and a.file_url='{0}'")
    # Stored status message when no new file arrived
    # (English: "No new incoming file, please contact the responsible staff!").
    _NO_FILE_MSG = '没有新的接入文件,请联系相关人员进行处理!'

    def __init__(self, con_str):
        """
        :param con_str: Oracle connection string, passed to ``OracleConnect``.
        """
        self.__con_str = con_str
        self.__oracle_connect = OracleConnect(con_str)
        self.__tosql = SQL()

    @staticmethod
    def _now():
        """Return the current local time as a pandas Timestamp truncated to whole seconds."""
        return pd.to_datetime(time.strftime("%Y%m%d %H%M%S", time.localtime()),
                              format="%Y%m%d %H%M%S")

    @staticmethod
    def _one_row_frame(values, columns):
        """Build a one-row DataFrame holding ``values`` under the labels ``columns``.

        The row is built as a single column and then transposed, so the mixed
        values share one object column and each cell keeps its Python object
        instead of being re-typed per column.
        """
        frame = pd.DataFrame(list(values)).T
        return frame.rename(columns=dict(enumerate(columns)))

    def _write_batch(self, task_id, collect_long, parse_long, file_byte, file_count,
                     success_count, fail_count, status, file_url, **extra_columns):
        """Insert one row into dcs_collect_batch.

        ``collect_time`` and ``create_time`` come from a single clock read, so
        they cannot differ when the clock crosses a second boundary.
        Keyword arguments in ``extra_columns`` are appended as extra columns
        after ``file_url``, in the order given.
        """
        now = self._now()
        values = ['FILE', int(task_id), 'system', now, collect_long, parse_long, file_byte,
                  file_count, success_count, fail_count, status, now, file_url]
        values.extend(extra_columns.values())
        columns = self._BATCH_COLUMNS + tuple(extra_columns)
        self.__tosql.tooracle('dcs_collect_batch', self._one_row_frame(values, columns))

    def data_hitory(self, file_byte, time_start, task_id, file, table_name, file_count,
                    success_count, fail_count, status):
        '''
        Write one batch row to the config table dcs_collect_batch.

        :param file_byte: file size
        :param time_start: start time (epoch seconds, as returned by time.time())
        :param task_id: task id (converted with int())
        :param file: file name (currently not written anywhere)
        :param table_name: table name, stored in the file_url column
        :param file_count: number of data rows
        :param success_count: number of rows loaded successfully
        :param fail_count: number of rows that failed
        :param status: batch status value
        :return: None
        '''
        # Elapsed seconds since time_start, rounded to milliseconds.
        collect_number = round(time.time() - time_start, 3)
        # NOTE(review): parse_long is stored as "current epoch seconds minus collect_long",
        # i.e. close to an absolute timestamp rather than a duration. This looks unintended;
        # confirm what readers of dcs_collect_batch.parse_long expect before changing it.
        parse_number = int(time.time()) - collect_number
        self._write_batch(task_id, collect_number, parse_number, file_byte, file_count,
                          success_count, fail_count, status, table_name)

    def data_hitory_no(self, task_id, table_name, status_msg):
        '''
        Write a dcs_collect_batch row for a task that produced no data.

        All durations and counts are 0 and status is 0. status_msg is written to
        an extra status_msg column.

        :param task_id: task id (converted with int())
        :param table_name: table name, stored in the file_url column
        :param status_msg: message explaining why nothing was collected
        :return: None
        '''
        # Same parse_long formula as data_hitory with collect_long == 0 (see the note there).
        parse_number = int(time.time())
        self._write_batch(task_id, 0, parse_number, 0, 0, 0, 0, 0, table_name,
                          status_msg=status_msg)

    def file_record(self, batch_id, localpath, remoteDir, dcs_flag, file_time):
        '''
        Write one row to the config table dcs_file_record.

        :param batch_id: batch number, taken from dcs_collect_batch.dcs_id
        :param localpath: '/'-separated path; its last component is stored as file_name
        :param remoteDir: download target path, stored as-is in the path column
        :param dcs_flag: flag value stored in dcs_flag
        :param file_time: file time stored in file_time
        :return: None
        '''
        host = get_config('sftp')['hostdata']['ip']
        file_name = localpath.split('/')[-1]
        values = [batch_id, host, remoteDir, file_name, file_time, dcs_flag, self._now()]
        self.__tosql.tooracle('dcs_file_record', self._one_row_frame(values, self._RECORD_COLUMNS))

    def no_file_record(self, task, localpath, remoteDir):
        '''
        Record a "no new file" batch row for every task.

        :param task: mapping of task key -> task id
        :param localpath: unused
        :param remoteDir: unused
        :return: None
        '''
        for name in task:
            # NOTE(review): file_url keeps the task key's original case, while repeat_data
            # looks batches up by the lower-cased key. Confirm which form is intended.
            self.data_hitory_no(task[name], self._TABLE_PREFIX + name, self._NO_FILE_MSG)

    def repeat_data(self, task, localpath, remoteDir, file_time):
        '''
        For each task, attach a dcs_file_record row (dcs_flag=0) to that task's latest batch.

        The latest batch is the max(dcs_id) in dcs_collect_batch with task_type 'FILE'
        and file_url 'dcs_reserved_markit_' + key.lower(). Tasks with no such batch
        are logged and skipped.

        :param task: mapping whose keys name the tasks
        :param localpath: forwarded to file_record
        :param remoteDir: forwarded to file_record
        :param file_time: file time
        :return: None
        '''
        for name in task:
            table = self._TABLE_PREFIX + name.lower()
            # NOTE(review): the table name is spliced into the SQL text. It comes from the
            # task keys, so it must never contain quotes. Use a bind variable if
            # OracleConnect.fetchall supports parameters.
            sql_batch_id = self._BATCH_ID_SQL.format(table)
            logger.info('repeat_data sql_batch_id: %s', sql_batch_id)
            res = self.__oracle_connect.fetchall(sql_batch_id)
            # max() over zero matching rows yields NULL (None); int(None) used to raise TypeError.
            if not res or res[0][0] is None:
                logger.warning('repeat_data: no dcs_collect_batch row for file_url=%s, '
                               'skipping dcs_file_record', table)
                continue
            self.file_record(int(res[0][0]), localpath, remoteDir, 0, file_time)
#!/bin/bash
# Run delete_mid_table_data.py from this script's own directory, with NLS_LANG
# set for the Oracle client (Simplified Chinese, AL32UTF8 character set).
export NLS_LANG='SIMPLIFIED CHINESE_CHINA.AL32UTF8'
#source ~/.bash_profile
# Run timestamp; not referenced by the commands below.
data_date=$(date +%Y%m%d%H%M%S)
# Resolve the directory containing this script. Paths are quoted so directories
# with spaces work. Abort rather than run the Python script from the wrong directory.
sub_dir=$(cd "$(dirname "$0")" && pwd) || exit 1
cd "$sub_dir" || exit 1
python delete_mid_table_data.py