zoukankan      html  css  js  c++  java
  • 【监控脚本】利用selenium自动化测试样例一

    import argparse
    import logging
    import psycopg2
    import datetime,time
    import os,sys
    from selenium import webdriver
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support.ui import Select
    from concurrent import futures
    
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s-[%(filename)s:%(lineno)s]-[%(threadName)s]- %(message)s',datefmt='%a, %d %b %Y %H:%M:%S')
    pool = futures.ThreadPoolExecutor(max_workers=10)
    WAIT_TIME=30
    
    
    def query_postgres(all_args,sql):
        pg_conn=None
        try:
            pg_conn = psycopg2.connect(database=all_args.database, user=all_args.user, password=all_args.password, host=all_args.host, port=all_args.port)
            cur = pg_conn.cursor()
            cur.execute(sql)
            return cur.fetchall()
        except Exception as e:
            logging.error("Query Postgresql Failed , %s",e)
        finally:
            if pg_conn:
                pg_conn.close()
    
    
    def get_org_user_list(all_args):
        if not all_args.org:
            orgIds=[r[0] for r in query_postgres(all_args,"""select (info->>'_id') as orgId from "sxacc-organizations" ;""")]
        else:
            orgIds=all_args.org.split(',')
        #logging.info("Need Verify Orgs: %s",orgIds)
        sql="select username,organization_id from calixuser where organization_id in ("+(','.join(r for r in orgIds))+") and (username like 'mig-admin@%' or username like 'implementation@%'); "
        result=[]
        org_map={}
        for r in query_postgres(all_args,sql):
            if not org_map.has_key(r[1]):
                result.append({"orgId":str(r[1]),"username":r[0],"password":("Migration%s"%r[1] if str(r[0]).startswith("mig-admin") else "Impl%s"%r[1])})
                org_map[r[1]]=r[0]
        return result
    
    def screen_shapshoot(driver,orgId,name,str_date):
        screen_dir=os.path.join("screen",str_date)
        if not os.path.isdir("screen"):
            os.mkdir("screen")
        if not os.path.isdir(screen_dir):
            os.mkdir(screen_dir)
        screen_dir=os.path.join("screen",str_date,str(orgId))
        if not os.path.isdir(screen_dir):
            os.mkdir(screen_dir)
        file_name=os.path.join(screen_dir,"%s.png"%(name))
        driver.get_screenshot_as_file(file_name)
    
    def get_call_adv_reports(driver,orgId,str_date):
        driver.get("https://xxx/support/")
        locator = (By.ID, "netops")
        WebDriverWait(driver, WAIT_TIME).until(EC.presence_of_element_located(locator))
        time.sleep(5)
        try:
            driver.find_element_by_id('reportops').click()
            i=0
            for s in driver.find_elements_by_class_name('report-summary'):
                name=driver.find_elements_by_class_name('name')[i].text
                s.click()
                time.sleep(10)
                screen_shapshoot(driver,orgId,'call_adv_%s'%str(name).lstrip(),str_date)
                i+=1
        except Exception as e:
            logging.error("call adv error, %s",e)
            logging.error("org %s has no call adv report",orgId)
    
    
    
    def get_checked_device(driver,orgId,str_date):
        driver.find_element_by_id('netops').click()
        driver.find_element_by_id('deviceReports').click()
        s1=Select(driver.find_element_by_id('device_report_type'))
        s1.select_by_value("checkInDeviceReport")
        s2=Select(driver.find_element_by_id('time_period'))
        s2.select_by_value("1")
        driver.find_element_by_id('query-report-btn').click()
        locator = (By.TAG_NAME, "th")
        WebDriverWait(driver, WAIT_TIME).until(EC.presence_of_element_located(locator))
        screen_shapshoot(driver,orgId,"DeviceReportList",str_date)
        sn=driver.find_element_by_class_name('serialnumber').text
        driver.get("https://xxx/support/cpe-troubleshooting?serialNumber=%s"%sn)
        locator=(By.CLASS_NAME,"table-responsive")
        WebDriverWait(driver,WAIT_TIME).until(EC.presence_of_element_located(locator))
        locator=(By.CLASS_NAME,"fa-spinner")
        WebDriverWait(driver,WAIT_TIME).until_not(EC.presence_of_element_located(locator))
        time.sleep(5)
        screen_shapshoot(driver,orgId,'cpeTroubleshooting',str_date)
        driver.find_element_by_id('ccl_support_smartcheck_tab').click()
        locator=(By.CLASS_NAME,"smartcheck-btn-details")
        WebDriverWait(driver,WAIT_TIME).until(EC.presence_of_element_located(locator))
        locator=(By.CLASS_NAME,"fa-spinner")
        WebDriverWait(driver,WAIT_TIME).until_not(EC.presence_of_element_located(locator))
        try:
            for s in driver.find_elements_by_class_name('refresh'):
                s.click()
                time.sleep(5)
            time.sleep(10)
            locator=(By.CLASS_NAME,"fa-spinner")
            WebDriverWait(driver,WAIT_TIME).until_not(EC.presence_of_element_located(locator))
            screen_shapshoot(driver,orgId,'smartcheck',str_date)
            for i in range(0,4):
                print driver.find_elements_by_class_name('type')[i].text
                if i<3:
                    driver.find_elements_by_class_name('smartcheck-btn-details')[i].click()
                else:
                    driver.find_elements_by_class_name('smartcheck-btn-details')[2].click()
                time.sleep(10)
                screen_shapshoot(driver,orgId,'smartcheck_%s'%str(driver.find_elements_by_class_name('type')[i].text).lstrip(),str_date)
                driver.find_element_by_id("detail-close").click()
        except Exception as e:
            logging.error("refresh smart check failed,%s",e)
    
    
    def verify_csc_details(driver,orgId,str_date):
        get_checked_device(driver,orgId,str_date)
        get_call_adv_reports(driver,orgId,str_date)
    
    def close_terms_of_service_page(driver):
        try:
            driver.find_element_by_class_name('close').click()
        except Exception as e:
            logging.error("Close Button not exists")
    
    
    
    def test_cmc_functions(driver,orgId,str_date):
        driver.get("https://xxx/marketing/insights")
        WebDriverWait(driver, WAIT_TIME).until(EC.title_contains("Marketing"))
        close_terms_of_service_page(driver)
        time.sleep(5)
        for item in ['past_28_days','past_month','past_2_months']:
            driver.find_element_by_id(item).click()
            time.sleep(10)
            screen_shapshoot(driver,orgId,'cmc_insights_%s'%item,str_date)
        return
    
    def test_cloud_login(orgId,username,password,str_date,driver):
        driver.get("https://calixcloud.calix.com/login")
        driver.find_element_by_name('userName').send_keys(username)
        driver.find_element_by_name('password').send_keys(password)
        driver.find_element_by_class_name('btn').click()
        try:
            WebDriverWait(driver, WAIT_TIME).until(EC.title_contains("Support"))
            close_terms_of_service_page(driver)
            locator = (By.ID, "netops")
            WebDriverWait(driver, WAIT_TIME).until(EC.presence_of_element_located(locator))
            screen_shapshoot(driver,orgId,"Login_Success",str_date)
            logging.info("org:%s verify Successed",orgId)
        except Exception as e:
            screen_shapshoot(driver,orgId,"Login_Faild",str_date)
            logging.error("Login to org: %s failed , exception: %s",orgId,e)
            logging.error("org:%s verify Failed",orgId)
    
    
    def test_cloud(orgId,username,password,str_date):
        logging.info("Begin verify org:%s",orgId)
        driver = webdriver.Firefox()
        #driver = webdriver.Chrome()
        try:
            test_cloud_login(orgId,username,password,str_date,driver)
            verify_csc_details(driver,orgId,str_date)
            test_cmc_functions(driver,orgId,str_date)
        except Exception as e:
            logging.error("test cloud faild :%s",e)
        finally:
            driver.close()
        return
    
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('-d', '--database', help='database for postgres', default='postgres')
        parser.add_argument('-u', '--user', help='user for postgres', default='postgres')
        parser.add_argument('-p', '--password', help='password for postgres', default='postgres')
        parser.add_argument('-n', '--host', help='host for postgres', default='127.0.0.1')
        parser.add_argument('-P', '--port', help='port for postgres', default='5432')
        parser.add_argument('-o', '--org', help="orgs list,split by ,")
        all_args = parser.parse_args()
    
        pool = futures.ThreadPoolExecutor(max_workers=3)
        #driver = webdriver.Firefox()
        #all_args.org='7583'
        for i in range(1):
            str_date=datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
            for r in get_org_user_list(all_args):
                #test_csc_functions(r['orgId'],r['username'],r['password'],str_date,driver)
               pool.submit(test_cloud,r['orgId'],r['username'],r['password'],str_date)
            time.sleep(60)
    
    if __name__ == '__main__':
        main()
    

      

  • 相关阅读:
    CF1305 Ozon Tech Challenge 2020 游戏存档
    CF1310A Recommendations 题解
    CF755G PolandBall and Many Other Balls 题解
    关于后缀自动机
    具体数学学习笔记
    Flask-SQLAlchemy中解决1366报错
    常用的SQLalchemy 字段类型
    flask_model防止循环引用
    navicate远程访问ubuntu上的mysql数据库
    flask运行环境搭建(nginx+gunicorn)
  • 原文地址:https://www.cnblogs.com/tben/p/11771034.html
Copyright © 2011-2022 走看看