  • Crawlers: multithreading, multiprocessing, and a custom asynchronous IO framework

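    What is a process?

    A process is a running instance of a program and an independent unit of resource allocation and scheduling in the operating system. It has its own address space and resources, and contains one or more threads.

    What is a thread?

    A thread can be seen as a lightweight process; it is the basic unit of CPU scheduling and dispatch.

    How do processes and threads differ?

    1. Scheduling: as the definitions above show, a thread is the basic unit of scheduling and dispatch, while a process is the basic unit of resource ownership.

    2. Address space and resource sharing: each process has its own address space and resources, so sharing data is complicated and requires IPC, but synchronization is simple. Threads share the resources of the process they belong to, so sharing is simple, but synchronization is harder and needs measures such as locks.

    3. Memory and CPU cost: processes use more memory and are expensive to switch between, so CPU utilization is lower; threads use less memory and are cheap to switch between, so CPU utilization is higher.

    4. Mutual impact: processes do not affect one another; a thread that crashes can bring down its whole process.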
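As a small illustration of point 2 (threads share memory easily, but updates must be synchronized), here is a minimal sketch using only the standard library. The function name `count_with_threads` and its parameters are made up for this example.

```python
import threading

def count_with_threads(n_threads=4, n_increments=10000):
    """Increment one shared counter from several threads, guarded by a lock."""
    counter = 0
    lock = threading.Lock()

    def worker():
        nonlocal counter
        for _ in range(n_increments):
            # Without the lock, the read-modify-write below could interleave
            # between threads and lose updates.
            with lock:
                counter += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter

if __name__ == '__main__':
    print(count_with_threads())
```

All threads see the same `counter` variable with no IPC at all, which is the "sharing is simple" half; the lock is the "synchronization is harder" half.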

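    When to use which?
    CPU-bound work: use processes. CPython has a GIL (Global Interpreter Lock), so within one process only one thread can execute Python bytecode at any given moment.

    IO-bound work: use threads, because a thread that is waiting on IO releases the GIL and needs almost no CPU time.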
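The IO-bound case can be sketched without touching the network. In the example below, `fake_io` is a hypothetical stand-in for a network call: `time.sleep` releases the GIL much like a real wait on a socket, so several waits overlap in a thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(seconds):
    # Hypothetical stand-in for a network request: sleeping releases the GIL,
    # just as waiting on a socket would.
    time.sleep(seconds)
    return seconds

def timed_run(n_tasks=4, seconds=0.2):
    """Run n_tasks simulated IO waits in a thread pool; return results and elapsed time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(n_tasks) as pool:
        results = list(pool.map(fake_io, [seconds] * n_tasks))
    return results, time.perf_counter() - start

if __name__ == '__main__':
    results, elapsed = timed_run()
    print(results, round(elapsed, 2))
```

Run one after another, the four waits would take about 0.8 seconds; in the pool they overlap, so the elapsed time should stay close to a single wait.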

    An entertaining explanation of what processes and threads are

    Reference blog: http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

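    Synchronous vs. asynchronous

    Synchronous means that after a user thread issues an IO request, it has to wait for, or poll until, the kernel finishes the IO operation before it can continue.

    Asynchronous means that after a user thread issues an IO request, it keeps running; when the kernel finishes the IO operation, it notifies the user thread or calls the callback function the user thread registered.

    Blocking vs. non-blocking

    Blocking means the IO call returns to user space only after the operation has fully completed.

    Non-blocking means the IO call returns a status value to the caller immediately, without waiting for the operation to fully complete.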
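The difference between a blocking and a non-blocking call can be seen on a local socket pair, with no network involved. This is a minimal sketch; the function name `demo` and the `events` list are made up for the illustration.

```python
import select
import socket

def demo():
    """Show a non-blocking recv() returning at once, then wait with select()."""
    a, b = socket.socketpair()   # two connected local sockets
    b.setblocking(False)
    events = []
    try:
        b.recv(1024)             # nothing has been sent yet
    except BlockingIOError:
        # A blocking socket would have waited here; a non-blocking one
        # reports immediately that no data is available.
        events.append('would block')
    a.send(b'ping')
    # Ask the kernel to tell us when b becomes readable (up to 1 second).
    readable, _, _ = select.select([b], [], [], 1.0)
    if readable:
        events.append(b.recv(1024))
    a.close()
    b.close()
    return events

if __name__ == '__main__':
    print(demo())
```

The `select()` call is the same readiness check that the custom framework at the end of this post is built on.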

    I. Multithreading

    Can achieve concurrency.
    However, between sending a request and receiving its response, the thread sits idle.
    Two ways to write it:
    - handle the result directly in the task
    - handle the result in a callback function

    Method 1:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author: nulige
    # Threads
    from concurrent.futures import ThreadPoolExecutor
    import requests

    def task(url):
        response = requests.get(url)
        print(url, response)

    pool = ThreadPoolExecutor(6)

    url_list = [
        'http://huaban.com/favorite/beauty/',
        'https://www.bing.com/',
        'https://www.baidu.com/',
        'https://www.sina.com/',
        'https://www.zhihu.com/',
        'https://www.tencent.com/',
    ]

    for url in url_list:
        pool.submit(task, url)

    pool.shutdown(wait=True)

    Method 2:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Author: nulige
    # Threads, with the result handled in a callback

    from concurrent.futures import ThreadPoolExecutor
    import requests

    def task(url):
        response = requests.get(url)
        return response


    def done(future, *args, **kwargs):
        # called once the task has finished; future.result() is task's return value
        response = future.result()
        print(response)

    pool = ThreadPoolExecutor(6)

    url_list = [
        'http://huaban.com/favorite/beauty/',
        'https://www.bing.com/',
        'https://www.baidu.com/',
        'https://www.sina.com/',
        'https://www.zhihu.com/',
        'https://www.tencent.com/',
    ]

    for url in url_list:
        v = pool.submit(task, url)
        v.add_done_callback(done)

    pool.shutdown(wait=True)
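A third option sits between the two methods above: keep the futures and read their results as each one finishes, using `concurrent.futures.as_completed`. The sketch below runs offline; `fake_fetch` is a hypothetical stand-in for `requests.get`, and `crawl` is a name chosen for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(url):
    # Hypothetical stand-in for requests.get(url): sleep to mimic latency,
    # then return something derived from the URL.
    time.sleep(0.1)
    return url, len(url)

def crawl(urls, workers=6):
    """Submit every URL to a thread pool and collect results as they finish."""
    results = {}
    with ThreadPoolExecutor(workers) as pool:
        futures = [pool.submit(fake_fetch, url) for url in urls]
        for future in as_completed(futures):
            # result() re-raises any exception raised inside the task
            url, size = future.result()
            results[url] = size
    return results

if __name__ == '__main__':
    print(crawl(['https://www.bing.com/', 'https://www.baidu.com/']))
```

Compared with `add_done_callback`, this keeps the result handling in the main thread, which can be easier to reason about when results are written to shared state.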

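    II. Multiprocessing

    Can achieve concurrency.
    However, between sending a request and receiving its response, the process sits idle.
    Two ways to write it:
        - handle the result directly in the task
        - handle the result in a callback function

    Method 1:

    from concurrent.futures import ProcessPoolExecutor
    import requests

    def task(url):
        response = requests.get(url)
        print(url, response)
        # parse the page here, e.g. with regular expressions


    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    if __name__ == '__main__':
        # the guard is needed on platforms that spawn worker processes (e.g. Windows)
        pool = ProcessPoolExecutor(7)
        for url in url_list:
            pool.submit(task, url)

        pool.shutdown(wait=True)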

    Method 2:

    from concurrent.futures import ProcessPoolExecutor
    import requests

    def task(url):
        response = requests.get(url)
        return response

    def done(future, *args, **kwargs):
        # runs in the parent process once the worker has returned
        response = future.result()
        print(response.status_code, response.content)

    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    if __name__ == '__main__':
        # the guard is needed on platforms that spawn worker processes (e.g. Windows)
        pool = ProcessPoolExecutor(7)
        for url in url_list:
            v = pool.submit(task, url)
            v.add_done_callback(done)

        pool.shutdown(wait=True)

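    III. Coroutines (micro-threads) + asynchronous IO = one thread sending N HTTP requests

    Asynchronous IO (asyncio)

    1. A single thread achieves pseudo-concurrency: it handles two tasks at once, sends all requests out together, and resumes each task as soon as its result comes back.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    import asyncio

    # async/await replaces @asyncio.coroutine and yield from, which were removed in Python 3.11
    async def task():
        print('before...task......')
        await asyncio.sleep(5) # this is where an HTTP request would go; asyncio itself works at the TCP level
        print('end...task......')

    async def main():
        await asyncio.gather(task(), task())

    asyncio.run(main())

    Output: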

    before...task......
    before...task......
    end...task......
    end...task......
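The ordering of that output (both "before" lines, then both "end" lines) already shows the two tasks overlapping. A timing check makes the same point more directly. This is a minimal sketch with shorter delays; the `name` and `delay` parameters are additions for the illustration.

```python
import asyncio
import time

async def task(name, delay):
    # While this coroutine is suspended in sleep(), the event loop runs the other one.
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    names = await asyncio.gather(task('a', 0.2), task('b', 0.2))
    return names, time.perf_counter() - start

if __name__ == '__main__':
    names, elapsed = asyncio.run(main())
    print(names, round(elapsed, 2))
```

Two sequential 0.2-second waits would take about 0.4 seconds; with `gather` the elapsed time should stay close to 0.2 seconds, even though only one thread is involved.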

    2. How a single thread sends HTTP requests (asyncio performs the asynchronous IO internally)

    asyncio only provides the TCP connection, so the HTTP request packet has to be assembled by hand.

    Installing the module:

    asyncio has been part of the standard library since Python 3.4, so nothing needs to be installed.

    Code:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    import asyncio

    async def task(host, url='/'):
        print('start', host, url)
        reader, writer = await asyncio.open_connection(host, 80)  # open the connection

        # raw HTTP request: request line, Host header, then a blank line
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
        request_header_content = bytes(request_header_content, encoding='utf-8')

        writer.write(request_header_content)
        await writer.drain()
        text = await reader.read()
        # the response
        print('end', host, url, text)
        writer.close()

    async def main():
        # two tasks
        await asyncio.gather(
            task('www.cnblogs.com', '/wupeiqi/'),
            task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091'),
        )

    asyncio.run(main())
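To try the hand-built HTTP/1.0 exchange without depending on an external site, the same request can be sent to a toy server on the loopback interface. Everything here is a sketch under stated assumptions: `handle`, `fetch`, and the fixed `hello` response are invented for the example and are not a general HTTP server or client.

```python
import asyncio

async def handle(reader, writer):
    # Toy server: read the request headers, then answer with a fixed response.
    await reader.readuntil(b'\r\n\r\n')
    writer.write(b'HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nhello')
    await writer.drain()
    writer.close()

async def fetch(host, port, url='/'):
    reader, writer = await asyncio.open_connection(host, port)
    request = 'GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url, host)
    writer.write(request.encode('utf-8'))
    await writer.drain()
    raw = await reader.read()   # HTTP/1.0: read until the server closes the connection
    writer.close()
    # Headers and body are separated by the first blank line.
    head, _, body = raw.partition(b'\r\n\r\n')
    status_line = head.split(b'\r\n')[0]
    return status_line, body

async def main():
    # Port 0 lets the operating system pick a free port.
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await asyncio.gather(
            fetch('127.0.0.1', port, '/a'),
            fetch('127.0.0.1', port, '/b'),
        )
    finally:
        server.close()

if __name__ == '__main__':
    print(asyncio.run(main()))
```

Both requests are in flight on one thread at the same time; the event loop switches between them whenever one is waiting on its socket.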

    3. asyncio (asynchronous IO) + aiohttp (which builds the HTTP packets for you)

    Installing the module:

    pip3 install aiohttp

    Code:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    import aiohttp
    import asyncio

    async def fetch_async(session, url):
        print(url)
        # current aiohttp releases issue requests through a ClientSession
        async with session.get(url) as response:
            print(url, response)

    async def main():
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                fetch_async(session, 'http://www.baidu.com/'),
                fetch_async(session, 'http://www.chouti.com/'),
            )

    asyncio.run(main())

    4. asyncio + the requests module

    Installing the module:

    pip3 install requests

    Code:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    import asyncio
    import requests


    async def task(func, *args):
        print(func, args)
        loop = asyncio.get_running_loop()
        # requests is blocking, so run it in the default thread pool,
        # e.g. requests.get('http://www.cnblogs.com/wupeiqi/')
        future = loop.run_in_executor(None, func, *args)
        response = await future
        print(response.url, response.content)


    async def main():
        await asyncio.gather(
            task(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
            task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
        )

    asyncio.run(main())
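The `run_in_executor` pattern works for any blocking function, not only `requests.get`. The sketch below swaps in a hypothetical `blocking_call` that just sleeps, so it runs offline.

```python
import asyncio
import time

def blocking_call(x):
    # Hypothetical stand-in for a blocking library call such as requests.get.
    time.sleep(0.2)
    return x * 2

async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; each call runs in a
    # worker thread while the event loop stays free.
    futures = [loop.run_in_executor(None, blocking_call, i) for i in range(3)]
    return await asyncio.gather(*futures)

if __name__ == '__main__':
    print(asyncio.run(main()))
```

Note that this is threads behind an asyncio front end: the concurrency comes from the thread pool, and the event loop only waits on the resulting futures.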

    IV. gevent, which depends on greenlet (a coroutine module), + asynchronous IO (the requests module)

    A coroutine is a micro-thread: the program itself decides what this thread runs first and what it runs next.

    Installing the modules:

    pip3 install greenlet

    pip3 install gevent

    Code:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    from gevent import monkey
    monkey.patch_all()   # patch the standard library's blocking sockets before requests is imported

    import gevent
    import requests


    def task(method, url, req_kwargs):
        # each task starts here
        print(method, url, req_kwargs)
        # requests.get and friends call requests.request internally
        response = requests.request(method=method, url=url, **req_kwargs)
        print(response.url, response.content)

    ##### Send the requests #####
    # gevent.joinall([
    #     gevent.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    #     gevent.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    #     gevent.spawn(task, method='get', url='https://github.com/', req_kwargs={}),
    # ])

    ##### Send the requests (a pool caps the number of coroutines, i.e. the number of requests in flight) #####
    from gevent.pool import Pool
    pool = Pool(5)
    gevent.joinall([
        pool.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
        pool.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        pool.spawn(task, method='get', url='https://www.github.com/', req_kwargs={}),
    ])

    Output:

    get https://www.python.org/ {}
    get https://www.yahoo.com/ {}
    get https://www.github.com/ {}
    (rest omitted)......

    V. The grequests module = gevent + requests

    Installing the module:

    pip3 install grequests

    Code:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #Author: nulige

    import grequests

    request_list = [
        grequests.get('http://httpbin.org/delay/1', timeout=0.001),
        grequests.get('http://fakedomain/'),
        grequests.get('http://httpbin.org/status/500')
    ]

    # send them all and collect the list of responses (None for a failed request)
    response_list = grequests.map(request_list)
    print(response_list)

    Output:

    [None, None, <Response [500]>]

    VI. Twisted (one thread sending multiple requests)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from twisted.internet import defer
    from twisted.web.client import getPage  # deprecated in newer Twisted releases
    from twisted.internet import reactor

    def one_done(arg):
        # called with the body of one finished request
        print(arg)

    def all_done(arg):
        # called once every request has finished
        print('done')
        reactor.stop()

    @defer.inlineCallbacks
    def task(url):
        res = getPage(bytes(url, encoding='utf8')) # send the HTTP request
        res.addCallback(one_done)
        yield res

    url_list = [
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
        'http://www.cnblogs.com',
    ]

    defer_list = [] # [Deferred, Deferred, Deferred] (each request has already been sent)
    for url in url_list:
        v = task(url)
        defer_list.append(v)

    d = defer.DeferredList(defer_list)
    d.addBoth(all_done)


    reactor.run() # event loop; runs until reactor.stop() is called

    VII. Tornado (one thread sending multiple requests)

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop

    COUNT = 0
    def handle_response(response):
        global COUNT
        COUNT -= 1
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)
            # same approach as with Twisted
            # ioloop.IOLoop.current().stop()
        if COUNT == 0:
            # every request has finished, so stop the loop
            ioloop.IOLoop.current().stop()

    def func():
        url_list = [
            'http://www.baidu.com',
            'http://www.bing.com',
        ]
        global COUNT
        COUNT = len(url_list)
        for url in url_list:
            print(url)
            http_client = AsyncHTTPClient()
            # passing a callback to fetch() requires Tornado < 6.0; the argument was removed in 6.0
            http_client.fetch(HTTPRequest(url), handle_response)


    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start() # event loop; runs until stop() is called

    Output:

    http://www.baidu.com
    http://www.bing.com

    VIII. A custom asynchronous IO framework

    import socket
    import select

    # ########################## What an HTTP request really is: blocking ##########################
    """
    sk = socket.socket()
    # 1. Connect
    sk.connect(('www.baidu.com',80,)) # blocks on IO
    print('Connected...')

    # 2. Once connected, send the request
    sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
    # sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

    # 3. Wait for the server's response
    data = sk.recv(8096) # blocks on IO
    print(data)

    # Close the connection
    sk.close()
    """
    # ########################## What an HTTP request really is: non-blocking ##########################
    """
    sk = socket.socket()
    sk.setblocking(False)
    # 1. Connect
    try:
        sk.connect(('www.baidu.com',80,)) # no longer blocks; raises BlockingIOError instead
        print('Connected...')
    except BlockingIOError as e:
        print(e)
    # 2. Send the request (fails if the connection is not established yet)
    sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
    # sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

    # 3. Read the server's response (raises if no data has arrived yet)
    data = sk.recv(8096)
    print(data)

    # Close the connection
    sk.close()
    """

    # ########################## Asynchronous, non-blocking IO ##########################
    # IO simply means reading and writing.

    class HttpRequest:
        def __init__(self, sk, host, callback):
            self.socket = sk
            self.host = host
            self.callback = callback

        def fileno(self):
            # select() accepts any object that has a fileno() method
            return self.socket.fileno()


    class HttpResponse:
        def __init__(self, recv_data):
            self.recv_data = recv_data
            self.header_dict = {}
            self.body = None
            self.initialize()

        def initialize(self):
            # headers and body are separated by the first blank line
            headers, _, body = self.recv_data.partition(b'\r\n\r\n')
            self.body = body
            header_list = headers.split(b'\r\n')
            for h in header_list:
                h_str = str(h, encoding='utf-8')
                v = h_str.split(':', 1)
                if len(v) == 2:
                    self.header_dict[v[0]] = v[1]


    class AsyncRequest:
        def __init__(self):
            self.conn = []
            self.connection = []  # used to detect whether the connection has succeeded

        def add_request(self, host, callback):
            try:
                sk = socket.socket()
                sk.setblocking(0)
                sk.connect((host, 80,))
            except BlockingIOError as e:
                pass
            request = HttpRequest(sk, host, callback)
            self.conn.append(request)
            self.connection.append(request)

        def run(self):
            while True:
                rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
                for w in wlist:
                    print(w.host, 'connected...')
                    # a socket showing up as writable means it is connected to the server
                    tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                    w.socket.send(bytes(tpl, encoding='utf-8'))
                    self.connection.remove(w)
                for r in rlist:
                    # r is an HttpRequest
                    recv_data = bytes()
                    while True:
                        try:
                            chunk = r.socket.recv(8096)  # maximum bytes per read
                        except Exception as e:
                            break  # nothing more to read right now
                        if not chunk:
                            break  # the server closed the connection
                        recv_data += chunk
                    response = HttpResponse(recv_data)
                    r.callback(response)
                    r.socket.close()
                    self.conn.remove(r)
                if len(self.conn) == 0:
                    break


    def f1(response):
        print('save to file', response.header_dict)

    def f2(response):
        print('save to database', response.header_dict)

    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f2},
        {'host': 'www.cnblogs.com', 'callback': f2},
    ]

    req = AsyncRequest()
    for item in url_list:
        req.add_request(item['host'], item['callback'])

    req.run()
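The header parsing done by the response class can be exercised on its own with a canned byte string, without any sockets. The helper below is a sketch of that step only; the name `parse_response` and the sample response are invented for the example.

```python
def parse_response(raw):
    """Split a raw HTTP response into status line, header dict, and body."""
    # Headers and body are separated by the first blank line (CRLF CRLF).
    head, _, body = raw.partition(b'\r\n\r\n')
    lines = head.split(b'\r\n')
    status_line = lines[0].decode('utf-8')
    headers = {}
    for line in lines[1:]:
        name, sep, value = line.decode('utf-8').partition(':')
        if sep:  # skip anything that is not a "Name: value" line
            headers[name.strip()] = value.strip()
    return status_line, headers, body

if __name__ == '__main__':
    sample = b'HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nServer: demo\r\n\r\n<html>'
    print(parse_response(sample))
```

Using `partition` instead of `split` means a response that was cut off before the blank line still parses, with an empty body, rather than raising an error.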

    Note: modules used in this post (Python 3.x)

    # Install commands (using the Douban PyPI mirror):
    
    pip3 install aiohttp -i http://pypi.douban.com/simple --trusted-host pypi.douban.com 
    
    pip3 install greenlet -i http://pypi.douban.com/simple --trusted-host pypi.douban.com 
     
    pip3 install gevent -i http://pypi.douban.com/simple --trusted-host pypi.douban.com 
    
    pip3 install grequests -i http://pypi.douban.com/simple --trusted-host pypi.douban.com 
    
    pip3 install tornado -i http://pypi.douban.com/simple --trusted-host pypi.douban.com   
  • Original article: https://www.cnblogs.com/nulige/p/6860150.html