zoukankan      html  css  js  c++  java
  • 定时任务配合es实现高可用查询

    定时任务统计查询

         1.无论是定时任务还是es本身都有可能被异常停止服务

             当服务恢复正常的时候进行补偿查询  把服务中断期间的数据进行统计查询

         2.单次统计也可能由于程序本身和网络连接问题发生异常

             把异常的统计时间段存入异常表 每次查询都从表中取出所有异常查询点 不断查询直到返回成功

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import datetime
    import time
    import sys
    import os
    import json
    sys.path.append('../')
    
    from common import esConfig
    from common import timetools
    from common import calastTime
    from common.loggerConfig import loggerConfig
    from common import constConfig
    from common import dbtools
    
    
    indexname_prefix="sms-service-"
    logfile="../log/sms_service.log"
    logpath=os.path.abspath(logfile)
    log=loggerConfig("smsService",logpath)
    logger=log.create_logger()
    
    
    
    body={"aggs":{"3":{"terms":{"field":"APP_ID","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"4":{"terms":{"field":"code","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"5":{"terms":{"field":"CHANNEL_ID","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"6":{"terms":{"field":"SWJG_DM","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"7":{"terms":{"field":"STATUS","size":5000,"order":{"1":"desc"}},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}},"8":{"date_histogram":{"field":"mydate","interval":"1h","time_zone":"Asia/Shanghai","min_doc_count":1},"aggs":{"1":{"cardinality":{"field":"RECORD_ID"}}}}}}}}}}}}}}},"size":0,"stored_fields":["*"],"docvalue_fields":[{"field":"@timestamp","format":"date_time"},{"field":"mydate","format":"date_time"}],"query":{"bool":{"must":[{"match_phrase":{"metricsName":{"query":"SmsService"}}},{"range":{"mydate":{"gte":0,"lte":0,"format":"epoch_millis"}}}]}},"timeout":"30000ms"}
    
    def handle_res(res,index_date):
        outlist=[]
        dnow=datetime.datetime.now().strftime('%Y-%m-%d %H')+":00:00"
        for i3 in res["aggregations"]["3"]["buckets"]:
          for i4 in i3["4"]["buckets"]:
            for i5 in i4["5"]["buckets"]:
              for i6 in i5["6"]["buckets"]:
                for i7 in i6["7"]["buckets"]:
                    for i8 in i7["8"]["buckets"]:
                        timestr = i8["key_as_string"][:-6]
                        newtime = timetools.formartTime(timestr)
                        outlist.append({"appId":i3["key"],"code":i4["key"],"channelId":i5["key"],"swjdDm":i6["key"],"status":i7["key"],"fszl":i8["doc_count"],"fsl":i8["1"]["value"],"createTime":newtime,"statisticalTime":dnow})
        indexname = indexname_prefix + index_date
        if esConfig.es.indices.exists(index=indexname):
            logger.info("%s 索引已经存在" % (indexname), extra={'result': '', "querytime": ""})
        else:
            esConfig.es.indices.create(index=indexname)
            logger.info("%s 索引被成功创建" % (indexname), extra={'result': '', "querytime": ""})
    
        for data in outlist:
            res2=esConfig.es.index(index=indexname, doc_type="doc", body=data)
            logger.info("%s被成功添加到%s索引当中" %(json.dumps(data),indexname),extra={'result': 'data', "querytime": ""})
    
    def handler_hours(hours):
        retryCount = constConfig.MAX_RETRY
        cald=calastTime.CalcDaysHours(beginTime=None,endTime=None,logger=logger)
        for i in hours:
            while retryCount >= 1:
                try:
                    index_date = i.strftime("%Y-%m-%d")
                    index_hour = i.strftime("%H")
                    res = cald.get_data_from_index(i,body)
                    handle_res(res, index_date)
                    # 日志中记录最后一次成功统计的时间 最后统计成功时间不一定是当前时间
                    logger.info("成功统计%d条记录" % (res['hits']['total']), extra={'result': 'success', "querytime": i})
                    successtask = dbtools.Task(name='smsService',time=i,status=1)
                    dbtools.TaskOps().delete_FailTask(name='smsService',time=i)
                    dbtools.TaskOps().Add_Task(successtask)
                    time.sleep(10)
                    break
                except Exception as e:
                    retryCount -= 1
                    if retryCount > 0:
                        logger.error("%d次统计发生异常%s,还剩%d次重试" % (retryCount + 1, str(e), retryCount),
                                     extra={'result': 'fail', "querytime": i})
                        time.sleep(60)
                        continue
                    else:
                        failtask = dbtools.Task(name='smsService', time=i, status=0)
                        dbtools.TaskOps().delete_FailTask(name='smsService',time=i)
                        dbtools.TaskOps().Add_Task(failtask)
                        logger.error("%d次统计全部失败,结束异常统计" % (constConfig.MAX_RETRY), extra={'result': 'fail', "querytime": i})
                        retryCount = constConfig.MAX_RETRY
                        break
    
    pro=calastTime.ProccessData("smsService",body,logger)
    logger.info("开始统计补偿任务",extra={'result': 'start', "querytime": ""})
    lost_hours=pro.run_buchang()
    handler_hours(lost_hours)
    logger.info("补偿任务统计结束",extra={'result': '', "querytime": ""})
    logger.info("开始重新统计失败任务",extra={'result': '', "querytime": ""})
    error_hours=pro.run_failtask()
    handler_hours(error_hours)
    logger.info("失败任务统计结束",extra={'result': 'end', "querytime": ""})
    View Code
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import datetime
    import os
    from common import timetools
    from common import esConfig
    from common import dbtools
    
    
    class CalcTime:
        def __init__(self,name,logger):
            self.name=name
            self.logger=logger
    
        def getDifferHours(self):
            # logf=LogFileRead(self.filename,self.logger)
            lastSuccessTime=Querydb().getLastSucessTask(self.name).strftime('%Y-%m-%d %H:%M:%S')
            lastSuccessTime=timetools.logtime_to_datetime(lastSuccessTime)
            #最后执行成功的时间 说明下一个小时的统计没有被成功执行 从下一个小时开始计算
            #开始时间=最后执行成功时间+1h-3h 这段时间开始范围内的数据都没有被统计
            nowTime=datetime.datetime.now()
            #结束时间=当前时间-3h
            #补偿异常期间内所有的数据
            self.logger.info("上次统计的成功时间段是%s" % (lastSuccessTime.strftime('%Y-%m-%d %H')),extra={'result': '', "querytime": ""})
            #说明当前时间-3h之前的数据都被成功统计
            self.logger.info("当前时间%s" %(nowTime),extra={'result': '', "querytime": ""})
            #本次的实际统计开始时间是从上次成功统计后的1h开始
            real_startTime=(lastSuccessTime+datetime.timedelta(hours=1)).replace(minute=0,second=0).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            theory_startTime=(nowTime+datetime.timedelta(hours=-3)).replace(minute=0,second=0).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            endTime = (nowTime + datetime.timedelta(hours=-3)).replace(minute=59, second=59).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            self.logger.info("补偿时间范围%s-%s" %(real_startTime,endTime),extra={'result': '', "querytime": ""})
            self.logger.info("此次理论查询时间范围%s-%s" % (theory_startTime, endTime),extra={'result': '', "querytime": ""})
            self.logger.info("此次实际查询时间范围%s-%s" %(real_startTime,endTime),extra={'result': '', "querytime": ""})
            return real_startTime,endTime
    
    class LogFileRead:
    
        def __init__(self,filename,logger):
            self.filename=filename
            self.logger=logger
    
        def getLastSuccessTime(self):
            # res = os.popen("tac %s | grep 'success'| head -n 1" % (self.filename))
            res = os.popen("tac %s | sed -n '/^success/p'| head -n 1" % (self.filename))
            # res = os.popen("sed -n '/^success/p' %s | head -n 1" %(self.filename))
            con = None
            for re in res.readlines():
                con = re
            if con:
                cons = con.split("_")
                self.logger.info("%s日志记录中最近的成功统计时间是%s" %(self.filename,cons[2]),extra={'result': '',"querytime":""})
                return cons[2]
            else:
                self.logger.info("%s日志文件中读取不到任何成功执行记录" %(self.filename),extra={'result': '',"querytime":""})
                #如果日志文件中读取不到任何记录  则说明统计的是当前时间-3h的记录
                return (datetime.datetime.now()+datetime.timedelta(hours=-3)).strftime('%Y-%m-%d %H:%M:%S')+",000"
    
    class CalcDaysHours:
    
        def __init__(self,beginTime,endTime,logger):
            self.beginTime=beginTime
            self.endTime=endTime
            self.logger=logger
    
        def calcDays(self):
            date1 = datetime.datetime.strptime(self.beginTime[:-13], "%Y-%m-%d")
            date2 = datetime.datetime.strptime(self.endTime[:-13], "%Y-%m-%d")
            date_list = []
            while date1 <= date2:
                date_str = date1
                date_list.append(date_str)
                date1 += datetime.timedelta(days=1)
            return date_list
    
        def calcHours(self):
            date1 = datetime.datetime.strptime(self.beginTime[:-10], "%Y-%m-%d %H")
            date2 = datetime.datetime.strptime(self.endTime[:-10], "%Y-%m-%d %H")
            hour_list=[]
            while date1 <= date2:
                date_str = date1
                hour_list.append(date_str)
                date1 += datetime.timedelta(hours=1)
            return hour_list
    
        def getIndexes(self):
            datelist=self.calcDays()
            indexes=[]
            for i in datelist:
                indexes.append(esConfig.index_prefix+i.strftime("%Y-%m-%d"))
            self.logger.info("此次查询的原始索引为%s" %(indexes),extra={'result': '',"querytime":""})
            return indexes
    
        def getOriginalIndex(self,startTime):
            indexes=[]
            index1=esConfig.index_prefix+startTime.strftime("%Y-%m-%d")
            #es服务异常恢复后会把n天前的数据导入到当前日期的索引中 而不会导入对应日期的索引中
            index2=esConfig.index_prefix+datetime.datetime.now().strftime("%Y-%m-%d")
            indexes.append(index1)
            indexes.append(index2)
            self.logger.info("此次查询的原始索引为%s" % (indexes), extra={'result': '', "querytime": ""})
            return indexes
    
        def get_data_from_index(self,startTime,body):
            sTime=startTime.strftime('%Y-%m-%d %H:%M:%S')
            eTime=startTime.replace(minute=59,second=59).strftime('%Y-%m-%d %H:%M:%S')
            stime = str(timetools.strtime_to_timestamp(sTime))
            etime = str(timetools.strtime_to_timestamp(eTime))
            self.logger.info("本次统计的时间范围是%s-%s|%s-%s" %(sTime,eTime,stime,etime),extra={'result': '',"querytime":""})
            bodydate=body.get("query").get("bool").get("must")[1].get("range").get("mydate")
            bodydate["gte"]=stime
            bodydate["lte"]=etime
            body.get("query").get("bool").get("must")[1].get("range")["mydate"]=bodydate
    
            res = esConfig.es.search(body=body,index=self.getOriginalIndex(startTime))
            return res
    
    class Querydb:
    
        def getFailTasks(self,name):
            res=dbtools.TaskOps().query_FailTask(name)
            return res
    
        def getLastSucessTask(self,name):
            res =dbtools.TaskOps().query_SucTask(name)
            nowTime = datetime.datetime.now()
            if res:
                return res.time
            else:
                return nowTime+datetime.timedelta(hours=-4)
    
    class ProccessData:
    
        def __init__(self,name,body,logger):
            self.name=name
            self.body=body
            self.logger=logger
    
    
        #执行补偿task
        def run_buchang(self):
            cal=CalcTime(self.name,self.logger)
            startTime,endTime=cal.getDifferHours()
            cday=CalcDaysHours(startTime,endTime,self.logger)
            return cday.calcHours()
    
    
        #执行统计失败的任务
        def run_failtask(self):
            res = Querydb().getFailTasks(self.name)
            hours_list=[]
            for re in res:
                # re.time=re.time.strftime('%Y-%m-%d %H')
                hours_list.append(re.time)
            return hours_list
    日期和日志操作
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from sqlalchemy import create_engine,Column,Integer,String,DateTime
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    import datetime
    import os
    
    parent_dir=os.path.abspath(".")
    if parent_dir.endswith("auto-task"):
        pass
    else:
        parent_dir=os.path.abspath("../")
    
    # engine = create_engine('sqlite:///./db/estasks.db',echo=True) #echo要求打印sql语句等调试信息
    engine = create_engine('sqlite:///%s/estasks.db' %(parent_dir+"/db"))
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    Base = declarative_base()
    
    class Task(Base):
      __tablename__ = 'Task'
      id = Column('TaskID',Integer,primary_key=True)
      name = Column('TaskType',String(100),nullable=False)
      status=Column('status',Integer,nullable=False)
      time = Column('TaskTime',DateTime,nullable=False)
      addtime=Column('TaskaddTime',DateTime,default=datetime.datetime.now)
    
      def __repr__(self):
        return '<Task(id:%s, name:%s, time:%s)>' % (self.id,self.name,self.time)
    
    class TaskRecord(Base):
        __tablename__ = 'TaskRecord'
        id = Column('TaskID',Integer,primary_key=True)
        name = Column('TaskName',String(100),nullable=False)
        failtime = Column('TaskTime',DateTime,nullable=False)
        fixtime=Column('TaskSuccessTime',DateTime,default=datetime.datetime.now)
    
        def __repr__(self):
            return '<TaskRecord(id:%s,name:%s,time:%s,fixtime:%s)>' %(self.id,self.name,self.time,self.fixtime)
    
    class TaskOps:
    
        def Add_Task(self,task):
            session.add(task)
            session.commit()
    
        def query_FailTask(self,name):
            res=session.query(Task).filter(Task.name==name,Task.status==0).all()
            return res
    
        def query_SucTask(self,name):
            res=session.query(Task).filter(Task.name==name,Task.status==1).order_by(Task.time.desc()).first()
            return res
    
        def delete_FailTask(self,name,time):
            session.query(Task).filter(Task.name==name,Task.time==time,Task.status==0).delete()
            session.commit()
    
    class TaskRecordOps:
    
        def Add_TaskRecord(self,task):
            session.add(task)
            session.commit()
    
        def query_TaskRecord(self):
            res=session.query(TaskRecord).all()
            return res
    
    Base.metadata.create_all(engine) #若存在表则不做,不存在则创建
    数据库工具

    定时任务设置

      添加定时任务 : crontab -e

      基本格式 :
       *  *  *  *  *  command
      分  时  日  月  周  命令
      解 释:
       第1列表示分钟1~59 每分钟用或者 /1表示
       第2列表示小时1~23(0表示0点)
       第3列表示日期1~31
       第4列表示月份1~12
       第5列标识号星期0~6(0表示星期天)
        第6列要运行的命令

    [root]# crontab -l
        */60 * * * * sh /root/auto-task/shell/hostcron.sh           每个整点时间点执行一次
         1 0 * * * sh /root/auto-task/shell/hostcronday.sh        每天的0点过1分执行一次

  • 相关阅读:
    LDAP2-创建OU创建用户
    GNE: 4行代码实现新闻类网站通用爬虫
    为什么每一个爬虫工程师都应该学习 Kafka
    新闻网页通用抽取器GNEv0.04版更新,支持提取正文图片与源代码
    写了那么久的Python,你应该学会使用yield关键字了
    新闻类网页正文通用抽取器
    为什么Python 3.6以后字典有序并且效率更高?
    为什么你需要少看垃圾博客以及如何在Python里精确地四舍五入
    数据工程师妹子养成手记——数据库篇
    一行js代码识别Selenium+Webdriver及其应对方案
  • 原文地址:https://www.cnblogs.com/yxh168/p/11906392.html
Copyright © 2011-2022 走看看