zoukankan      html  css  js  c++  java
  • selenium webdriver (python)

    import os, sys

    # Make the project root importable so `core.utils` resolves when this file
    # is run directly as a script.
    os_sep = os.sep
    # Drop the last two path components of this file's absolute path, then take
    # the parent of what is left (i.e. three levels above this file).
    base_dir = os.path.dirname(os_sep.join(os.path.abspath(__file__).split(os_sep)[0:-2]))
    sys.path.append(base_dir)

    from core.utils import MysqlHelper
    import time
    import logging
    import requests
    import threading

    from selenium import webdriver

    # Append log records to <base_dir>/log/<YYYYMMDD><this file's name>.log.
    # Each record carries the thread and process id because the checks run in
    # worker threads.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s[thread:%(thread)d][process:%(process)d]',
        datefmt='%a, %d %b %Y %H:%M:%S',
        filename='%s/log/%s%s.log' % (
            base_dir,
            time.strftime('%Y%m%d', time.localtime(time.time())),
            __file__.split(os_sep)[-1]),
        filemode='a')

    class MyThread(threading.Thread):
        """Thread that calls ``func(args)`` with one positional argument.

        Args:
            func: Callable executed in the new thread.
            args: The single value passed to ``func`` (it is not unpacked).
            name: Thread name, stored through ``threading.Thread.name``.
        """

        def __init__(self, func, args, name):
            # Initialise the base class first; Thread refuses attribute setup
            # such as `name` before its own __init__ has run.
            threading.Thread.__init__(self)
            self.func, self.args, self.name = func, args, name

        def run(self):
            """Thread body: invoke the stored callable with the stored argument."""
            self.func(self.args)
    

    # Running count of URL checks performed; chk_exception_url (inside main)
    # increments it through `global` and prints it as progress output.
    url_counter = 0

    def main():
        """Check every unexpired URL in ``test_tab0`` and record the failures.

        Steps:
          1. Select the URLs whose ``expire_time`` is still in the future.
          2. Split them into chunks of ``tstep`` rows, one worker thread each.
          3. For each URL, request it up to ``repeat_times`` times; a check
             passes when the site answers 200 and the rendered page contains
             one of the markers in ``mycode_l``.
          4. For a URL that never passes, insert one row into
             ``test_tab0_error`` unless a row for that URL already exists.

        Returns:
            None. Results are written to the database, the log and stdout.
        """
        # Original author's note: within the last 3600 seconds a url is unique
        # in test_tab0.
        #
        # Alternative queries kept for reference:
        #   combined check:
        #     SELECT url FROM ( SELECT url FROM test_tab0 WHERE unix_timestamp(now()) - create_time<=3600
        #       UNION ALL SELECT url FROM test_tab0_error where status!=2 ) AS t GROUP BY url;
        #   urls that have not expired (the one in use):
        #     SELECT url FROM test_tab0 WHERE unix_timestamp(now()) <expire_time;
        #   last half hour:
        #     SELECT url FROM test_tab0 WHERE unix_timestamp(now()) - create_time<= 1800;
        #
        # Later task: once test_tab0_error has accumulated some data, re-check
        # the urls stored there.
        q = 'SELECT url FROM test_tab0 WHERE unix_timestamp(now()) <expire_time;'

        mysql_obj = MysqlHelper()
        tuple_l = mysql_obj.select(q)
        del mysql_obj
        if len(tuple_l) == 0:
            s = '无待检测url,程序退出'
            print(s)
            logging.info(s)
            # The message announces that the program exits, so actually stop.
            return

        # Markers that identify our code in a page, the number of attempts per
        # url, the pause (seconds) before each attempt, and the HTTP timeout.
        # NOTE(review): the original comment described "stop on the first good
        # response, otherwise wait 30 and retry at most twice, 10s apart"; the
        # code below makes up to 4 attempts and sleeps 10s before every one,
        # including the first — confirm which is intended.
        mycode_l = ['g3user.com', '51g3.com.cn']
        repeat_times, repeat_sleep_times = 4, 10
        request_timeout = 30

        # TODO (original author): move into the helper base class with a
        # generic WHERE list; the f_l parameter exists for the column list.
        def get_onerow(url, f_l=('title', 'uid', 'money_total'), tab='test_tab0'):
            """Return the newest row for ``url`` from ``tab``.

            Args:
                url: Value matched against the ``url`` column.
                f_l: Column names to select.
                tab: Table to query.

            Returns:
                The first row of the result, or the ``-1`` value passed through
                unchanged when ``select`` returns it (presumably an error
                sentinel — TODO confirm against MysqlHelper).
            """
            mysql_obj = MysqlHelper()
            f_s = ','.join(f_l)
            # NOTE(review): SQL is assembled by string formatting; a url that
            # contains a double quote breaks the statement. Switch to
            # parameterized queries if MysqlHelper supports them.
            q = 'SELECT %s FROM %s WHERE url="%s" ORDER BY id DESC LIMIT 1' % (f_s, tab, url)
            s = '%s%s' % (' DB ', q)
            logging.info(s)
            t = mysql_obj.select(q)
            if t != -1:
                t = t[0]
            del mysql_obj
            return t

        def chk_exception_url(url, sleep_seconds=0, http_tag='http://'):
            """Request ``url`` once and report whether it looks healthy.

            Args:
                url: Address to check; ``http_tag`` is prepended when the
                    string does not contain ``'http'``.
                sleep_seconds: Pause before the request.
                http_tag: Scheme prefix used for scheme-less urls.

            Returns:
                dict with keys:
                  ``ok``: 1 marker found, 0 failed, -1 undecided (HTTP 403).
                  ``status_code``: HTTP status, or -1 when the request raised.
                  ``info``: text describing the outcome ('' when ok is 1).
            """
            global url_counter
            time.sleep(sleep_seconds)

            # DB url status values: 0 = cannot be opened, 1 = opens but shows
            # no ad, 2 = already handled.
            s = time.strftime('%Y%m%d %H:%M:%S', time.localtime(time.time()))
            # 'info' is always present so callers can read it on every path.
            ret = {'ok': -1, 'status_code': -1, 'info': ''}
            try:
                if url.find('http') == -1:
                    url = '%s%s' % (http_tag, url)
                # Without a timeout a stalled server would block this worker
                # thread indefinitely.
                r = requests.get(url, timeout=request_timeout)
                ret['status_code'] = int(r.status_code)
                s = '%s,%s,%s,%s' % (s, ret['status_code'], url, r)
            except Exception as e:
                ret['ok'] = 0
                s = '%s %s %s' % (s, ' SPIDER ', e)
                logging.error(s)
                print(e, url)

            # For now only a 200 answer from the target site is inspected.
            if ret['status_code'] == 200:
                driver = webdriver.PhantomJS(
                    executable_path='/usr/local/phantomjs/bin/phantomjs')
                try:
                    driver.get(url)
                    # Give the page a moment to run its scripts.
                    time.sleep(1)
                    page_source = driver.page_source
                finally:
                    # Always shut the browser down; otherwise every check
                    # leaves a PhantomJS process behind.
                    driver.quit()
                if any(code in page_source for code in mycode_l):
                    ret['ok'] = 1
                else:
                    s = '%s%s' % (s, '返回200,但是在html中未检测到我公司代码。')
                    ret['ok'], ret['info'] = 0, s
            elif ret['status_code'] == 403:
                # Example: www.hsdcw.com/fenlei/41668214.html
                # 'ok' stays -1 (undecided); keep the summary so the caller
                # has remarks to store.
                ret['info'] = s
            else:
                ret['ok'], ret['info'] = 0, s

            url_counter += 1
            s = '%s/%s%s%s' % (url_counter, len(tuple_l), 'chk-ret', s)
            print(s)
            if ret['ok'] == 0:
                logging.warning(s)
            else:
                logging.info(s)
            return ret

        tn, tl, tstep = len(tuple_l), [], 5000

        def tf(ts):
            """Worker: check rows ``tuple_l[ts:ts + tstep]`` and record failures.

            Args:
                ts: Index of the first row of this worker's chunk.

            A database exception while reading aborts the rest of the chunk; an
            exception while inserting is logged and the loop moves on.
            """
            te = min(ts + tstep, tn)
            for row in tuple_l[ts:te]:
                url = row[0]
                # Rule for Sina iAsk pages: do not check them.
                if url.find('iask.sina.com') > -1:
                    continue

                write_db_flag = 1
                for _ in range(repeat_times):
                    print('threadID', threading.get_ident(), url)
                    ret = chk_exception_url(url, repeat_sleep_times)
                    if ret['ok'] == 1:
                        write_db_flag = 0
                        break
                if write_db_flag != 1:
                    continue

                try:
                    title, uid, money_total = get_onerow(url)
                except Exception as e:
                    s = '%s %s %s' % (' DB Exception-去test_tab0查', url, e)
                    logging.info(s)
                    print(s)
                    break

                # Multithreading: because of the database limits of the
                # underlying package, a helper is instantiated for each use and
                # deleted afterwards.
                mysql_obj = MysqlHelper()
                # NOTE(review): string-built SQL, see get_onerow.
                q = 'SELECT id FROM test_tab0_error WHERE url="%s" LIMIT 1' % (url)
                print(q)
                try:
                    r = mysql_obj.select(q)
                    s = '%s%s' % ('-SQL-', q)
                    logging.info(s)
                    print(q)
                except Exception as e:
                    s = '%s%s %s' % (' DB Exception-', q, e)
                    logging.info(s)
                    print(s)
                    break

                if len(r) != 0:
                    # A row for this url already exists. Updating it (title,
                    # status, remarks, update_time, uid, money) is left for the
                    # later re-check task on test_tab0_error.
                    continue

                ctime = int(time.time())
                db_status = 1 if ret['status_code'] == 200 else 0
                # NOTE(review): values are interpolated unescaped; remarks that
                # contain a double quote will break this statement.
                q = 'INSERT INTO test_tab0_error (title,url,status,remarks,update_time,create_time,uid,money) VALUES ("%s","%s","%s","%s","%s","%s","%s","%s")' % (
                    title, url, db_status, ret['info'], ctime, ctime, uid, money_total)
                try:
                    mysql_obj.execute(q)
                    mysql_obj.commit()
                    del mysql_obj
                    s = '%s%s' % (' DB SQL ok ', q)
                    logging.info(s)
                    print(s)
                except Exception as e:
                    s = '%s%s %s' % (' DB Exception-', q, e)
                    logging.error(s)
                    print(s)

        # One worker per chunk of `tstep` rows.
        for start in range(0, tn, tstep):
            tl.append(MyThread(tf, start, tf.__name__))

        for t in tl:
            # Set the attribute; assigning to `setDaemon` would only overwrite
            # the method without changing the flag.
            t.daemon = False
            t.start()
        for t in tl:
            t.join()

    # Run the checker only when this file is executed as a script.
    if __name__ == '__main__':
        main()

  • 相关阅读:
    go引入包一直是红色,没有引入的解决办法
    php 把抛出错误记录到日志中
    亚马逊查询接口
    git 合并指定文件到另一个分支
    content-type
    Echarts(饼图Pie)
    DIN 模型速记
    DeepFM 要点速记
    youtube DNN 模型要点速记
    java设计模式之迭代器
  • 原文地址:https://www.cnblogs.com/rsapaper/p/6930764.html
Copyright © 2011-2022 走看看