zoukankan      html  css  js  c++  java
  • python3爬虫-通过selenium获取TB商品

    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.chrome.options import Options
    from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException
    from selenium.webdriver import ActionChains
    import time, json,datetime
    from lxml import etree
    
    TB_LOGIN_URL = 'https://login.taobao.com/member/login.jhtml'
    
    
    class MyException(Exception):
        """Crawler-specific error carrying a numeric status code and a message.

        Attributes:
            status: Numeric code identifying the failure category.
            msg: Human-readable description of the failure.
        """

        def __init__(self, status: int, msg: str) -> None:
            # Forward both values so ``args`` stays ``(status, msg)``.
            super().__init__(status, msg)
            self.status, self.msg = status, msg
    
    
    class TBClass:
        """Log in to Taobao with Selenium-driven Chrome and scrape search results.

        Typical use: ``login_tb()`` -> ``login_verification()`` ->
        ``goto_Tb_Home()`` (only when no verification happened) ->
        ``search_goods()``.  Scraped items are appended, one JSON object per
        line, to ``tb-<keyword>.json``; progress and errors go to ``error.log``.
        """

        def __init__(self):
            # The browser is created by login_tb() so a failed login can be
            # retried with a fresh instance.
            self.browser = None
            # Output file of the keyword currently being crawled.
            self.file = None
            self.log_file = open("error.log", "a", encoding="utf-8")

        def __init_browser(self):
            """Start Chrome, open the login page and create the wait helpers."""
            # Never leave a previous driver process behind when re-initialising.
            self.close_browser()
            options = Options()
            # Hide the "controlled by automated software" switch from the page.
            options.add_experimental_option('excludeSwitches', ['enable-automation'])
            self.browser = webdriver.Chrome(options=options)
            self.browser.implicitly_wait(3)
            self.browser.maximize_window()
            self.browser.get(TB_LOGIN_URL)
            self.wait = WebDriverWait(self.browser, 10)
            self.ac = ActionChains(self.browser)

        def __switch_to_pwd_mode(self):
            """Switch the login form to username/password mode if needed."""
            if not self.browser.find_element(By.ID, "J_Static2Quick").is_displayed():
                self.browser.find_element(By.ID, "J_Quick2Static").click()

        def __input_user(self, username):
            """Type *username* into the user-name field, replacing its content."""
            user_field = self.browser.find_element(By.ID, "TPL_username_1")
            user_field.clear()
            user_field.send_keys(username)

        def __input_pwd(self, password):
            """Type *password* into the password field, replacing its content."""
            pwd_field = self.browser.find_element(By.ID, "TPL_password_1")
            pwd_field.clear()
            pwd_field.send_keys(password)

        def __is_tag_exist_css(self, selector):
            """Return True if an element matching the CSS *selector* exists."""
            try:
                self.browser.find_element(By.CSS_SELECTOR, selector)
            except NoSuchElementException:
                return False
            return True

        def __is_tag_exist_xpath(self, xpath):
            """Return True if an element matching the *xpath* exists."""
            try:
                self.browser.find_element(By.XPATH, xpath)
            except NoSuchElementException:
                return False
            return True

        def __lock_exist(self):
            """Return True if the slider captcha is present and visible."""
            return self.__is_tag_exist_css('#nc_1_wrapper') and self.browser.find_element(
                By.ID, 'nc_1_wrapper').is_displayed()

        def __slide_tag(self):
            """Drag the slider captcha to the right.

            Raises:
                MyException: status 10000 when the page reports that the slide
                    verification failed.
            """
            bar_element = self.browser.find_element(By.ID, 'nc_1_n1z')
            ActionChains(self.browser).drag_and_drop_by_offset(bar_element, 350, 0).perform()
            time.sleep(0.5)
            # A screenshot is saved after every slide attempt (kept from the
            # original behaviour) so a failure can be inspected afterwards.
            self.browser.get_screenshot_as_file('error.png')
            if self.__is_tag_exist_css('.errloading > span'):
                error_message = self.browser.find_element(By.CSS_SELECTOR, '.errloading > span').text
                self.browser.execute_script('noCaptcha.reset(1)')
                raise MyException(10000, '滑动验证失败, message = ' + error_message)

        def __submit(self):
            """Click the login button.

            Raises:
                MyException: status 10001 when the login error box is present.
            """
            self.browser.find_element(By.ID, 'J_SubmitStatic').click()
            time.sleep(0.5)
            if self.__is_tag_exist_css("#J_Message"):
                error_message = self.browser.find_element(By.CSS_SELECTOR, '#J_Message > p').text
                raise MyException(10001, '登录出错, message = ' + error_message)

        def login_tb(self, username, password):
            """Open a browser and log in to a Taobao account.

            :param username: account user name
            :param password: account password
            :raises MyException: when the slider check or the login itself fails
            """
            self.__init_browser()
            self.__switch_to_pwd_mode()
            time.sleep(0.5)
            self.__input_user(username)
            time.sleep(0.5)
            self.__input_pwd(password)
            if self.__lock_exist():
                self.__slide_tag()
            self.__submit()

        def goto_Tb_Home(self):
            """Navigate to the Taobao home page via the site navigation link."""
            self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, "site-nav-menu-hd")))
            self.browser.find_element(By.LINK_TEXT, "淘宝网首页").click()

        def __search_goods(self, goods):
            """Type *goods* into the search box and submit the search.

            :param goods: keyword to search for
            """
            self.wait.until(EC.presence_of_element_located((By.ID, "q")))
            search_box = self.browser.find_element(By.ID, "q")
            search_box.clear()
            search_box.send_keys(goods, Keys.ENTER)

        def __page_slide(self):
            """Scroll to the bottom of the page and return the page source.

            Scrolling is meant to trigger lazily loaded content before the HTML
            is captured.
            """
            height = self.browser.execute_script("return document.body.clientHeight;")
            self.browser.execute_script("window.scrollTo(0, {});".format(height))
            return self.browser.page_source

        def __page_turing(self):
            """Click the pager button that leads to the next result page.

            Raises:
                MyException: status 10001 when no pager button exists.
            """
            pager_buttons = self.browser.find_elements(By.CSS_SELECTOR, ".J_Ajax.num.icon-tag")
            if not pager_buttons:
                raise MyException(10001, "商品访问完毕")
            # With two buttons the second one is taken as "next"; with a single
            # button that one is clicked.
            # NOTE(review): on the last page the only button is presumably
            # "previous", which would make the crawl bounce back - confirm
            # against the live page markup.
            next_tag = pager_buttons[1] if len(pager_buttons) > 1 else pager_buttons[0]
            next_tag.click()

        def __parse_html(self, html):
            """Yield one dict per item found in the search-result *html*.

            Each dict has the keys ``goods_name``, ``buy_people_num``,
            ``goods_price``, ``shop_name`` and ``shop_addr``.
            """
            html_selector = etree.HTML(html)
            # Item title.
            goods_name = html_selector.xpath("//div[@class='pic']//img/@alt")
            # Number of buyers.
            buy_people_num = html_selector.xpath("//div[@class='deal-cnt']/text()")
            # Item price.  The class names must be quoted: unquoted they are
            # evaluated as (empty) child node-sets and contains() is always true.
            goods_price = html_selector.xpath(
                "//div[contains(@class,'price') and contains(@class,'g_price') "
                "and contains(@class,'g_price-highlight')]/strong/text()")
            # Shop name.
            shop_name = html_selector.xpath("//div[@class='shop']/a/span[2]/text()")
            # Shop location.
            shop_addr = html_selector.xpath("//div[@class='location']/text()")

            # NOTE(review): the five lists are paired positionally; an item that
            # lacks one field would shift the later rows - verify on real pages.
            keys = ("goods_name", "buy_people_num", "goods_price", "shop_name", "shop_addr")
            for row in zip(goods_name, buy_people_num, goods_price, shop_name, shop_addr):
                yield dict(zip(keys, row))

        def __write_to_json(self, dic: dict):
            """Append *dic* to the current output file as one JSON line."""
            self.file.write(json.dumps(dic, ensure_ascii=False) + "\n")

        def __log(self, message):
            """Append a timestamped line to the log file and flush it."""
            self.log_file.write("{}----{}\n".format(datetime.datetime.now(), message))
            self.log_file.flush()

        def __submit_sms_code(self):
            """Request an SMS code, read it from the console and submit it."""
            self.browser.find_element(By.ID, "J_GetCode").click()
            print("请打开手机获取验证码,并输入....")
            # strip() keeps the code a single string; split() produced a list.
            code_verify = input("验证码>>:").strip()
            self.browser.find_element(By.CSS_SELECTOR, ".ui-input-checkcode-new").send_keys(
                code_verify, Keys.ENTER)

        def login_verification(self):
            """Handle the optional post-login identity verification.

            :return: True if a verification step was found and a code was
                submitted, False if there was nothing to verify.
            """
            time.sleep(5)
            iframes = self.browser.find_elements(By.TAG_NAME, 'iframe')
            if not iframes:
                # No frame at all means there is no verification dialog.
                return False
            self.browser.switch_to.frame(iframes[0])
            try:
                if self.__is_tag_exist_xpath("//a[@class='ui-form-other']"):
                    self.browser.find_element(By.CLASS_NAME, "ui-form-other").click()
                    method_buttons = (By.CSS_SELECTOR, ".ui-button-text.ui-button.ui-button-morange")
                    self.wait.until(EC.presence_of_element_located(method_buttons))
                    # Two buttons match; the second one selects SMS verification.
                    self.browser.find_elements(*method_buttons)[1].click()
                    self.wait.until(EC.presence_of_element_located((By.ID, "J_GetCode")))
                elif not self.__is_tag_exist_xpath("//*[@id='J_GetCode']"):
                    return False
                self.__submit_sms_code()
                return True
            finally:
                # Later steps look for elements of the top-level page, so always
                # leave the iframe again.
                self.browser.switch_to.default_content()

        def __next_goods(self):
            """Pop the next keyword from the module-level ``goods_list``.

            Returns None when the list is missing or empty.
            """
            pending = globals().get("goods_list")
            if pending:
                return pending.pop(0)
            return None

        def __crawl_goods(self, goods):
            """Scrape every result page for one keyword into ``tb-<goods>.json``."""
            self.__search_goods(goods)
            self.file = open("tb-{}.json".format(goods), "a", encoding="utf-8")
            page = 1
            try:
                while True:
                    print("获取商品<{}>--第{}页数据".format(goods, page))
                    html = self.__page_slide()
                    for dic in self.__parse_html(html):
                        self.__write_to_json(dic)
                    try:
                        self.__page_turing()
                    except MyException as e:
                        if e.status == 10001:
                            self.__log("{}信息访问完毕".format(goods))
                        else:
                            self.__log("访问商品{}第{}页报错,错误信息为:{}".format(goods, page, e.msg))
                        return
                    except Exception as error:
                        # Any paging failure ends this keyword; the caller moves
                        # on to the next one.
                        self.__log("访问商品{}第{}页报错,错误信息为:{}".format(goods, page, error))
                        return
                    # Pause between pages to let the next page load.
                    time.sleep(20)
                    page += 1
            finally:
                self.file.close()

        def search_goods(self, goods):
            """Crawl *goods*, then every keyword left in the global ``goods_list``.

            Keywords are processed in a loop rather than recursively, so the
            crawl of one keyword never resumes after its file has been closed.

            :param goods: first keyword to crawl
            """
            while goods is not None:
                self.__crawl_goods(goods)
                goods = self.__next_goods()
            self.__log("goods_list列表为空,爬取完毕")

        def close_browser(self):
            """Shut down the browser and its driver process, if one is running."""
            if self.browser is not None:
                # quit() ends the whole WebDriver session; close() would only
                # close the current window and leave the driver running.
                self.browser.quit()
                self.browser = None

        def __del__(self):
            """Release files and the browser; safe on partially built objects."""
            for resource in (getattr(self, "file", None), getattr(self, "log_file", None)):
                if resource is not None and not resource.closed:
                    resource.close()
            if getattr(self, "browser", None) is not None:
                self.close_browser()
    
    
    if __name__ == "__main__":
        # Keywords to crawl; search_goods() pulls the remaining ones from this
        # module-level list once the first keyword is finished.
        goods_list = ["水杯", "床上用品", "运动短裤"]
        username = ""  # account user name
        password = ""  # account password

        try:
            goods = goods_list.pop(0)
        except IndexError:
            raise MyException(10005, "goods_list不能为空")

        tb = TBClass()
        try:
            # One automatic retry with a fresh browser if the first login fails.
            try:
                tb.login_tb(username, password)
            except MyException as e:
                print(e.msg)
                print("自动重启浏览器中....")
                tb.close_browser()
                tb.login_tb(username, password)

            # Without a verification step the session is still on the
            # post-login page, so go to the home page before searching.
            verified = tb.login_verification()
            if not verified:
                tb.goto_Tb_Home()
            tb.search_goods(goods)
        finally:
            # Dropping the last reference lets TBClass.__del__ release resources.
            del tb
  • 相关阅读:
    hdu 3790 最短路径问题
    hdu 2112 HDU Today
    最短路问题 以hdu1874为例
    hdu 1690 Bus System Floyd
    hdu 2066 一个人的旅行
    hdu 2680 Choose the best route
    hdu 1596 find the safest road
    hdu 1869 六度分离
    hdu 3339 In Action
    序列化和反序列化
  • 原文地址:https://www.cnblogs.com/zhuchunyu/p/10765887.html
Copyright © 2011-2022 走看看