Python multi-threaded crawler: design and implementation examples

    The basic steps of a crawler are fetching, parsing, and storing. Assume here that fetching and storing are I/O-bound (network access and database writes) while parsing is CPU-bound. There are then two main ways to design a multi-threaded crawler: in the first scheme, a single thread performs all three steps and several such threads run in parallel; in the second scheme, each step gets its own threads, for example N threads for fetching, one thread for parsing (switching between several parsing threads would only lower efficiency), and N threads for storing. A minimal skeleton of both schemes is sketched below.
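
    The sketch below only illustrates the two layouts and is not taken from the final code later in this post; fetch, parse and store are trivial stand-ins for the real requests/BeautifulSoup/MySQL steps.

    # -*- coding:utf-8 -*-
    # Illustrative skeleton only: fetch/parse/store are stand-ins, not the real crawler functions.
    import queue,threading

    def fetch(url):  return '<html>%s</html>'%url   # stand-in for the requests call
    def parse(html): return html.upper()            # stand-in for the BeautifulSoup work
    def store(item): print('stored:',item)          # stand-in for the MySQL insert

    # Scheme 1: every thread runs the whole fetch -> parse -> store cycle.
    task_q=queue.Queue()
    for u in ['url-%d'%i for i in range(5)]:
        task_q.put(u)

    def crawl_worker():
        while True:
            try:
                url=task_q.get_nowait()
            except queue.Empty:
                break
            store(parse(fetch(url)))

    threads=[threading.Thread(target=crawl_worker) for _ in range(3)]
    for t in threads: t.start()
    for t in threads: t.join()

    # Scheme 2: one thread pool per step, connected by queues
    # (N fetchers -> 1 parser -> N savers), as implemented in section II below.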

    Below we try to scrape in-stock compound information from http://www.chembridge.com/.

    First, the URL is of the form http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1, where line is the compound identifier to search for (the search terms are stored in a local txt file). The requests library is used to make the HTTP request; the code that fetches the page is as follows:

    url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
    response = requests.get(url,headers=self.headers[0],timeout=20)
    html_doc=response.text
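
    If a search term may contain characters that need URL-encoding, a slightly more robust variant (only a sketch; it assumes the same endpoint and the same self.headers as above) is to let requests build the query string:

    params={
        'searchType':'MFCD',
        'query':line,   # the search term read from the local txt file
        'type':'phrase',
        'results':10,
        'search':1
    }
    response = requests.get('http://www.chembridge.com/search/search.php',params=params,headers=self.headers[0],timeout=20)
    html_doc=response.text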

    The page is parsed with the BeautifulSoup library; part of the code is shown below:

    soup = BeautifulSoup(html_doc, 'lxml')
    div=soup.find(id='BBResults')
    if div:
        links=div.select('a.chemical')
        for link in links:
            try:
                self.get_page_link(link,line)
            except Exception as e:
                print('%s入库失败:'%line,e)
                time.sleep(self.relay*2)
                print('%s重新入库'%line)
                self.get_page_link(link,line)
                continue
    print('%s搜索完成'%line)
    def get_page_link(self,link,line):
            res=[]
            href=link.get('href')
            print(href)
            time.sleep(self.relay*2*random.randint(5,15)/10)
            r=requests.get(href,headers=self.headers[1],timeout=20)
            if r.status_code==200:
                parse_html=r.text
                soup1=BeautifulSoup(parse_html, 'lxml')
                catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]  # get the catalog numbers
                # print(catalogs)
                table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
                if 'AmountPriceQty.' in table_headers:
                    index=table_headers.index('AmountPriceQty.')
                    catalog=catalogs[0]
                    trs=soup1.select('.form tbody tr')
                    if len(catalogs)>1:
                        catalog=catalogs[index]
                    for tr in trs:
                        if len(tr.select('td'))>1:
                            row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                            res.append(row)

    Finally, res is saved to the MySQL database:

    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
    cursor.executemany(sql,res)
    print('入库')
    conn.commit()
    cursor.close()
    conn.close()
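
    The INSERT above assumes a chembridge table with four columns. The post never shows the schema, but judging from the header row used later for the Excel export (catalog, amount, price, qty), a minimal table definition might look like the following sketch (column names and types are assumptions):

    import mysql.connector

    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    # Assumed schema: four text columns matching the Excel header row used in section II.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS chembridge (
            catalog VARCHAR(64),
            amount  VARCHAR(64),
            price   VARCHAR(255),
            qty     VARCHAR(64)
        )
    ''')
    conn.commit()
    cursor.close()
    conn.close()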

     I. The complete code for the single-threaded crawler, wrapped in a class, is as follows:

    # -*- coding:utf-8 -*-
    import requests,random,time
    from bs4 import BeautifulSoup
    import mysql.connector
    
    class Spider:
        def __init__(self):
            self.headers=[{
                'Host':'www.chembridge.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate',
                'Referer':'http://www.chembridge.com/search/search.php?search=1',
                'Connection':'keep-alive',
                'Upgrade-Insecure-Requests':'1'
            },
            {
                'Host':'www.hit2lead.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate, br'
            }]
            self.filename='MDL.txt'
    
        def get_page_link(self,link):
            res=[]
            href=link.get('href')
            print(href)
            parse_html=requests.get(href,headers=self.headers[1]).text
            soup1=BeautifulSoup(parse_html, 'lxml')
            catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]  # get the catalog numbers
            print(catalogs)
            table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
            print(table_headers)
            index=table_headers.index('AmountPriceQty.')
            catalog=catalogs[0]
            trs=soup1.select('.form tbody tr')
            # print(trs)
            if len(catalogs)>1:
                catalog=catalogs[index]
            for tr in trs:
                if len(tr.select('td'))>1:
                    row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                    res.append(row)
            print(res)
            conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
            cursor = conn.cursor()
            sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
            cursor.executemany(sql,res)
            conn.commit()
            cursor.close()
            conn.close()
    
        def get_page(self,line):
            url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
            try:
                response = requests.get(url,headers=self.headers[0],timeout=20)
                print(response.status_code)
                html_doc=response.text
                # print(html_doc)
                soup = BeautifulSoup(html_doc, 'lxml')
                div=soup.find(id='BBResults')
                if div:
                    links=div.select('a.chemical')
                    for link in links:
                        self.get_page_link(link)
                relay=random.randint(2,5)/10
                print(relay)
                time.sleep(relay)
            except Exception as e:
                print('except:', e)
    
        def get_file(self,filename):
            i=0
            f=open(filename,'r')
            for line in f.readlines():
                line=line.strip()
                print(line)
                self.get_page(line)
                i=i+1
                print('第%s个'%(i))
            f.close()
    
        def run(self):
            self.get_file(self.filename)
    
    spider=Spider()
    starttime=time.time()
    spider.run()
    print('耗时:%f s'%(time.time()-starttime))

    II. Multi-threaded crawler design code

    1. Implementation example of the first design:

    # -*- coding:utf-8 -*-
    from threading import Thread
    import threading
    from queue import Queue
    import os,time,random
    import requests,mysql.connector
    from bs4 import BeautifulSoup
    from openpyxl.workbook import Workbook
    from openpyxl.styles import Font
    
    class ThreadCrawl(Thread):
        def __init__(self,tname,relay):
            Thread.__init__(self)
            #super(MyThread2, self).__init__()
            # self.queue=queue
            # self.lock=lock
            # self.conn=conn
            self.relay=relay*random.randint(5,15)/10
            self.tname=tname
            self.num_retries=3  # maximum number of retries per search term
            self.headers=[{
                'Host':'www.chembridge.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate',
                'Referer':'http://www.chembridge.com/search/search.php?search=1',
                'Connection':'keep-alive',
                'Upgrade-Insecure-Requests':'1'
            },
            {
                'Host':'www.hit2lead.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate, br'
            }]
    
        def run(self):
            print('%s 开始爬取'%self.tname)
            # line = my_queue.get()
            # print(line)
            # while not self.queue.empty():
            while len(words)>0:
                lock.acquire()
                line = words[0]
                words.pop(0)
                lock.release()
                self.get_page(line,self.num_retries)
                time.sleep(self.relay*random.randint(5,15)/10)
    
            while not my_queue.empty():
                line=my_queue.get()
                print('重新爬取%s...'%line)
                self.get_page(line,num_retries=1)
            print('%s 结束'%self.tname)
    
    
        # fetch the search results page
        def get_page(self,line,num_retries=2):
            print('%s正在搜索%s...'%(self.tname,line))
            # write this thread task
            url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
            try:
                response = requests.get(url,headers=self.headers[0],timeout=20)
                status=response.status_code
                if status==200:
                    html_doc=response.text
                    # print(html_doc)
                    soup = BeautifulSoup(html_doc, 'lxml')
                    div=soup.find(id='BBResults')
                    if div:
                        links=div.select('a.chemical')
                        for link in links:
                            try:
                                self.get_page_link(link,line)
                            except Exception as e:
                                print('%s入库失败:'%line,e)
                                time.sleep(self.relay*2)
                                print('%s重新入库'%line)
                                self.get_page_link(link,line)
                                continue
                    print('%s搜索完成'%line)
                    lock.acquire()
                    global count
                    count=count+1
                    print('已完成%s个'%count)
                    lock.release()
                    # time.sleep(self.relay*random.randint(5,15)/10)
                else:
                    print('%s搜索%s网络异常,错误代码:%s'%(self.tname,line,status))
                    # time.sleep(self.relay*random.randint(5,15)/10)
                    if num_retries>0:
                        print('%s尝试重新搜索%s'%(self.tname,line))
                        time.sleep(self.relay*random.randint(5,15)/10)
                        self.get_page(line,num_retries-1)
                    else:
                        print('%s四次搜索失败!!!'%line)
                        my_queue.put(line)
                        # error_list.append(line)
    
            except Exception as e:
                print('%s搜索%s异常,error:'%(self.tname,line), e)
                # time.sleep(self.relay*random.randint(5,15)/10)
                if num_retries>0:
                    print('%s尝试重新搜索%s'%(self.tname,line))
                    time.sleep(self.relay*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    print('%s四次搜索失败!!!'%line)
                    my_queue.put(line)
                    # error_list.append(line)
            # self.queue.task_done()
    
        # follow a result link, parse the detail page and store the rows
        def get_page_link(self,link,line):
            res=[]
            href=link.get('href')
            print(href)
            time.sleep(self.relay*2*random.randint(5,15)/10)
            r=requests.get(href,headers=self.headers[1],timeout=20)
            if r.status_code==200:
                parse_html=r.text
                soup1=BeautifulSoup(parse_html, 'lxml')
                catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]  # get the catalog numbers
                # print(catalogs)
                table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
                if 'AmountPriceQty.' in table_headers:
                    index=table_headers.index('AmountPriceQty.')
                    catalog=catalogs[0]
                    trs=soup1.select('.form tbody tr')
                    if len(catalogs)>1:
                        catalog=catalogs[index]
                    for tr in trs:
                        if len(tr.select('td'))>1:
                            row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                            res.append(row)
                    # print(res)
                    lock.acquire()
                    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
                    cursor = conn.cursor()
                    try:
                        print('%s: %s正在入库...'%(line,catalog))
                        sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
                        cursor.executemany(sql,res)
                        conn.commit()
                    except Exception as e:
                        print(e)
                    finally:
                        cursor.close()
                        conn.close()
                        lock.release()
    
    def writeToExcel(datas,filename):
        # create a workbook object in memory
        result_wb = Workbook()
        # the first worksheet
        ws1 = result_wb.worksheets[0]
        # ws1=wb1.create_sheet('result',0)
        # set the worksheet title
        ws1.title = "爬取结果"
        row0 = ['catalog', 'amount', 'price', 'qty']
        ft = Font(name='Arial', size=11, bold=True)
        for k in range(len(row0)):
            ws1.cell(row=1,column=k+1).value=row0[k]
            ws1.cell(row=1,column=k+1).font=ft
        for i in range(1,len(datas)+1):
            for j in range(1,len(row0)+1):
                ws1.cell(row=i+1,column=j).value=datas[i-1][j-1]
        # save the workbook to disk
        result_wb.save(filename = filename)
    
    if __name__ == '__main__':
        starttime=time.time()
        lock = threading.Lock()
    
        words=[] # list of search terms
        basedir=os.path.abspath(os.path.dirname(__file__))
        filename='MDL.txt'
        file=os.path.join(basedir,filename) # file path
        f=open(file,'r')
        for line in f.readlines():
            line=line.strip()
            words.append(line)
        f.close()
    
        count=0  # progress counter (number of completed terms)
        # global my_queue
        my_queue = Queue() # FIFO queue for terms whose first round of searches failed (Queue is thread-safe)
        error_list=[] # terms that ultimately failed
        threads=[]
    
        # clear the chembridge table before the run starts
        conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
        cursor = conn.cursor()
        print('清空表...')
        cursor.execute('delete from chembridge')
        conn.commit()
        cursor.close()
        conn.close()
    
        num_threads=10  # number of crawler threads
        relay=10  # crawl delay; actual delay = relay * (random value between 0.5 and 1.5)
        threadList = []
        for i in range(1,num_threads+1):
            threadList.append('爬虫-%s'%i)
        # start the threads
        for tName in threadList:
            thread = ThreadCrawl(tName,relay)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
            time.sleep(1)
        # block the main thread until all worker threads have finished
        for t in threads:
            t.join()
    
        # save the data to Excel
        conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
        cursor = conn.cursor()
        cursor.execute('select * from chembridge')
        datas=cursor.fetchall()
        conn.commit()
        cursor.close()
        conn.close()
        writeToExcel(datas,'result.xlsx')
    
        # summarize the results
        while not my_queue.empty():
            error_line=my_queue.get()
            error_list.append(error_line)
        print('爬取完成!\n')
        if len(error_list)==0:
            print('爬取失败列表:0个')
        else:
            print('总共爬取失败%s个:'%len(error_list),','.join(error_list))
        # print('爬取完成!')
        print('耗时:%f s'%(time.time()-starttime))

    words is the list that holds the search terms. When a search fails it is retried immediately; num_retries is the maximum number of retries per term. If a term still fails after num_retries retries, the failed word is put into the my_queue queue.

    After all the words have been processed, every word in my_queue is searched again, looping until my_queue is empty (i.e. every word has been searched successfully).

    Note: even though CPython's GIL serializes bytecode execution, an update such as count = count + 1 is a read-modify-write and is not atomic, so the lock must be acquired before modifying a shared global variable.
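
    A standalone sketch of that locked update (the names mirror the count and lock used above, but this is not the crawler code itself):

    import threading

    count = 0
    lock = threading.Lock()

    def finish_one():
        global count
        with lock:              # count = count + 1 is a read-modify-write,
            count = count + 1   # so it must run while holding the lock

    threads = [threading.Thread(target=finish_one) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(count)                # reliably 100 because the updates were serialized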

    Screenshot of a run: (image not reproduced here)

    2. Implementation example of the second design

    The three queues urls_queue, html_queue and item_queue hold, respectively, the URLs to visit, the pages to parse and the scraped results. Three classes are designed accordingly: Fetcher simply fetches each URL, Parser parses the fetched content and produces the items to be saved, and Saver stores the items. When urls_queue, html_queue and item_queue are all empty at the same time, all worker threads are terminated and the task ends.

    # coding=utf-8
    import threading
    import queue,requests
    import time,random
    import mysql.connector
    from bs4 import BeautifulSoup
    
    class Fetcher(threading.Thread):
        def __init__(self,urls_queue,html_queue):
            threading.Thread.__init__(self)
            self.__running=threading.Event()
            self.__running.set()
            self.urls_queue = urls_queue
            self.html_queue = html_queue
            self.num_retries=3  # maximum number of retries per search term
            self.headers={
                'Host':'www.chembridge.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate',
                'Referer':'http://www.chembridge.com/search/search.php?search=1',
                'Connection':'keep-alive',
                'Upgrade-Insecure-Requests':'1'
            }
    
        def run(self):
            while not self.urls_queue.empty():
            # while self.__running.isSet():
                line=self.urls_queue.get()
                print(line)
                time.sleep(2*random.randint(5,15)/10)
                # self.urls_queue.task_done()
                self.get_page(line,self.num_retries)
        def get_page(self,line,num_retries=2):
            url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
            try:
                response = requests.get(url,headers=self.headers,timeout=20)
                status=response.status_code
                if status==200:
                    html_doc=response.text
                    print(html_doc)
                    self.html_queue.put(html_doc)
                    # self.urls_queue.task_done()
                    print('%s搜索完成'%line)
                else:
                    print('搜索%s网络异常,错误代码:%s'%(line,status))
                    if num_retries>0:
                        print('尝试重新搜索%s'%(line))
                        time.sleep(2*random.randint(5,15)/10)
                        self.get_page(line,num_retries-1)
                    else:
                        print('%s四次搜索失败!!!'%line)
                        self.urls_queue.put(line)
    
            except Exception as e:
                print('%s搜索异常,error:'%line,e)
                if num_retries>0:
                    print('尝试重新搜索%s'%(line))
                    time.sleep(2*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    print('%s四次搜索失败!!!'%line)
                    self.urls_queue.put(line)
    
        def stop(self):
            self.__running.clear()
    
    class Parser(threading.Thread):
        def __init__(self, html_queue,item_queue):
            threading.Thread.__init__(self)
            self.__running=threading.Event()
            self.__running.set()
            self.html_queue = html_queue
            self.item_queue = item_queue
            self.num_retries=3  # maximum number of retries per search term
            self.headers={
                'Host':'www.hit2lead.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate, br'
            }
        def run(self):
            while self.__running.isSet():
                print('html_queue长度: ',self.html_queue.qsize())
                # if self.html_queue.empty():
                #     break
                html_doc=self.html_queue.get()
                try:
                    soup = BeautifulSoup(html_doc, 'lxml')
                    div=soup.find(id='BBResults')
                    if div:
                        links=div.select('a.chemical')
                        for link in links:
                            self.get_page_link(link,self.num_retries)
                    relay=random.randint(20,50)/10
                    # print(relay)
                    time.sleep(relay)
                except Exception as e:
                    self.html_queue.put(html_doc)
                # self.html_queue.task_done()
    
        def get_page_link(self,link,num_retries=2):
            print('haha')
            time.sleep(2*random.randint(5,15)/10)
            res=[]
            href=link.get('href')
            print(href)
            response=requests.get(href,headers=self.headers,timeout=20)
            status=response.status_code
            if status==200:
                parse_html=response.text
                soup1=BeautifulSoup(parse_html, 'lxml')
                catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]  # get the catalog numbers
                # print(catalogs)
                table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
                # print(table_headers)
                if 'AmountPriceQty.' in table_headers:
                    index=table_headers.index('AmountPriceQty.')
                    catalog=catalogs[0]
                    trs=soup1.select('.form tbody tr')
                    # print(trs)
                    if len(catalogs)>1:
                        catalog=catalogs[index]
                    for tr in trs:
                        if len(tr.select('td'))>1:
                            row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                            res.append(row)
                    # print(res)
                    self.item_queue.put(res)
            else:
                print('搜索%s网络异常,错误代码:%s'%(link,status))
                # time.sleep(self.relay*random.randint(5,15)/10)
                if num_retries>0:
                    print('尝试重新搜索%s'%(link))
                    time.sleep(random.randint(5,15)/10)
                    self.get_page_link(link,num_retries-1)
                else:
                    print('%s四次搜索失败!!!'%link)
        def stop(self):
            self.__running.clear()
    
    class Saver(threading.Thread):
        def __init__(self, item_queue):
            threading.Thread.__init__(self)
            self.__running=threading.Event()
            self.__running.set()
            self.item_queue = item_queue
    
        def run(self):
            # while not self.item_queue.empty():
            while self.__running.isSet():
                print('item_queue长度: ',self.item_queue.qsize())
                res=self.item_queue.get()
                print(res)
                conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
                cursor = conn.cursor()
                sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
                cursor.executemany(sql,res)
                print('入库')
                conn.commit()
                cursor.close()
                conn.close()
        def stop(self):
            self.__running.clear()
    
    
    if __name__ == '__main__':
        starttime=time.time()
        lock = threading.Lock()
        urls_queue = queue.Queue()
        html_queue = queue.Queue()
        item_queue = queue.Queue()
    
        conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
        cursor = conn.cursor()
        print('清空表...')
        cursor.execute('delete from chembridge_test2')
        conn.commit()
        cursor.close()
        conn.close()
    
        print('start...')
    
        f=open('MDL1.txt','r')
        for line in f.readlines():
            line=line.strip()
            urls_queue.put(line)
        f.close()
    
        threads=[]
        for j in range(8):
            thread1 = Fetcher(urls_queue,html_queue)
            thread1.setDaemon(True)
            thread1.start()
            threads.append(thread1)
        for j in range(1):
            thread1 = Parser(html_queue,item_queue)
            thread1.setDaemon(True)
            thread1.start()
            threads.append(thread1)
        for j in range(2):
            thread1 = Saver(item_queue)
            thread1.setDaemon(True)
            thread1.start()
            threads.append(thread1)
    
    
        # while not urls_queue.empty():
        #     while not html_queue.empty():
        #         while not item_queue.empty():
        #             pass
        while True:
            time.sleep(0.5)
            if urls_queue.empty() and html_queue.empty() and item_queue.empty():
                break
    
        print('完成!')
        for t in threads:
            t.stop()
        for t in threads:
            t.join()
        print('end')
        print('耗时:%f s'%(time.time()-starttime))

    Set the number of threads according to your network conditions, so that the pipeline does not stall while requests is blocked on the network.
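
    The main thread above shuts the pipeline down by polling all three queues every 0.5 s. The commented-out task_done() calls in the workers hint at the other common pattern, Queue.join() plus sentinel values; a minimal standalone sketch (not the post's code, with a trivial process() stand-in) looks like this:

    import queue,threading

    def process(item):
        print('processed',item)   # stand-in for the real fetch/parse/store work

    def worker(q):
        while True:
            item=q.get()
            if item is None:      # sentinel value: no more work for this thread
                q.task_done()
                break
            try:
                process(item)
            finally:
                q.task_done()     # mark the item as handled

    q=queue.Queue()
    for item in ['a','b','c']:
        q.put(item)

    threads=[threading.Thread(target=worker,args=(q,)) for _ in range(4)]
    for t in threads:
        t.start()
    q.join()                      # blocks until every queued item is task_done()
    for _ in threads:
        q.put(None)               # one sentinel per worker so each can exit
    for t in threads:
        t.join()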

    In addition, here is the same crawler implemented with Scrapy.

    items.py

    import scrapy
    
    class ChemItem(scrapy.Item):
        # define the fields for your item here like:
        # name = scrapy.Field()
        catalog=scrapy.Field()
        amount=scrapy.Field()
        price=scrapy.Field()
        qty=scrapy.Field()

    quotes_spider.py

    # -*- coding: utf-8 -*-
    import scrapy
    from scrapy.selector import Selector
    from tutorial.items import ChemItem
    
    class QuotesSpider(scrapy.Spider):
        name = "quotes"
        # allowed_domains = ["chembridge.com"]
        headers=[{
                'Host':'www.chembridge.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate',
                'Referer':'http://www.chembridge.com/search/search.php?search=1',
                'Connection':'keep-alive',
                'Upgrade-Insecure-Requests':'1'
            },
            {
                'Host':'www.hit2lead.com',
                'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
                'Accept-Encoding':'gzip, deflate, br'
            }]
        def start_requests(self):
            start_urls = []
            f=open('MDL.txt','r')
            for line in f.readlines():
                line=line.strip()
                print(line)
                start_urls.append('http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1')
            for url in start_urls:
                yield scrapy.Request(url=url, callback=self.parse,headers=self.headers[0])
    
        def parse(self, response):
            links=response.css('#BBResults a.chemical::attr(href)').extract()
            for link in links:
                yield scrapy.Request(url=link,callback=self.parse_dir_contents,headers=self.headers[1])
    
        def parse_dir_contents(self, response):
            items=[]
            catalogs=response.css('form div.matter h2::text').extract()
            table_headers=[''.join(res.re(r'>(.*)</td>')) for res in response.css('form div.matter thead tr')]
            print(table_headers)
            index=table_headers.index('AmountPriceQty.')
            catalog=catalogs[0]
            trs=response.css('.form tbody tr')
            if len(catalogs)>1:
                catalog=catalogs[index]
            for tr in trs:
                if len(tr.css('td'))>1:
                    item=ChemItem()
                    # print(tr.css('td::text').extract())
                    # row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.css('td'))
                    item['catalog']=catalog
                    item['amount']=tr.css('td')[0].css('::text').extract()[0]
                    item['price']='|'.join(tr.css('td')[1].css('::text').extract())
                    print(len(tr.css('td::text')))
                    item['qty']=tr.css('td')[2].css('::text').extract()[0] if len(tr.css('td')[2].css('::text').extract())==1 else tr.css('td')[2].css('::attr(value)').extract()[0]
                    # self.log('Saved result %s' % item)
                    # print(tr.css('td::text')[0].extract())
                    yield item
                    # items.append(item)
            # return items

    pipelines.py

    # store the scraped data in the MySQL database
    from twisted.enterprise import adbapi
    import MySQLdb
    import MySQLdb.cursors
    from scrapy import log
    
    class MySQLStorePipeline(object):
        def __init__(self, dbpool):
            self.dbpool = dbpool
    
        # database connection parameters
        @classmethod
        def from_settings(cls, settings):
            dbargs = dict(
                host=settings['MYSQL_HOST'],
                db=settings['MYSQL_DBNAME'],
                user=settings['MYSQL_USER'],
                passwd=settings['MYSQL_PASSWD'],
                charset='utf8',
                cursorclass = MySQLdb.cursors.DictCursor,
                use_unicode= True,
            )
            dbpool = adbapi.ConnectionPool('MySQLdb', **dbargs)
            return cls(dbpool)
    
    
        # # database connection parameters
        # def __init__(self):
        #     dbargs = dict(
        #          host = 'localhost',
        #          db = 'test',
        #          user = 'root',
        #          passwd = 'password',
        #          cursorclass = MySQLdb.cursors.DictCursor,
        #          charset = 'utf8',
        #          use_unicode = True
        #         )
        #     self.dbpool = adbapi.ConnectionPool('MySQLdb',**dbargs)
    
        '''
        The default pipeline invoke function
        '''
        def process_item(self, item,spider):
            res = self.dbpool.runInteraction(self.insert_into_table,item)
            res.addErrback(self.handle_error)
            return item
        # target table; it must be created beforehand
        def insert_into_table(self,conn,item):
                conn.execute('insert into chembridge(catalog, amount, price,qty) values(%s,%s,%s,%s)', (
                    item['catalog'],
                    item['amount'],
                     # item['star'][0],
                     item['price'],
                     item['qty']
                    ))
        def handle_error(self,e):
            log.err(e)

    settings.py

    FEED_EXPORTERS = {
        'csv': 'tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter',
    } # 'tutorial' is the project name
    
    FIELDS_TO_EXPORT = [
        'catalog',
        'amount',
        'price',
        'qty'
    ]
    
    LINETERMINATOR='\n'
    
    
    ITEM_PIPELINES = {
       'tutorial.pipelines.MySQLStorePipeline': 300,
    }
    
    # start MySQL database configure setting
    MYSQL_HOST = 'localhost'
    MYSQL_DBNAME = 'test'
    MYSQL_USER = 'root'
    MYSQL_PASSWD = 'password'
    # end of MySQL database configure setting
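
    settings.py points FEED_EXPORTERS at tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter, a module that is not shown in the post. A plausible sketch of it (the field list and line terminator are simply mirrored from the settings above) would be:

    # tutorial/spiders/csv_item_exporter.py (sketch; the original file is not included in the post)
    from scrapy.exporters import CsvItemExporter

    class MyProjectCsvItemExporter(CsvItemExporter):
        def __init__(self, *args, **kwargs):
            # Mirror FIELDS_TO_EXPORT and LINETERMINATOR from settings.py.
            kwargs.setdefault('fields_to_export', ['catalog', 'amount', 'price', 'qty'])
            kwargs.setdefault('lineterminator', '\n')
            super(MyProjectCsvItemExporter, self).__init__(*args, **kwargs)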

    main.py

    # -*- coding: utf-8 -*-
    from scrapy import cmdline
    cmdline.execute("scrapy crawl quotes -o items.csv -t csv".split())

    Finally, run main.py; the results are saved to a CSV file and to the MySQL database at the same time.
