zoukankan      html  css  js  c++  java
  • 爬虫提高性能:串行、线程进程、异步非阻塞

    - 高性能相关模块:	
    	- gevent		# 源码用C实现
    	- twisted		# 用的比较多,源码用python实现
    	- tornado		# 源码用python实现
    	- ayncio		# 源码用C实现
    	- 	现象:一个线程实现并发请求
    		本质:socket+IO多路复用
    

    问:10个URL,爬虫获取到数据?

    一、 串行

    url_list = [
    	'http://www.cnblogs.com/xuyaping/p/7667055.html',
    	'http://www.baidu.com',
    	'http://www.xiaohuar.com',
    ]
    import requests
    
    # 1.串行(6s,用了一个线程或进程)
    for url in url_list:
    	response = requests.get(url)
    	print(response.content)
    

    二、 线程、进程

    # 2.线程,进程。耗费资源提高网络请求。(3s,用了3个线程或进程)
    # 不是创建越多的线程和进程就好,线程之间的切换耗时,效率很低。使用线程池
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor   # ThreadPoolExecutor线程池,ProcessPoolExecutor进程池
    # python2中没ThreadPoolExecutor线程池
    
    def tast(url):
    	response = requests.get(url)
    	print(response.content)
    
    pool = ThreadPoolExecutor(10)     #最多10个线程
    # pool = ProcessPoolExecutor(10)   # pool改为pool = ProcessPoolExecutor(10)就是进程池了
    
    for url in url_list:
    	pool.submit(tast,url)       # tast为函数名,url为参数。去线程池中获取一个线程,执行tast函数
    pool.shutdown(wait=True)        # 等待上面的线程都执行完再往下走
    

    三、 异步非阻塞

    # 3.异步非阻塞的方式:本质是socket。
    # 异步:回调,执行完后再回调这个函数。
    # 非阻塞:不等。创建socket对象,连接,发送数据,接收数据一气呵成的,不等每个操作是否执行完毕。
    # 阻塞:100个请求,向远程发连接,每个请求先执行connect,连接成功才能发送,连接的时候是堵塞的,第一个url连接,第二个等着第一个处理完。
    # 并且第一个连接也要等着,等着发消息,等连接成功才能发消息,然后返回结果。
    # client = socket();client.connet(ip,端口)
    # 非阻塞:第一个url来了,发连接,发过去不等,往下走要发送消息,但这时候发送消息可能会失败,因为可能还未连接成功,可能会报错,然后紧接着收消息,收不到报错。
    # 所以单纯给socket设置上非阻塞一定会报错,所以这里非阻塞指的不是排队这个,而是一个url来了后是否阻塞。
    # client = socket(); client.setblocking(False); client.connet(ip,端口)
    # 异步非阻塞的方式:100个url请求同时进行,先不发消息,全部只连接,当其中有url请求连接成功,告诉下我要发数据,这叫回调动作。收到回调动作后拿到结果再执行下一步操作。
    

    a. asyncio

    python3.3后增加的内置模块asyncio,但是该模块只能发tcp请求(socket的请求),不能发http请求,更偏向底层些。
    也可以自己封装构造http请求。但不常用,太偏向底层。
    
    import asyncio
    @asyncio.coroutine
    def fetch_async(host, url='/'):
    
    
        print(host, url)
        reader, writer = yield from asyncio.open_connection(host, 80)                # open_connection,连接会阻塞,不等
        
        # 发数据
        request_header_content = """GET %s HTTP/1.0
    Host: %s
    
    """ % (url, host,)        
        # GET %s HTTP/1.0
    Host: %s 构造请求头的一部分,
    
     分割请求头请求体。封装成这种类型的发给TCP,TCP以为是http协议
        request_header_content = bytes(request_header_content, encoding='utf-8')
        
        
        writer.write(request_header_content)
        yield from writer.drain()
        text = yield from reader.read()            # 等待用户返回数据,等到返回结果后才往下走
        print(host, url, text)
        writer.close()
    
    tasks = [
        fetch_async('www.cnblogs.com', '/wupeiqi/'),
        fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
    ]
    
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    asyncio

    b. asyncio + aiohttp

    把数据封装成http协议,再把数据发送给asyncio
    
    import aiohttp
    import asyncio
    
    @asyncio.coroutine
    def fetch_async(url):
        print(url)
        response = yield from aiohttp.request('GET', url)    # 内部连接、发消息、等数据回来
        print(url, response)
        response.close()
    
    tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]
    
    event_loop = asyncio.get_event_loop()
    results = event_loop.run_until_complete(asyncio.gather(*tasks))
    event_loop.close()
    asyncio + aiohttp

    c. asyncio + requests

    原理同上面二个,封装的更深
    
    import asyncio
    import requests
    
    @asyncio.coroutine
    def fetch_async(func, *args):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, func, *args)
        response = yield from future
        print(response.url, response.content)
    
    tasks = [
        fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
    ]
    
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    asyncio + requests

    d. gevent + requests

    gevent本身是没有协程的功能,内部一栏greenlet模块,greenlet模块才是真正实现协程的。依赖libevent...C的一个库。
    因为greenlet遇到IO阻塞不能自动切换执行另外一个请求,不够智能。
    

    greenlet:遇到switch切换执行另一个请求

    from greenlet import greenlet 
    def test1():
        print 12                # 第4步
        gr2.switch()            # 第5步
        print 34                # 第8步
        gr2.switch()            # 第9步
     
    def test2():
        print 56                # 第6步
        gr1.switch()            # 第7步
        print 78                # 第10步
     
    gr1 = greenlet(test1)        # 第1步
    gr2 = greenlet(test2)        # 第2步
    gr1.switch()                # 第3步
    greenlet切换

    和gevent配合是遇到IO阻塞时才切换执行另一个请求,完成异步非阻塞

    import gevent
    import requests
    from gevent import monkey
    
    monkey.patch_all()                # 将request.get(...)或request.post(...)内部的socket替换成setblocking(False)非阻塞的socket
    
    def fetch_async(method, url, req_kwargs):
        print(method, url, req_kwargs)
        response = requests.request(method=method, url=url, **req_kwargs)
        print(response.url, response.content)
    
    # ##### 发送请求 #####
    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
    ])
    
    # ##### 发送请求(协程池控制最大协程数量) #####
    from gevent.pool import Pool
    pool = Pool(None)
    gevent.joinall([
        pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
    ])
    gevent + requests

    e. grequests

    本质是gevent + requests的封装
    
    import grequests
    
    request_list = [
        grequests.get('http://httpbin.org/delay/1', timeout=0.001),
        grequests.get('http://fakedomain/'),
        grequests.get('http://httpbin.org/status/500')
    ]
    
    
    # ##### 执行并获取响应列表 #####
    response_list = grequests.map(request_list)
    print(response_list)
    
    
    # ##### 执行并获取响应列表(处理异常) #####
    def exception_handler(request, exception):
    print(request,exception)
        print("Request failed")
    
    response_list = grequests.map(request_list, exception_handler=exception_handler)
    print(response_list)
    grequests

    f. Twisted

    全部是一下执行到底,没有等待,因为根本没有发数据,只是创建了个对象,发送了连接的请求
    
    from twisted.web.client import getPage, defer
    from twisted.internet import reactor
    
    
    def all_done(arg):
        reactor.stop()
    
    
    def callback(contents):
        print(contents)
    
    
    deferred_list = []
    
    url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
    
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf8'))            # getPage相当于requests模块。deferred是创建的对象
        deferred.addCallback(callback)                            # addCallback回调,这里是异步
        deferred_list.append(deferred)
    
    dlist = defer.DeferredList(deferred_list)
    dlist.addBoth(all_done)                            # 所有的请求都执行完了,执行all_done函数,中止,防止reactor.run()不停的死循环
    
    reactor.run()            # reactor.run()内部是死循环,deferred_list是二个socket对象,检测deferred_list是否连接成功,成功发请求返回数据,直到数据全部返回死循环还是中止不了
    twisted

    g. Tornado

    和Twisted类似
    
    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop
    
    
    def handle_response(response):
        """
        处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
        :param response: 
        :return: 
        """
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)
    
    
    def func():
        url_list = [
            'http://www.baidu.com',
            'http://www.bing.com',
        ]
        for url in url_list:
            print(url)
            http_client = AsyncHTTPClient()
            http_client.fetch(HTTPRequest(url), handle_response)        # 每个循环结束,执行handle_response回调
    
    
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start()                    # 开始循环执行handle_response
    tornado

    h. Twisted补充

    from twisted.internet import reactor
    from twisted.web.client import getPage
    import urllib.parse
    
    
    def one_done(arg):
        print(arg)
        reactor.stop()
    
    post_data = urllib.parse.urlencode({'check_data': 'adf'})
    post_data = bytes(post_data, encoding='utf8')
    headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
    response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
                       method=bytes('POST', encoding='utf8'),
                       postdata=post_data,
                       cookies={},
                       headers=headers)
    response.addBoth(one_done)
    
    reactor.run()
    View Code

    总结:

    gevent + requests、Twisted、asyncio + requests比较常用,
    按优先级 Twisted > gevent + requests > asyncio + requests或者asyncio + aiohttp
    

     

    四、自定制异步IO模型

    a. socket客户端
    	obj = socket()
    	# obj.connect((198.1.1.1,80))
    	obj.connect((http://dig.chouti.com/,80)) # 阻塞
    	
    	
    	obj.send('GET /index http1.1
    host:...
    content-type:xxxxx
    
    ')
    	
    	obj.recv(1024) # 最多接收字节 # 阻塞
    	
    	
    	obj.close()
    	
    	
    	
    	示例:基于socket实现http请求
    		############ 阻塞 ############
    		import socket
    
    		client = socket.socket()
    		# 连接
    		client.connect(("43.226.160.17",80)) # 阻塞		# ping dig.chouti.com ---> 43.226.160.17
    
    		# 发送请求
    		data = b"GET / HTTP/1.0
    host: dig.chouti.com
    
    "
    		client.sendall(data)
    
    
    		response = client.recv(8096) # 阻塞
    		print(response)
    
    		client.close()
    	
    	
    	
    		############ 非阻塞 ############
    	
    		import socket
    
    		client = socket.socket()
    		client.setblocking(False)	# client.setblocking(0)	都可以,设置成非阻塞
    		try:
    			# 连接
    			client.connect(("43.226.160.17",80)) 		# 连接的请求已经发送出去
    		except BlockingIOError as e:
    			print(e)
    		# 发送请求
    		data = b"GET / HTTP/1.0
    host: dig.chouti.com
    
    "
    		client.sendall(data)
    
    
    		response = client.recv(8096) 	# 非阻塞,但接收不到消息,也会报错
    		print(response)
    
    		client.close()
    				
    	
    	
    	总结:
    		发送Http请求
    		非阻塞,会报错 使用try
    		定义一些操作
    		
    		
    b. IO多路复用,用来检测【多个】socket对象是否有变化
    
    	伪代码,实现异步非阻塞
    	
    		socket_list = []
    			
    		for i in [www.baid.......,.....]
    		
    			client = socket.socket()
    			client.setblocking(False)
    			# 连接
    			try:
    				client.connect((i,80)) # 连接的请求已经发送出去,
    			except BlockingIOError as e:
    				print(e)
    			socket_list.append(client)
    	
    		
    		# 事件循环
    		while True:
    			r,w,e = select.select(socket_list,socket_list,[],0.05)
    			# w, 是什么?[sk2,sk3],连接成功了
    			for obj in w:
    				obj.send("GET / http/1.0....")
    			# r,是什么? [sk2,sk3], 要收数据了
    			for obj in r:
    				response = obj.recv(...)
    				print(response)
    					   
    		
    	知识点:
    		client.setblocking(False)
    		select.select检测:连接成功,数据回来了 
    
    import socket
    import select
    
    class Request(object):
    def __init__(self,sock,info):
        self.sock = sock
        self.info = info
    
    def fileno(self):
        return self.sock.fileno()
    
    
    class Test(object):
    def __init__(self):
        self.sock_list = []
        self.conns = []
    
    
    def add_request(self,req_info):
        """
        创建请求
        :param req_info: {'host':'www.baidu.com','port':80,'path':'/'}
        :return:
        """
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((req_info['host'],req_info['port']))
        except BlockingIOError as e:
            pass
    
        obj = Request(sock,req_info)
        self.sock_list.append(obj)
        self.conns.append(obj)
    
    
    def run(self):
        '''开始事件循环,检测连接是否成功,数据是否返回'''
        while True:
            #select.select([socket对象,]),其实不是仅限于socket对象,可以是任何对象,但这个对象一定要有fileno方法
            # select.select([socket对象,])拿到的不是socket对象,而是socket对象方法fileno的返回值。对象.fileno()
            r,w,e = select.select(self.sock_list,self.conns,[],0.05)
            # select.select(self.sock_list,self.conns,[],0.05) ---> select.select([request对象,])
            # w,是否连接成功,w有值连接成功
            for obj in w:
                data = "GET %s http/1.1
    host:%s
    
    "%(obj.info['path'],obj.info['host'])
                obj.sock.send(data.encode('utf8'))
                self.conns.remove(obj)
             # 数据返回,接收到数据
            for obj in r:
                response = obj.sock.recv(8096)
                print(obj.info['host'],response)
                obj.info['callback'](response)
                self.sock_list.remove(obj)
    
    
            # 所有的请求已经返回
            if not self.sock_list:
                break
    异步非阻塞模块原理
    from .test import Test
    
    
    def done1(response):
        print(response)
    
    def done2(response):
        print(response)
    
    url_list = [
        {'host':'www.baidu.com','port':80,'path':'/','callback':done1},
        {'host':'www.cnblogs.com','port':80,'path':'/index.html','callback':done2},
        {'host':'www.bing.com','port':80,'path':'/','callback':done2},
    ]
    
    test = Test()
    for item in url_list:
        test.add_request(item)
    test.run()
    使用

    以上是Twisted和Tornado实现异步非阻塞模块的原理

  • 相关阅读:
    python eval() 进行条件匹配
    spring boot 学习
    JAVA基础
    在mac上进行JAVA开发
    移动端开发基础【8】页面生命周期
    数据挖掘【1】概述(引言)
    项目管理【26】 | 项目成本管理-规划成本管理
    项目管理【24】 | 项目进度管理-控制进度
    项目管理【25】 | 项目成本管理-成本管理概念
    操作系统【8】 Linux虚拟内存和物理内存
  • 原文地址:https://www.cnblogs.com/xuyaping/p/7755491.html
Copyright © 2011-2022 走看看