zoukankan      html  css  js  c++  java
  • 饮冰三年-人工智能-Python-38 爬虫之并发

    一、多线程实现并发

    # Thread-based concurrency -- the "elephant in the fridge" recipe:
    #   1. get a fridge:  create a ThreadPoolExecutor
    #   2. elephant in:   pool.submit(task, url)
    #   3. shut the door: pool.shutdown(wait=True)
    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time

    def task(url):
        """Fetch *url* with a blocking GET and print the URL and response."""
        resp = requests.get(url)
        print(url, resp)

    # A pool of 7 worker threads.
    pool = ThreadPoolExecutor(7)

    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    for target in url_list:
        pool.submit(task, target)

    # Block until every submitted task has finished.
    pool.shutdown(wait=True)
    线程并发1.0
    # 并发--线程
    
    # Thread-pool concurrency with a completion callback.
    # submit() returns a Future; add_done_callback() fires once it resolves,
    # so the fetch and the result handling are decoupled.
    #   1. get a fridge:  create a ThreadPoolExecutor
    #   2. elephant in:   pool.submit(task, url)
    #   3. shut the door: pool.shutdown(wait=True)

    from concurrent.futures import ThreadPoolExecutor
    import requests
    import time

    def task(url):
        """Issue a blocking GET and return the Response via the Future."""
        return requests.get(url)

    def done(future, *args, **kwargs):
        """Completion callback: unwrap the finished Future and print the result."""
        resp = future.result()
        print(resp.status_code, resp.content)

    pool = ThreadPoolExecutor(7)
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    for target in url_list:
        fut = pool.submit(task, target)
        fut.add_done_callback(done)

    pool.shutdown(wait=True)
    线程并发2.0

    二、多进程并发和多线程并发代码类似,只是引入的包不同

    from concurrent.futures import ProcessPoolExecutor
    import requests
    import time

    def task(url):
        """Worker run in a child process: fetch *url* and print the response."""
        resp = requests.get(url)
        print(url, resp)
        # (response parsing / regex work would go here)


    # The __main__ guard is required: on spawn-based platforms each child
    # process re-imports this module, and submitting from the import would recurse.
    if __name__ == "__main__":
        pool = ProcessPoolExecutor(7)
        url_list = [
            'http://www.cnblogs.com/wupeiqi',
            'http://huaban.com/favorite/beauty/',
            'http://www.bing.com',
            'http://www.zhihu.com',
            'http://www.sina.com',
            'http://www.baidu.com',
            'http://www.autohome.com.cn',
        ]
        for target in url_list:
            pool.submit(task, target)

        pool.shutdown(wait=True)
    进程并发1.0
    from concurrent.futures import ProcessPoolExecutor
    import requests
    import time

    def task(url):
        """Worker run in a child process: fetch *url* and return the Response."""
        return requests.get(url)

    def done(future, *args, **kwargs):
        """Callback run in the parent once a submitted task completes."""
        resp = future.result()
        print(resp.status_code, resp.content)

    # Guard is required so child processes can safely re-import this module.
    if __name__ == "__main__":
        pool = ProcessPoolExecutor(7)
        url_list = [
            'http://www.cnblogs.com/wupeiqi',
            'http://huaban.com/favorite/beauty/',
            'http://www.bing.com',
            'http://www.zhihu.com',
            'http://www.sina.com',
            'http://www.baidu.com',
            'http://www.autohome.com.cn',
        ]
        for target in url_list:
            fut = pool.submit(task, target)
            fut.add_done_callback(done)

        pool.shutdown(wait=True)
    进程并发2.0

     三、协程 + 异步IO

    需要安装以下包 pip install aiohttp、requests、greenlet、gevent、grequests

    import asyncio

    # NOTE: the original used the @asyncio.coroutine / ``yield from`` style,
    # which was deprecated in Python 3.8 and removed in 3.11; ``async def``
    # with ``await`` is the direct modern equivalent.
    async def task():
        """Print a marker, suspend for 5 s without blocking the loop, print again."""
        print('before...task...')
        await asyncio.sleep(5)  # yields control so other coroutines can run
        print('end...task...')

    tasks = [task(), task()]

    loop = asyncio.get_event_loop()                  # open the fridge door
    loop.run_until_complete(asyncio.gather(*tasks))  # put the elephants in
    loop.close()                                     # close the door
    示例一、asyncio初识
    import asyncio

    # Modernized from @asyncio.coroutine / ``yield from`` (removed in Python 3.11).
    async def task(host, url='/'):
        """Hand-roll an HTTP/1.0 GET over a raw asyncio TCP connection and print the reply."""
        print('before...task...', host, url)
        # open_connection only speaks TCP -- the HTTP framing is built by hand below
        reader, writer = await asyncio.open_connection(host, 80)
        # HTTP requires CRLF ("\r\n") line endings and a blank line to end the
        # headers; the scraped source had these escapes collapsed into literal
        # newlines, which is restored here.
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
        request_header_content = bytes(request_header_content, encoding='utf-8')

        writer.write(request_header_content)
        await writer.drain()
        text = await reader.read()  # read until the server closes the connection
        print('end', host, url, text)
        writer.close()

    tasks = [task('www.cnblogs.com', '/Yk2012/'),
             task('www.baidu.com')]

    loop = asyncio.get_event_loop()                  # open the fridge door
    loop.run_until_complete(asyncio.gather(*tasks))  # put the elephants in
    loop.close()                                     # close the door
    示例二、自己封装Http数据包
    import aiohttp
    import asyncio


    @asyncio.coroutine
    def fetch_async(url):
        """Fetch *url* through aiohttp's module-level request helper and print it."""
        print(url)
        # aiohttp drives the whole HTTP exchange inside the event loop
        resp = yield from aiohttp.request('GET', url)
        print(url, resp)
        resp.close()


    urls = ('http://www.baidu.com/', 'http://www.chouti.com/')
    tasks = [fetch_async(u) for u in urls]

    event_loop = asyncio.get_event_loop()
    results = event_loop.run_until_complete(asyncio.gather(*tasks))
    event_loop.close()
    示例三、使用aiohttp
    import asyncio
    import requests

    @asyncio.coroutine
    def task(func, *args):
        """Run the blocking call *func(*args)* in the default executor and await it."""
        print(func, args)
        loop = asyncio.get_event_loop()
        # run_in_executor pushes the blocking requests call onto a thread pool,
        # returning a Future the coroutine can wait on without blocking the loop
        fut = loop.run_in_executor(None, func, *args)  # requests.get('http://www.cnblogs.com/wupeiqi/')
        resp = yield from fut
        print(resp.url, resp.content)


    tasks = [
        task(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
        task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
    ]

    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    示例四、aiohttp+request
    # Fix: the original imported grequests twice; one import suffices.
    import grequests


    # Build the (not yet sent) request objects.
    request_list = [
        grequests.get('https://baidu.com/'),
    ]


    # ##### dispatch the batch concurrently and collect the responses #####
    response_list = grequests.map(request_list, size=5)
    print(response_list)
    示例五、grequests

    四、自定义异步IO

    import socket
    import select

    sk = socket.socket()
    # 1. connect -- blocking I/O.  The address must be a (host, port) TUPLE;
    #    the original passed a set literal {...}, which raises TypeError.
    sk.connect(('www.baidu.com', 80,))
    # 2. send a minimal HTTP/1.0 request.  HTTP needs CRLF line endings and a
    #    blank line to terminate the headers (the scrape had mangled these).
    sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
    # 3. wait for the server's response -- blocking I/O
    data = sk.recv(8096)

    # 4. close the connection
    sk.close()
    http请求本质
    import socket
    import select
    class HttpResponse:
        """Minimal parser that splits a raw HTTP response into headers and body."""

        def __init__(self, recv_data):
            self.recv_data = recv_data  # raw bytes as received from the socket
            self.header_dict = {}       # header name -> value (value keeps any leading space)
            self.body = None            # everything after the blank line

            self.initialize()

        def initialize(self):
            # HTTP separates the header block from the body with a blank line
            # (CRLF CRLF).  The scraped source had the b'\r\n' escapes collapsed
            # into literal newlines, which made this a syntax error -- restored.
            headers, body = self.recv_data.split(b'\r\n\r\n', 1)
            self.body = body
            header_list = headers.split(b'\r\n')
            for h in header_list:
                h_str = str(h, encoding='utf-8')
                # split on the first ':' only; the status line has no colon and
                # is skipped by the len(v) == 2 check below
                v = h_str.split(':', 1)
                if len(v) == 2:
                    self.header_dict[v[0]] = v[1]
    class HttpRequest:
        """Pairs a non-blocking socket with its target host and completion callback."""

        def __init__(self, socket, host, callback):
            self.callback = callback  # invoked with the parsed response when done
            self.host = host
            self.socket = socket

        def fileno(self):
            """Expose the underlying socket's fd so select() can monitor this object."""
            return self.socket.fileno()
    class AsyncRequest:
        """A tiny select()-based event loop fetching '/' from many hosts concurrently."""

        def __init__(self):
            self.conn = []        # requests still waiting for their full response
            self.connection = []  # requests still waiting for connect() to complete

        def add_request(self, host, callback):
            """Start a non-blocking connection to *host*:80 and register it."""
            try:
                sk = socket.socket()
                sk.setblocking(0)       # non-blocking: connect() returns immediately
                sk.connect((host, 80,))
            except BlockingIOError as e:
                # expected: a non-blocking connect that is still in progress
                pass
            request = HttpRequest(sk, host, callback)
            self.conn.append(request)
            self.connection.append(request)

        def run(self):
            """Poll with select() until every registered request has completed."""
            while True:
                # writable -> connect() finished; readable -> response data arrived
                rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
                for w in wlist:
                    print(w.host, '连接成功...')
                    # Connection established: send a minimal HTTP/1.0 request.
                    # CRLF framing restored -- the scraped source had the \r\n
                    # escapes collapsed into literal newlines (a syntax error).
                    tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                    w.socket.send(bytes(tpl, encoding='utf-8'))
                    self.connection.remove(w)  # connected; stop watching writability
                for r in rlist:
                    recv_data = bytes()
                    # Drain the socket: recv raises (BlockingIOError) when no more
                    # data is ready, and returns b'' once the peer has closed.
                    while True:
                        try:
                            chunk = r.socket.recv(8096)
                            if not chunk:
                                # Fix: the original looped forever here -- recv on a
                                # closed connection returns b'' without raising.
                                break
                            recv_data += chunk
                        except Exception as e:
                            break
                    response = HttpResponse(recv_data)  # fixed typo: was 'resonse'
                    r.callback(response)   # hand the parsed response to the user callback
                    r.socket.close()       # response fully received; close the socket
                    self.conn.remove(r)

                # every response received: leave the loop
                if len(self.conn) == 0:
                    break
    
    def f1(response):
        """Callback: pretend to save the response to a file."""
        print('保存到文件', response.header_dict)

    def f2(response):
        """Callback: pretend to save the response to a database."""
        print('保存到数据库', response.header_dict)

    url_list = [
        {'host': 'www.baidu.com', 'callback': f1},
        {'host': 'cn.bing.com', 'callback': f2},
        {'host': 'www.cnblogs.com', 'callback': f2},
    ]

    req = AsyncRequest()
    for entry in url_list:
        req.add_request(entry['host'], entry['callback'])

    req.run()
    自定义异步IO

  • 相关阅读:
    文字无缝滚动效果,鼠标移入时暂停
    Spring中使用@Autowired注解静态实例对象
    服务器环境搭建
    nexus问题
    useUnicode=true&characterEncoding=UTF-8 的作用
    SpringBoot项目启动时自动执行指定方法
    springboot自定义消息转换器HttpMessageConverter
    kubernetes资源类别介绍
    红黑树与平衡二叉树的比较
    Feign Client的超时时间
  • 原文地址:https://www.cnblogs.com/YK2012/p/11881652.html
Copyright © 2011-2022 走看看