zoukankan      html  css  js  c++  java
  • newWebMethod

    #!/bin/env python
    #-*- encoding=utf8 -*-
    import traceback
    import os,ssl,random
    import string,zipfile
    import PublicMethod as PM
    from time import *
    from _weakref import proxy
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.select import Select
    from selenium.webdriver.common.action_chains import  ActionChains
    
    # Accepted spellings for each supported browser family.  The ``browser``
    # value of a task is matched against these lists with ``in``.
    opera_list = [
        'Opera',
        'opera',
        'op',
    ]
    firefox_list = [
        'Firefox',
        'firefox',
        'ff',
    ]
    chrome_list = [
        'Chrome',
        'chrome',
        'Google',
        'google',
        'gg',
    ]
    
    # Decorator: run an element operation a second time when the first
    # failure is classified as retryable.
    def try_again(func):
        """Wrap a method so that it is retried once after a retryable failure.

        The decorated method must belong to an object exposing
        ``retry_error(error_msg)``; that method receives the formatted
        traceback and decides whether a second attempt is made.

        Returns:
            The wrapped method.  Its result is whatever ``func`` returns, on
            the first or the second attempt.

        Raises:
            Exception: carrying the formatted traceback of the first failure
                when ``retry_error`` declines the retry.  An error raised by
                the second attempt propagates unchanged.
        """
        # Local import so this block does not depend on the file header.
        import functools

        @functools.wraps(func)  # keep the wrapped method's name and docstring
        def wrapper(self, *args, **kwargs):
            try:
                result = func(self, *args, **kwargs)
            except Exception:
                # ``except Exception`` (not a bare except) so that
                # KeyboardInterrupt / SystemExit are never retried.
                error_msg = traceback.format_exc()
                if not self.retry_error(error_msg):
                    raise Exception(error_msg)
                result = func(self, *args, **kwargs)
            # Pause after every successful operation
            # (PM.random_sleep presumably sleeps 1-3 s; helper not visible here).
            PM.random_sleep(1, 3)
            return result
        return wrapper
    
    # Web: decorator that records diagnostics and shuts the browser down when
    # the wrapped task method fails.
    def save_screen_shot(func):
        """Wrap a task method with failure logging and browser cleanup.

        The owning object must provide ``task_path``, ``web`` (an object with
        ``rerun_error`` and ``get_error_page_info``), ``driver``,
        ``python_pid``, ``driver_pid`` and ``mozilla_pid``.

        Returns:
            The wrapped method; on success it returns ``func``'s result.

        Raises:
            Exception: ``'Error thrown by modifier !'`` after the diagnostics
                have been written and the browser has been closed.
        """
        # Local import so this block does not depend on the file header.
        import functools

        @functools.wraps(func)  # keep the wrapped method's name and docstring
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception:
                # Print the error and save it to a timestamped log file.
                traceback.print_exc()
                error_msg = traceback.format_exc()
                stamp = PM.get_time('%m%d_%H%M%S')
                log_path = self.task_path + '\\log_%s.txt' % stamp
                with open(log_path, 'w+') as log_file:
                    traceback.print_exc(file=log_file)
                # Decide from the error text whether the task may be re-run.
                self.web.rerun_error(error_msg)
                PM.output_result(self.task_path + '\\output.txt')
                print("PM.fail['reason']:\n%s" % PM.fail['reason'])
                # Append URL, HTML and a screenshot of the current page.
                self.web.get_error_page_info(log_path)
                # NOTE(review): this ~17 minute pause looks like a debugging
                # aid left in place; it is kept to preserve behaviour.
                print('sleep 1000s')
                sleep(1000)
                # Close the browser, then kill any leftover processes.
                try:
                    self.driver.quit()
                    print('Close Window by modifier!')
                except Exception:
                    print(u'浏览器已关闭(E)!')
                close_browser_by_pid(self.python_pid, self.driver_pid, self.mozilla_pid)
                raise Exception('Error thrown by modifier !')
        return wrapper
    
    # Collect the PIDs of the processes that belong to the current task.
    def get_all_pid(browser):
        """Return ``(python_pid, driver_pid, mozilla_pid)`` for this task.

        ``python_pid`` is this interpreter's PID.  ``driver_pid`` is the PID of
        the WebDriver executable started by this process, and ``mozilla_pid``
        the PID of the browser started by that driver; both are looked up with
        the Windows ``wmic`` command and are returned as strings.  Either is
        ``None`` when it cannot be determined (unknown ``browser``, no matching
        child process, or the query failed).
        """
        if browser in chrome_list:
            driver, mozilla = 'chromedriver', 'chrome'
        elif browser in firefox_list:
            driver, mozilla = 'geckodriver', 'firefox'
        elif browser in opera_list:
            driver, mozilla = 'operadriver', 'opera'
        else:
            # Unknown browser: nothing to search for.
            driver = mozilla = None

        def find_child_pid(parent_pid, image_name):
            """Return the PID of the child of ``parent_pid`` whose line mentions ``image_name``."""
            if parent_pid is None or image_name is None:
                return None
            try:
                pipe = os.popen('wmic process where (ParentProcessId=%s) get Caption,ProcessId' % parent_pid)
                try:
                    output = pipe.read()
                finally:
                    pipe.close()
            except Exception:
                return None
            pid = None
            for line in output.splitlines():
                fields = line.split()
                # The PID is taken from the last column; whitespace splitting
                # copes with any amount of column padding.
                if image_name in line and fields and fields[-1].isdigit():
                    pid = fields[-1]  # the last matching line wins
            return pid

        python_pid = os.getpid()
        driver_pid = find_child_pid(python_pid, driver)
        mozilla_pid = find_child_pid(driver_pid, mozilla)
        print('%s %s %s' % (python_pid, driver_pid, mozilla_pid))
        return python_pid, driver_pid, mozilla_pid
    
    # Close the browser and its related processes.
    def close_browser_by_pid(python_pid, driver_pid, mozilla_pid):
        """Force-kill the browser and driver process trees with ``taskkill``.

        The browser is killed first, then the driver.  A PID of ``None`` is
        skipped and reported as a failure, as is a non-zero exit status from
        ``taskkill``.  ``python_pid`` is only printed, never killed.
        """
        for label, pid in (('mozilla', mozilla_pid), ('driver', driver_pid)):
            killed = False
            if pid is not None:
                try:
                    # close() waits for the command and returns its exit
                    # status, which is None when the command succeeded.
                    status = os.popen('taskkill /F /pid:%s /T' % pid).close()
                    killed = not status
                except Exception:
                    killed = False
            if killed:
                print('kill %s by pid !' % label)
            else:
                print('kill %s by pid failed !' % label)
        print('python PID:%s' % python_pid)
    
    # Configure a browser (proxy, cookies, headless flag) and launch it.
    class StartBrowser():
        """Build driver options from a task parameter dict and open the start URL.

        Supported browsers are the ones listed in ``chrome_list``,
        ``firefox_list`` and ``opera_list``.  ``open_browser`` is the public
        entry point; it returns a ready WebDriver instance.
        """

        def __init__(self, url, param_dict, task_path=None):
            """Store the start URL and the launch settings from ``param_dict``.

            ``param_dict`` must provide the keys ``browser``, ``cookies``,
            ``task_path``, ``ip``, ``port``, ``ip_username``, ``ip_passwd`` and
            ``binary_location``.  The ``task_path`` argument is accepted but
            not used: the value stored comes from ``param_dict``.
            """
            self.url = url
            self.headless = False
            self.browser = param_dict['browser']
            self.cookies = param_dict['cookies']
            self.task_path = param_dict['task_path']
            self.ip = param_dict['ip']
            self.port = param_dict['port']
            self.ip_username = param_dict['ip_username']
            self.ip_passwd = param_dict['ip_passwd']
            self.binary_location = param_dict['binary_location']

        # Build the Chrome extension that configures an authenticated proxy.
        def create_proxyauth_extension(self, scheme='http', plugin_path=None):
            """Write a Chrome extension zip that sets the proxy and answers its auth prompt.

            The archive contains ``manifest.json`` and ``background.js``; the
            latter is filled in with ``self.ip``, ``self.port``,
            ``self.ip_username``, ``self.ip_passwd`` and ``scheme``.

            Args:
                scheme: proxy scheme written into the extension.
                plugin_path: destination of the zip file (required in practice).

            Returns:
                ``plugin_path``.
            """

            manifest_json = """
            {
                "version": "1.0.0",
                "manifest_version": 2,
                "name": "Chrome Proxy",
                "permissions": [
                    "proxy",
                    "tabs",
                    "unlimitedStorage",
                    "storage",
                    "<all_urls>",
                    "webRequest",
                    "webRequestBlocking"
                ],
                "background": {
                    "scripts": ["background.js"]
                },
                "minimum_chrome_version":"22.0.0"
            }
            """

            background_js = string.Template(
            """
            var config = {
                    mode: "fixed_servers",
                    rules: {
                      singleProxy: {
                        scheme: "${scheme}",
                        host: "${host}",
                        port: parseInt(${port})
                      },
                      bypassList: ["foobar.com"]
                    }
                  };

            chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

            function callbackFn(details) {
                return {
                    authCredentials: {
                        username: "${username}",
                        password: "${password}"
                    }
                };
            }

            chrome.webRequest.onAuthRequired.addListener(
                        callbackFn,
                        {urls: ["<all_urls>"]},
                        ['blocking']
            );
            """
            ).substitute(host=self.ip, port=self.port, username=self.ip_username, password=self.ip_passwd, scheme=scheme)
            with zipfile.ZipFile(plugin_path, 'w') as zp:
                zp.writestr("manifest.json", manifest_json)
                zp.writestr("background.js", background_js)

            return plugin_path

        # Chrome / Opera launch options, including the proxy.
        def chrome_proxy_options(self):
            """Return a ``ChromeOptions`` object configured from this instance."""
            chrome_options = webdriver.ChromeOptions()
            # Hide the "controlled by automated software" info bar.
            chrome_options.add_argument('disable-infobars')
            # Start with a maximized window.
            chrome_options.add_argument("--start-maximized")
            chrome_options.add_argument('--ignore-certificate-errors')
            # Headless mode.
            if self.headless:
                chrome_options.add_argument('--headless')
                chrome_options.add_argument('--disable-gpu')
            # Proxy address.
            if self.ip is not None:
                chrome_options.add_argument('--proxy-server=http://%s:%s' % (self.ip, self.port))
            # Opera needs the path of its executable.
            if self.browser in opera_list:
                chrome_options.binary_location = self.binary_location
            # Username/password protected proxy: handled by a generated extension.
            if self.ip_username is not None:
                print(u'  创建一个代理IP插件!')
                path = '%s/vimm_chrome_proxyauth_plugin.zip' % self.task_path
                proxyauth_plugin_path = self.create_proxyauth_extension(plugin_path=path)
                chrome_options.add_extension(proxyauth_plugin_path)
            return chrome_options

        # Firefox launch profile, including the proxy.
        def firefox_proxy_options(self):
            """Return a ``FirefoxProfile`` configured from this instance.

            Side effect: when ``self.headless`` is true, ``MOZ_HEADLESS=1`` is
            set in this process's environment, because a profile preference
            cannot switch Firefox to headless mode.
            """
            profile = webdriver.FirefoxProfile()
            if self.headless:
                # The previous one-argument set_preference('-headless') call
                # raised TypeError; Firefox honours this variable instead.
                os.environ['MOZ_HEADLESS'] = '1'
            if self.ip is not None:
                # The port preferences are integers; self.port may be a string.
                port = int(self.port)
                profile.set_preference('network.proxy.type', 1)
                profile.set_preference('network.proxy.http', self.ip)
                profile.set_preference('network.proxy.http_port', port)
                profile.set_preference('network.proxy.ssl', self.ip)
                profile.set_preference('network.proxy.ssl_port', port)
            profile.update_preferences()
            return profile

        # Verify that traffic really leaves through the configured proxy.
        def check_IP(self, driver):
            """Open an IP echo page and check that it reports ``self.ip``.

            Returns:
                ``False`` when the proxy IP is shown on the page.  Otherwise
                the driver is quit, ``self.port`` is switched between 3128 and
                58378, and ``True`` is returned to request another launch.
            """
            try:
                driver.get('http://pv.sohu.com/cityjson?ie=utf-8')
                sleep(5)
                page_source = driver.page_source
            except Exception:
                page_source = u'打开检查IP地址的URL失败!'
            print('page_source:\n%s' % page_source)
            if self.ip in page_source:
                print(u'  代理IP设置正确!')
                return False
            driver.quit()
            # Compare as text so the toggle works whether the port is stored
            # as a string (task parameters) or an int (after a toggle).
            current_port = str(self.port)
            if current_port == '3128':
                self.port = 58378
            elif current_port == '58378':
                self.port = 3128
            return True

        def _create_driver(self):
            """Instantiate the WebDriver that matches ``self.browser``."""
            if self.browser in opera_list:
                return webdriver.Opera(options=self.chrome_proxy_options())
            if self.browser in firefox_list:
                return webdriver.Firefox(firefox_profile=self.firefox_proxy_options())
            if self.browser in chrome_list:
                return webdriver.Chrome(chrome_options=self.chrome_proxy_options())
            raise ValueError('unsupported browser: %r' % (self.browser,))

        # Launch the browser and open the start URL.
        def _open_browser(self):
            """Create the driver, verify the proxy, apply cookies and load ``self.url``.

            Raises:
                ValueError: ``self.browser`` is not in any alias list.
                Exception: the proxy check failed on both attempts.
            """
            # At most two launches: the second one uses the alternate port.
            for attempt in range(2):
                driver = self._create_driver()
                if self.ip is None:
                    break
                if not self.check_IP(driver):
                    # Proxy confirmed, keep this driver.
                    break
                if attempt == 1:
                    PM.fail['reason'] = u'代理IP设置错误,退出浏览器!'
                    raise Exception('proxy IP check failed on both attempts')
            # Cookie based login, when cookies were supplied.
            if self.cookies is not None:
                print(u'使用cookie认证登录')
                # A first visit is needed before cookies can be modified.
                driver.get(self.url)
                # Drop the cookies created by that first visit.
                driver.delete_all_cookies()
                for cookie in self.cookies:
                    if self.browser in firefox_list:
                        # Firefox gets the cookie without its 'domain' entry;
                        # work on a copy so the caller's dict stays intact.
                        cookie = dict(cookie)
                        cookie.pop('domain', None)
                    driver.add_cookie(cookie)
            driver.maximize_window()
            sleep(3)
            # Loading the page again makes the injected cookies take effect.
            driver.get(self.url)
            return driver

        # Launch the browser; report and clean up when the launch fails.
        def open_browser(self):
            """Return a started driver.

            On failure the reason is stored in ``PM.fail['reason']``, written
            through ``PM.output_result`` and the driver/browser processes are
            killed by PID.

            Raises:
                Exception: carrying the formatted traceback of the failure.
            """
            try:
                driver = self._open_browser()
            except Exception:
                error_msg = traceback.format_exc()
                PM.fail['reason'] = u'浏览器启动失败!\n原因:\n%s' % error_msg
                print(PM.fail['reason'])
                PM.output_result(self.task_path + '\\output.txt')
                # No driver object exists here (``_open_browser`` raised before
                # returning one), so clean up by PID only.
                python_pid, driver_pid, mozilla_pid = get_all_pid(self.browser)
                close_browser_by_pid(python_pid, driver_pid, mozilla_pid)
                raise Exception(error_msg)
            return driver
    
    # Element-level operations built on a WebDriver session.
    class WebMethod():
        """Helpers for locating and operating page elements, plus error triage.

        A locator is a string (or whatever ``PM.return_loc_list`` accepts);
        each resulting entry is treated as XPath when it starts with ``/`` and
        as a CSS selector otherwise.
        """

        def __init__(self, driver, task_path=None):
            """Keep the driver and the directory used for diagnostic files."""
            self.driver = driver
            self.task_path = task_path

        def c_time(self):
            """Return the local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
            return strftime('%Y-%m-%d %H:%M:%S', localtime())

        # **
        # Error handling
        # *
        def _probe(self, locator, label=None):
            """Return ``is_exists(locator)``; ``False`` if the lookup raises.

            When ``label`` is given, ``label:result`` is printed.
            """
            try:
                found = self.is_exists(locator)
            except Exception:
                return False
            if label is not None:
                print('%s:%s' % (label, found))
            return found

        # Inspect the current page to find out why the task failed.
        def get_page_error(self):
            """Set ``PM.fail['reason']`` from markers found on the current page.

            Returns:
                ``True`` when ``PM.fail['reason']`` differs from the default
                ``'unknown reasons,please contact IT'`` after the check.
            """
            # Markers of browser network-error pages.
            reload_button = self._probe('//button[@id="reload-button"]', 'reload_button')
            error_page = self._probe('//div[@id="errorPageContainer"]', 'error_page')
            network_error = self._probe('//div[@id="main-message"]/h1[@jsselect="heading"]', 'network_error')
            try_again_button = self._probe('//button[@id="errorTryAgain"]', 'tryAgain_button')
            # Other known failure: the site keeps asking to sign in.
            is_relogin = self._probe('//span[@id="GREET-SIGN-IN-TO-EBAY"]')
            if reload_button or network_error or error_page or try_again_button:
                PM.fail['reason'] = u'网络连接错误,请重新运行!'
            elif is_relogin:
                PM.fail['reason'] = u'eBay需要反复登录,请检查账号状态!'
            # A non-default reason means the cause was identified (now or earlier).
            return PM.fail['reason'] != 'unknown reasons,please contact IT'

        # Errors after which the same element operation may be tried again.
        def retry_error(self, error_msg):
            """Return ``True`` when ``error_msg`` describes a retryable failure."""
            command_markers = (
                'Command.GO_BACK',
                'Command.CLICK_ELEMENT',
                'Command.GET_CURRENT_URL',
                'Command.GET_PAGE_SOURCE',
            )
            retry = any(marker in error_msg for marker in command_markers)
            if not retry and 'ElementClickInterceptedException' in error_msg:
                retry = True
                # Wait before clicking again (the obstruction may go away).
                PM.random_sleep(5, 10)
            if retry:
                print(u'  **再次执行当前操作···')
            return retry

        # Errors after which the whole program may be run again.
        def rerun_error(self, error_msg):
            """Return ``True`` when the task is worth re-running.

            When it is, ``PM.fail['reason']`` is replaced by a generic
            "try again" message that embeds ``error_msg``.
            """
            # Examples of messages covered by the 'WebDriverException:' /
            # timeout checks:
            #   TimeoutException: Message: Timeout loading page after 300000ms
            #   WebDriverException: Message: Failed to decode response from marionette
            #   WebDriverException: Message: Reached error page: about:neterror?e=netTimeout
            #   WebDriverException: Message: Reached error page: about:neterror?e=nssFailure
            rerun_markers = (
                'WebDriverException:',
                'unknown session',
                'unknown sessionId',
                'invalid session id',
                'Message: no such session',
                'session deleted because of page crash',
                'error: [Errno',
                'MaxRetryError',
            )
            # Short-circuit order matters: the page is inspected first, and
            # retry_error (which may sleep) only runs if that found nothing.
            rerun = (
                self.get_page_error()
                or self.retry_error(error_msg)
                or any(marker in error_msg for marker in rerun_markers)
                or ('in check_response' in error_msg and 'Message: timeout' in error_msg)
            )
            if rerun:
                # NOTE(review): this overwrites a specific reason set by
                # get_page_error(); confirm that this is intended.
                PM.fail['reason'] = u'未知原因失败,可以尝试重新运行一次。如再次相同原因失败,请联系开发人员!\n错误原因:\n%s' % error_msg
            return rerun

        # **
        # Locator handling
        # *
        # Decide which locating strategy a locator string uses.
        def return_loc_type(self, locator):
            """Return ``By.XPATH`` for locators starting with ``/``, else ``By.CSS_SELECTOR``."""
            if locator.startswith('/'):
                return By.XPATH
            return By.CSS_SELECTOR

        # Check whether an element exists.
        def is_exists(self, locator, timeout=1):
            """Look for the element, polling once per second for ``timeout`` rounds.

            Returns:
                With ``timeout`` <= 1 (existence probe): ``1`` if any
                candidate locator matches, else ``0``.
                With ``timeout`` > 1 (the element is going to be operated):
                the first matching element.

            Raises:
                ValueError: ``timeout`` > 1 and the locator yields no candidates.
                Whatever ``driver.find_element`` raises when ``timeout`` > 1
                and nothing matched in time.
            """
            loc_list = PM.return_loc_list(locator)
            found = None
            loc_type = loc = None
            for _ in range(timeout):
                for loc in loc_list:
                    loc_type = self.return_loc_type(loc)
                    matches = self.driver.find_elements(by=loc_type, value=loc)
                    if matches:
                        found = matches[0]
                        break
                if found is not None:
                    print(loc)
                    break
                if timeout > 1:
                    sleep(1)
            if timeout <= 1:
                return 1 if found is not None else 0
            if found is not None:
                return found
            if loc is None:
                raise ValueError('no usable locator in %r' % (locator,))
            # Nothing matched: let the driver raise its own "not found" error.
            return self.driver.find_element(by=loc_type, value=loc)

        # Check whether an element is visible.
        def is_displayed(self, locator, timeout=1):
            """Return ``1`` if the first matching element is displayed, else ``0``.

            Polling stops as soon as a candidate locator matches, whether or
            not the element is visible.
            """
            loc_list = PM.return_loc_list(locator)
            for _ in range(timeout):
                for loc in loc_list:
                    matches = self.driver.find_elements(by=self.return_loc_type(loc), value=loc)
                    if not matches:
                        continue
                    # The element is on the page; report its visibility.
                    if matches[0].is_displayed():
                        print('This element is visible:%s' % loc)
                        return 1
                    return 0
                if timeout > 1:
                    sleep(1)
            return 0

        # **
        # Element operations
        # *
        # Wait for an element to appear.
        def wait_element_OK(self, locator, timeout=20):
            """Wait up to ``timeout`` polling rounds for the element, then pause."""
            print('[%s] waiting element is visible...By locator:' % self.c_time())
            self.is_exists(locator, timeout)
            PM.random_sleep(1, 3)

        # Click an element.
        @try_again
        def click_element(self, locator, timeout=20):
            """Find the element and click it."""
            print('[%s] clicking element...By locator:' % self.c_time())
            element_obj = self.is_exists(locator, timeout)
            element_obj.click()

        # Type text into an element.
        @try_again
        def input_text(self, text, locator, timeout=20):
            """Clear the element and type ``text`` one character at a time."""
            print('[%s] entering text...By locator:' % self.c_time())
            element_obj = self.is_exists(locator, timeout)
            element_obj.clear()
            sleep(1)
            for char in text:
                element_obj.send_keys(char)
                PM.random_sleep(0.2, 1)

        # Read an attribute of an element.
        @try_again
        def get_attr(self, attr, locator, timeout=20):
            """Return the value of attribute ``attr`` of the element."""
            print('[%s] getting element attribute values...By locator:' % self.c_time())
            element_obj = self.is_exists(locator, timeout)
            return element_obj.get_attribute(attr)

        # Read the text between the element's tags.
        @try_again
        def get_text(self, locator, timeout=20):
            """Return the element's text."""
            print('[%s] getting element text...By locator:' % self.c_time())
            element_obj = self.is_exists(locator, timeout)
            return element_obj.text

        # Hover the mouse over an element.
        @try_again
        def mouse_over(self, locator, timeout=20):
            """Move the mouse pointer onto the element."""
            print('[%s] mouse is hovering...By locator:' % self.c_time())
            element_obj = self.is_exists(locator, timeout)
            ActionChains(self.driver).move_to_element(element_obj).perform()

        # Choose an option of a drop-down box.
        @try_again
        def select_by_attr(self, locator, attr, attr_type='value', timeout=20):
            """Select an option by visible text (``'text'``), index (``'index'``) or value (any other ``attr_type``)."""
            print('[%s] selecting drop-down box elements by %s...By locator:' % (self.c_time(), attr_type))
            element_obj = self.is_exists(locator, timeout)
            select = Select(element_obj)
            if attr_type == 'text':
                select.select_by_visible_text(attr)
            elif attr_type == 'index':
                select.select_by_index(attr)
            else:
                select.select_by_value(attr)

        # Browser "back".
        @try_again
        def _go_back(self):
            """Ask the browser to navigate one step back."""
            print('[%s] returning to the previous page...By locator:' % self.c_time())
            self.driver.back()

        def _wait_url_change(self, old_url, checks=5, interval=2):
            """Poll ``driver.current_url``; return ``True`` once it differs from ``old_url``."""
            for _ in range(checks):
                sleep(interval)
                if self.driver.current_url != old_url:
                    return True
            return False

        # Return to the previous page.
        def go_back(self, url=None, desc=u'上一个页面'):
            """Navigate back, trying twice, and report whether the URL changed.

            If both attempts leave the URL unchanged and ``url`` is given,
            that URL is opened directly; the return value is still ``False``
            in that case.
            """
            start_url = self.driver.current_url
            for _ in range(2):
                self._go_back()
                if self._wait_url_change(start_url):
                    print(u'  点击go back,成功返回至%s' % desc)
                    return True
            if url is None:
                print(u'  返回%s时,页面跳转失败!' % desc)
            else:
                self.driver.get(url)
                print(u'  返回至%s,URL:%s' % (desc, url))
            return False

        # Click an element that should navigate; click once more if it did not.
        def click_goTo(self, locator, timeout=20):
            """Click the element (at most twice) and return whether the URL changed."""
            print('[%s] clicking to jump the page...By locator:' % self.c_time())
            start_url = self.driver.current_url
            for _ in range(2):
                self.click_element(locator, timeout)
                if self._wait_url_change(start_url):
                    print(u'  页面跳转成功!Locator:%s' % locator)
                    return True
            print(u'  页面跳转失败!Locator:%s' % locator)
            return False

        # **
        # Others
        # *
        def _resolve_y(self, loc, timeout):
            """Return ``loc`` itself when it is an int, else the element's Y coordinate."""
            if isinstance(loc, int):
                return loc
            return self.is_exists(loc, timeout).location['y']

        # Scroll from one element (or Y coordinate) to another.
        def adjust_element(self, start_loc, end_loc, timeout=20):
            """Scroll in random steps from ``start_loc`` towards ``end_loc``.

            Each argument is either a Y coordinate (int) or a locator.  The
            page finally scrolls to 200 px above the end position.
            """
            print('[%s] adjusting element position...By locator:' % self.c_time())
            start_y = self._resolve_y(start_loc, timeout)
            end_y = self._resolve_y(end_loc, timeout)
            if start_y != end_y:
                # +1 scrolls down, -1 scrolls up.
                direction = 1 if start_y < end_y else -1
                next_y = start_y + direction * random.randint(330, 590)
                for _ in range(20):
                    PM.random_sleep(1, 3)
                    # Stop once the next step would pass the end position.
                    if (next_y - end_y) * direction > 0:
                        break
                    self.driver.execute_script('window.scrollTo(0,%d)' % next_y)
                    next_y += direction * random.randint(330, 590)
            # Finish just above the end position.
            self.driver.execute_script('window.scrollTo(0,%d)' % (end_y - 200))

        # Scroll the page around at random, then return to the top.
        def random_scroll_page(self, min, max):
            """Scroll down a random number of steps (``min``..``max``), back up, then to the top."""
            print('[%s] random sliding of pages...By locator:' % self.c_time())
            max_count = random.randint(min, max)
            # Y coordinate reached by the first scroll.
            y_position = random.randint(310, 380)
            for _ in range(max_count):
                self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
                y_position += random.randint(330, 390)
                PM.random_sleep(1, 3)
            for _ in range(max_count + 1):
                self.driver.execute_script('window.scrollTo(0,%d)' % y_position)
                y_position -= random.randint(430, 590)
                PM.random_sleep(0.5, 2)
            # Back to the top of the page.
            self.driver.execute_script('window.scrollTo(0,0)')
            PM.random_sleep(1, 3)

        # Switch to the newly opened window.
        def select_new_window(self):
            """Wait up to ~20 s for a second window and switch to it.

            Returns:
                ``True`` after switching, ``False`` when no other window appeared.
            """
            print('[%s] selecting the newly opened browser window...By locator:' % self.c_time())
            all_handles = []
            for _ in range(10):
                sleep(2)
                all_handles = self.driver.window_handles
                if len(all_handles) > 1:
                    break
            else:
                print(u'新窗口打开失败,一共有%s个窗口!' % len(all_handles))
            current_handle = self.driver.current_window_handle
            other_handles = [handle for handle in all_handles if handle != current_handle]
            if not other_handles:
                return False
            # As before, the last handle that is not the current one wins.
            self.driver.switch_to.window(other_handles[-1])
            return True

        # Collect information about the failing page and append it to the log.
        def get_error_page_info(self, log_path):
            """Append the current URL to ``log_path`` and save the HTML and a screenshot.

            The HTML goes to ``<task_path>\\error_page_source.html`` and the
            screenshot to ``<task_path>\\error_page<timestamp>.png``.  If any
            driver call fails, a note is appended to the log instead.
            """
            print('[%s] Error occurred. Getting the current page information...By locator:' % self.c_time())
            # Binary mode with explicit UTF-8 works on Python 2 and 3 alike.
            with open(log_path, 'ab') as log_file:
                try:
                    # Current page URL.
                    log_file.write(self.driver.current_url.encode('utf-8') + b'\n')
                    # Current page HTML.
                    html_path = '%s\\error_page_source.html' % self.task_path
                    with open(html_path, 'wb') as html_file:
                        html_file.write(self.driver.page_source.encode('utf-8'))
                    # Screenshot of the current page.
                    stamp = PM.get_time('%m%d_%H%M%S')
                    file_path = '%s\\error_page%s.png' % (self.task_path, stamp)
                    self.driver.get_screenshot_as_file(file_path)
                    print(u'\n页面截图保存路径:\n%s' % file_path)
                except Exception:
                    log_file.write(u'无法执行webdriver提供的方法,请检查session是否丢失!'.encode('utf-8'))

    未完成***

  • 相关阅读:
    页码数求0到9共有多少个
    reduce
    map,filter
    匿名函数 lambda
    递归
    python 切片
    函数
    集合(set)
    python 中random 库的使用
    printf 输出?
  • 原文地址:https://www.cnblogs.com/liuyun66535309/p/10337275.html
Copyright © 2011-2022 走看看