zoukankan      html  css  js  c++  java
  • python3爬虫-通过selenium获取TB商品

    from selenium import webdriver
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.chrome.options import Options
    from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException
    from selenium.webdriver import ActionChains
    import time, json,datetime
    from lxml import etree
    
    TB_LOGIN_URL = 'https://login.taobao.com/member/login.jhtml'
    
    
    class MyException(Exception):
        """Crawler-specific error carrying a numeric status and a readable message.

        Status codes used in this file: 10000 (slider verification failed),
        10001 (login error / no further result pages), 10005 (empty goods list).

        :param status: numeric error code inspected by callers
        :param msg: human-readable description of the failure
        """

        def __init__(self, status, msg):
            # Forward both values so ``args`` stays ``(status, msg)``.
            super().__init__(status, msg)
            self.status, self.msg = status, msg
    
    
    class TBClass:
        """Selenium-driven Taobao client: log in, search goods and dump results.

        Results for each searched product are appended, one JSON object per
        line, to ``tb-<goods>.json``. Progress and errors are appended to
        ``error.log``.
        """

        def __init__(self):
            # The browser and its helpers are created lazily by login_tb().
            self.browser = None
            self.wait = None
            self.ac = None
            # Output file of the product currently being crawled (None until
            # search_goods() opens one).
            self.file = None
            self.log_file = open("error.log", "a", encoding="utf-8")

        def __init_browser(self):
            """Start Chrome, open the login page and create the wait helpers."""
            options = Options()
            options.add_experimental_option('excludeSwitches', ['enable-automation'])
            # Uncomment to disable image loading:
            # options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
            self.browser = webdriver.Chrome(options=options)
            self.browser.implicitly_wait(3)
            self.browser.maximize_window()
            self.browser.get(TB_LOGIN_URL)
            self.wait = WebDriverWait(self.browser, 10)
            self.ac = ActionChains(self.browser)

        def __switch_to_pwd_mode(self):
            """Click the "J_Quick2Static" switch when "J_Static2Quick" is not displayed."""
            if not self.browser.find_element(By.ID, "J_Static2Quick").is_displayed():
                self.browser.find_element(By.ID, "J_Quick2Static").click()

        def __input_user(self, username):
            """Clear the username field and type ``username`` into it."""
            input_user_tag = self.browser.find_element(By.ID, "TPL_username_1")
            input_user_tag.clear()
            input_user_tag.send_keys(username)

        def __input_pwd(self, password):
            """Clear the password field and type ``password`` into it."""
            input_pwd_tag = self.browser.find_element(By.ID, "TPL_password_1")
            input_pwd_tag.clear()
            input_pwd_tag.send_keys(password)

        def __is_tag_exist_css(self, selector):
            """Return True if an element matching the CSS ``selector`` exists."""
            try:
                self.browser.find_element(By.CSS_SELECTOR, selector)
                return True
            except NoSuchElementException:
                return False

        def __lock_exist(self):
            """Return True if the slider widget ("nc_1_wrapper") exists and is displayed."""
            return self.__is_tag_exist_css('#nc_1_wrapper') and self.browser.find_element(
                By.ID, 'nc_1_wrapper').is_displayed()

        def __slide_tag(self):
            """Drag the slider handle to the right.

            :raises MyException: status 10000 if an error message appears after
                the drag; the captcha is reset before raising.
            """
            bar_element = self.browser.find_element(By.ID, 'nc_1_n1z')
            ActionChains(self.browser).drag_and_drop_by_offset(bar_element, 350, 0).perform()
            time.sleep(0.5)
            # A screenshot is always saved right after the drag.
            self.browser.get_screenshot_as_file('error.png')
            if self.__is_tag_exist_css('.errloading > span'):
                error_message_element = self.browser.find_element(By.CSS_SELECTOR, '.errloading > span')
                error_message = error_message_element.text
                self.browser.execute_script('noCaptcha.reset(1)')
                raise MyException(10000, '滑动验证失败, message = ' + error_message)

        def __submit(self):
            """Click the login button.

            :raises MyException: status 10001 if the "#J_Message" element is
                present after submitting.
            """
            self.browser.find_element(By.ID, 'J_SubmitStatic').click()
            time.sleep(0.5)
            if self.__is_tag_exist_css("#J_Message"):
                error_message_element = self.browser.find_element(By.CSS_SELECTOR, '#J_Message > p')
                error_message = error_message_element.text
                raise MyException(10001, '登录出错, message = ' + error_message)

        def login_tb(self, username, password):
            """Open a new browser and log in to a Taobao account.

            :param username: account name
            :param password: account password
            :raises MyException: if the slider check (10000) or the login
                submission (10001) reports an error
            """
            self.__init_browser()
            self.__switch_to_pwd_mode()
            time.sleep(0.5)
            self.__input_user(username)
            time.sleep(0.5)
            self.__input_pwd(password)
            if self.__lock_exist():
                self.__slide_tag()
            self.__submit()

        def goto_Tb_Home(self):
            """Wait for the site navigation bar, then click the Taobao home link."""
            self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, "site-nav-menu-hd")))
            self.browser.find_element(By.LINK_TEXT, "淘宝网首页").click()

        def __search_goods(self, goods):
            """Type ``goods`` into the search box and press ENTER.

            :param goods: product keyword to search for
            """
            self.wait.until(EC.presence_of_element_located((By.ID, "q")))
            searchTag = self.browser.find_element(By.ID, "q")
            searchTag.clear()
            searchTag.send_keys(goods, Keys.ENTER)

        def __page_slide(self):
            """Scroll to the bottom of the page and return the page source.

            :return: HTML of the current page after scrolling
            """
            height = self.browser.execute_script("return document.body.clientHeight;")
            self.browser.execute_script("window.scrollTo(0, {});".format(height))
            return self.browser.page_source

        def __page_turing(self):
            """Click the pager button to move to another result page.

            The second ".J_Ajax.num.icon-tag" element is clicked when there are
            at least two matches, otherwise the only one.

            :raises MyException: status 10001 when no pager button exists
            """
            pager_tags = self.browser.find_elements(By.CSS_SELECTOR, ".J_Ajax.num.icon-tag")
            if not pager_tags:
                raise MyException(10001, "商品访问完毕")
            next_tag = pager_tags[1] if len(pager_tags) > 1 else pager_tags[0]
            next_tag.click()

        def __parse_html(self, html):
            """Extract product records from a search-result page.

            :param html: page source of a result page
            :return: generator of dicts with keys ``goods_name``,
                ``buy_people_num``, ``goods_price``, ``shop_name``, ``shop_addr``
            """
            html_selector = etree.HTML(html)
            # Product name
            goods_name = html_selector.xpath(
                "//div[@class='pic']//img/@alt")

            # Number of buyers
            buy_people_num = html_selector.xpath("//div[@class='deal-cnt']/text()")

            # Product price. The class names must be quoted string literals:
            # unquoted they are child-element tests that evaluate to an empty
            # string, which makes contains() true for every div.
            goods_price = html_selector.xpath(
                "//div[contains(@class,'price') and contains(@class,'g_price') "
                "and contains(@class,'g_price-highlight')]/strong/text()")

            # Shop name
            shop_name = html_selector.xpath("//div[@class='shop']/a/span[2]/text() ")

            # Shop location
            shop_addr = html_selector.xpath("//div[@class='location']/text()")

            field_names = ("goods_name", "buy_people_num", "goods_price", "shop_name", "shop_addr")
            # zip() stops at the shortest list; the columns are paired purely by
            # position, so this assumes every item exposes all five fields.
            for row in zip(goods_name, buy_people_num, goods_price, shop_name, shop_addr):
                yield dict(zip(field_names, row))

        def __write_to_json(self, dic: dict):
            """Append ``dic`` to the current output file as one JSON line."""
            data_json = json.dumps(dic, ensure_ascii=False)
            self.file.write(data_json + "\n")

        def __is_tag_exist_xpath(self, xpath):
            """Return True if an element matching ``xpath`` exists."""
            try:
                self.browser.find_element(By.XPATH, xpath)
                return True
            except NoSuchElementException:
                return False

        def __submit_sms_code(self):
            """Request an SMS code, read it from stdin and submit it."""
            self.browser.find_element(By.ID, "J_GetCode").click()
            print("请打开手机获取验证码,并输入....")
            # Drop all whitespace so send_keys receives a plain string.
            code_verify = "".join(input("验证码>>:").split())
            # Submit the verification code
            self.browser.find_element(By.CSS_SELECTOR, ".ui-input-checkcode-new").send_keys(code_verify, Keys.ENTER)

        def login_verification(self):
            """Handle the post-login verification dialog if there is one.

            :return: True if a verification code was requested and submitted,
                False if no verification was found
            """
            time.sleep(5)
            iframes = self.browser.find_elements(By.TAG_NAME, 'iframe')
            if not iframes:
                # No frame at all means there is nothing to verify.
                return False
            self.browser.switch_to.frame(iframes[0])
            try:
                if not self.__is_tag_exist_xpath("//a[@class='ui-form-other']"):
                    if not self.__is_tag_exist_xpath("//*[@id='J_GetCode']"):
                        return False
                    self.__submit_sms_code()
                    return True

                self.browser.find_element(By.CLASS_NAME, "ui-form-other").click()
                # Wait for the verification-method buttons to load
                self.wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, ".ui-button-text.ui-button.ui-button-morange")))

                # Two matching buttons are found; the second one is SMS-code
                # verification.
                self.browser.find_elements(By.CSS_SELECTOR, ".ui-button-text.ui-button.ui-button-morange")[1].click()

                # Wait for id=J_GetCode to load
                self.wait.until(EC.presence_of_element_located((By.ID, "J_GetCode")))
                self.__submit_sms_code()
                return True
            finally:
                # Later lookups (navigation bar, search box) target the
                # top-level document, so always leave the frame.
                self.browser.switch_to.default_content()

        def __log(self, message):
            """Append a timestamped line to the error log."""
            self.log_file.write("{}----{}".format(datetime.datetime.now(), message) + "\n")
            self.log_file.flush()

        def __next_goods(self):
            """Pop the next keyword from the module-level ``goods_list``.

            :return: the next keyword, or None when the list is empty or not
                defined
            """
            pending = globals().get("goods_list")
            if not pending:
                return None
            return pending.pop(0)

        def __crawl_goods(self, goods):
            """Crawl every result page of one product into ``tb-<goods>.json``.

            Stops when there is no further page (MyException 10001) or when
            paging fails; both cases are written to the log.
            """
            self.__search_goods(goods)
            self.file = open("tb-{}.json".format(goods), "a", encoding="utf-8")
            n = 1
            try:
                while True:
                    print("获取商品<{}>--第{}页数据".format(goods, n))
                    html = self.__page_slide()
                    for dic in self.__parse_html(html):
                        self.__write_to_json(dic)
                    try:
                        # NOTE: known issue from the original author - after
                        # running for a while this call raises and crawling
                        # moves on to the next product.
                        self.__page_turing()
                    except Exception as error:
                        if isinstance(error, MyException) and error.status == 10001:
                            self.__log("{}信息访问完毕".format(goods))
                        else:
                            self.__log("访问商品{}第{}页报错,错误信息为:{}".format(goods, n, error))
                        return
                    time.sleep(20)
                    n += 1
            finally:
                self.file.close()

        def search_goods(self, goods):
            """Crawl ``goods``, then every keyword left in ``goods_list``.

            Products are handled one after another in a loop rather than by
            recursion, so a finished product never resumes writing to a file
            that has already been closed.

            :param goods: first product keyword to crawl
            """
            while goods is not None:
                self.__crawl_goods(goods)
                goods = self.__next_goods()
            self.__log("goods_list列表为空,爬取完毕")

        def close_browser(self):
            """Shut the browser down; does nothing if none is open."""
            if self.browser is not None:
                # quit() ends the whole WebDriver session; close() would only
                # close the window and leave the driver process running.
                self.browser.quit()
                self.browser = None

        def __del__(self):
            """Release the files and the browser that are still open."""
            # getattr guards: __init__ may not have completed, and the output
            # file only exists once search_goods() has run.
            for handle in (getattr(self, "file", None), getattr(self, "log_file", None)):
                if handle is not None:
                    handle.close()
            browser = getattr(self, "browser", None)
            if browser is not None:
                try:
                    browser.quit()
                except Exception:
                    # Best effort only: the session may already be gone.
                    pass
    
    
    if __name__ == '__main__':
        # Keywords to crawl; TBClass.search_goods takes further entries from
        # this module-level list once the first one is finished.
        goods_list = ["水杯", "床上用品", "运动短裤"]
        username = ""  # account name
        password = ""  # account password

        # Refuse to start without at least one keyword.
        if not goods_list:
            raise MyException(10005, "goods_list不能为空")
        goods = goods_list.pop(0)

        tb = TBClass()
        try:
            try:
                tb.login_tb(username, password)
            except MyException as login_error:
                # One retry: report the failure, restart the browser, log in again.
                print(login_error.msg)
                print("自动重启浏览器中....")
                tb.close_browser()
                tb.login_tb(username, password)

            verified = tb.login_verification()
            if not verified:
                # Without a verification step, go to the home page first.
                tb.goto_Tb_Home()
            tb.search_goods(goods)
        finally:
            # Drop the reference so TBClass.__del__ can release its resources.
            del tb
  • 相关阅读:
    字节输入流抽象类InputStream
    CentOS6.2使用yum安装LAMP及phpMyadmin(转)
    文件字节流类(不是抽象的)FileInputStream和FileOutputStream
    字节输出流抽象类OutputStream
    编译安装GD库出错解决(转)
    关于empty函数的判断
    PHP获取用户的用户的真实IP地址(转)
    字符输入输出流与字节输入输出流
    关于empty函数的输出
    thinkphp 多表 事务(转)
  • 原文地址:https://www.cnblogs.com/zhuchunyu/p/10765887.html
Copyright © 2011-2022 走看看