zoukankan      html  css  js  c++  java
  • day35

    一、铺垫: 基于socket发送http请求

    1、需求一: 向百度发送请求搜索关键字 “alex”,有以下两种方式:

        import requests
        ret = requests.get('https://www.baidu.com/s?wd=alex')
    方式一:利用requests模块
     1     import socket
     2 
     3     client = socket.socket()
     4     # 和百度创建连接: 阻塞
     5     client.connect(('www.baidu.com', 80))
     6     # 问百度说我要什么?
     7     client.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
     8     # 我等着接收百度给我的回复
     9     chunk_list = []
    10     while 1:
    11         chunk = client.recv(8096)
    12         if not chunk:
    13             break
    14         chunk_list.append(chunk)
    15     
    16     body = b''.join(chunk_list)
    17     print(body.decode('utf-8'))
    方式二:利用socket,实际就是requests的原理

    2、需求二: 向百度发送请求搜索三个关键字:

        import requests
    
        key_list = ['alex', 'ab', 'sb']
        for item in key_list:
            ret = requests.get('https://www.baidu.com/s?wd=%s' %item)
    方式一
        import socket
        
        def get_data(key):
            client = socket.socket()
        
            # 和百度创建连接: 阻塞
            client.connect(('www.baidu.com', 80))
            # 问百度说我要什么?
            client.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
            # 我等着接收百度给我的回复
            chunk_list = []
            while 1:
                chunk = client.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
        
            body = b''.join(chunk_list)
            print(body.decode('utf-8'))
        
        key_list = ['alex', 'ab', 'sb']
        for item in key_list:
            get_data(item)    
    方式二

      分析上述需求二的代码,我们可以发现,两种方式去向浏览器发送请求时都是串行的,也就是等第一个请求发送后并得到响应才继续发送下一个请求,是一个一个执行的,如果连接服务器花了很长时间,或者接收数据花费很长时间,那么下一个请求就需要等待很长时间,无法实现并发。在等待响应的时候,cpu是空闲的,没有被占用的,当请求响应后,cpu才继续工作,这种工作效率是低下的,话费的时间也很长。这时候,我们想到了多线程实现并发来提高效率,节省时间。代码如下:

        import threading
    
        def get_data(key):
            client = socket.socket()
    
            # 和百度创建连接: 阻塞
            client.connect(('www.baidu.com', 80))
            # 问百度说我要什么?
            client.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
            # 我等着接收百度给我的回复
            chunk_list = []
            while 1:
                chunk = client.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
    
            body = b''.join(chunk_list)
            print(body.decode('utf-8'))
    
        key_list = ['alex', 'db', 'sb']
        for item in key_list:
            t = threading.Thread(target=get_data, args=(item,))
            t.start()

      多线程虽然实现了并发,提高了效率,节省了时间,但是也浪费了资源,那么我们能不能用单线程来实现并发呢?也就是在一个线程中发送完第一个请求(IO请求)后不等待响应结果,而是直接去发送第二个请求,也不等待响应结果,再继续去发送第三个,等请求响应后才去处理响应结果,这样就实现了单线程并发,即节省了资源又实现了并发,那具体怎么实现呢?

    首先需要解决两个问题:第一:如何判断是IO请求?第二:如何知道响应数据回来了?

    二、基于IO多路复用+socket实现单线程并发

      ------------------------------ 1,解决并发:单线程+IO不等待 ---------------------------------
        import socket
        import select
        
        client1 = socket.socket()
        client1.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client1.connect(('www.baidu.com',80))
        except BlockingIOError as e:
            pass
        
        client2 = socket.socket()
        client2.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client2.connect(('www.sogou.com',80))
        except BlockingIOError as e:
            pass
        
        client3 = socket.socket()
        client3.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client3.connect(('www.oldboyedu.com',80))
        except BlockingIOError as e:
            pass
        
        socket_list = [client1, client2, client3]
        conn_list = [client1, client2, client3]
        
        while True:
            rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)  # 最多花0.005S来检测变化
            # rlist中表示已经接收到数据的socket对象
            # wlist中表示已经连接成功的socket对象
            for sk in wlist:
                if sk == client1:
                    sk.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
                elif sk == client2:
                    sk.sendall(b'GET /web?query=fdf HTTP/1.0
    host:www.sogou.com
    
    ')
                else:
                    sk.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.oldboyedu.com
    
    ')
                conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b''.join(chunk_list)
                # print(body.decode('utf-8'))
                print('------------>', body)
                sk.close()
                socket_list.remove(sk)
            if not socket_list:
                break
      ---------------------------- 2,select监听socket的实质 ----------------------------
      多路复用中select.select(socket_list,conn_list,[],0.005),selec监听的是socket对象吗?实际上不是,select监听的 socket_list/conn_list 内部会调用列表中每一个值的fileno方法,获取该返回值(类似于一个身份ID)并去系统中检测。
        import socket
        import select
    
        client1 = socket.socket()
        client1.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client1.connect(('www.baidu.com',80))
        except BlockingIOError as e:
            pass
        
        client2 = socket.socket()
        client2.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client2.connect(('www.sogou.com',80))
        except BlockingIOError as e:
            pass
        
        client3 = socket.socket()
        client3.setblocking(False)  # 百度创建连接: 非阻塞
        try:
            client3.connect(('www.oldboyedu.com',80))
        except BlockingIOError as e:
            pass
        
        class Foo(object):
            def __init__(self, sk):
                self.sk = sk
            def fileno(self):
                return self.sk.fileno()
        
        """
        1. select.select(socket_list,conn_list,[],0.005)
            select监听的 socket_list/conn_list 内部会调用列表中每一个值的fileno方法,获取该返回值并去系统中检测。
        
        2. 方式一:
            select.select([client1,client2,client3],[client1,client2,client3],[],0.005)
        3. 方式二:
                select.select([Foo(client1),Foo(client2),Foo(client3)],Foo(client1),Foo(client2),(client3),[],0.005)
        """
        
        socket_list = [Foo(client1),Foo(client2),Foo(client3)]  # client1.fileno
        conn_list = [client1, client2, client3]
        
        while True:
            rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)  # 最多花0.005S来检测变化
            # wlist中表示已经连接成功的socket对象
            for sk in wlist:
                if sk == client1:
                    sk.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
                elif sk == client2:
                    sk.sendall(b'GET /web?query=fdf HTTP/1.0
    host:www.sogou.com
    
    ')
                else:
                    sk.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.oldboyedu.com
    
    ')
                conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b''.join(chunk_list)
                # print(body.decode('utf-8'))
                print('------------>', body)
                sk.close()
                socket_list.remove(sk)
            if not socket_list:
                break

      如果你不懂为什么要把socke对象和fileno方法封装到类Foo中,那么看下面简单的例子:

        # 代码一:
        v = [
            [11, 22],  # 每一个都有一个append方法
            [22, 33],  # 每一个都有一个append方法
            [33, 44],  # 每一个都有一个append方法
        ]
        
        for item in v:
            print(item.append)
        # 代码二:(为了不改变for循环代码,可以进行如下封装)
        class Foo(object):
            def __init__(self, data):
                self.row = data
    
            def append(self, item):
                self.row.append(item)
    
        v = [
            Foo([11, 22]),  # 每一个都有一个append方法
            Foo([22, 33]),  # 每一个都有一个append方法
            Foo([33, 44]),  # 每一个都有一个append方法
        ]
    
        for item in v:
            print(item.append)
      -------------------- 3,单线程并发高级版:封装(基于事件循环实现的异步非阻塞框架) ----------------------
        import socket
        import select
        
        class Req(object):
            def __init__(self, sk, func):
                self.sock = sk
                self.func = func
        
            def fileno(self):
                return self.sock.fileno()
        
        
        class Nb(object):
        
            def __init__(self):
                self.conn_list = []
                self.socket_list = []
        
            def add(self, url, func):
                client = socket.socket()
                client.setblocking(False)  # 非阻塞
        
                try:
                    client.connect((url, 80))
                except BlockingIOError as e:
                    pass
                obj = Req(client, func)
                self.conn_list.append(obj)
                self.socket_list.append(obj)
        
            def run(self):
                while True:
                    rlist, wlist, elist = select.select(self.socket_list, self.conn_list, [], 0.005)  # 最多花0.005S来检测变化
                    # wlist中表示已经连接成功的req对象
                    for sk in wlist:
                        # 发送变化的req对象
                        sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0
    host:www.baidu.com
    
    ')
                        self.conn_list.remove(sk)
        
                    for sk in rlist:
                        chunk_list = []
                        while True:
                            try:
                                chunk = sk.sock.recv(8096)
                                if not chunk:
                                    break
                                chunk_list.append(chunk)
                            except BlockingIOError as e:
                                break
                        body = b''.join(chunk_list)
                        # print(body.decode('utf-8'))
                        sk.func(body)
                        sk.sock.close()
                        self.socket_list.remove(sk)
                    if not self.socket_list:
                        break
        
        
        def baidu_repsonse(body):
            print('百度下载结果:', body)
        
        def sogou_repsonse(body):
            print('搜狗下载结果:', body)
        
        def oldboyedu_repsonse(body):
            print('老男孩下载结果:', body)
        
        
        t1 = Nb()
        t1.add('www.baidu.com', baidu_repsonse)
        t1.add('www.sogou.com', sogou_repsonse)
        t1.add('www.oldboyedu.com', oldboyedu_repsonse)
        t1.run()
        1. IO多路复用
            IO多路复用作用(select模块):检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可写/可读)
        
        2. 基于IO多路复用+socket实现并发请求(一个线程100个请求)
            IO多路复用 
            socket非阻塞
            
            基于事件循环实现的异步非阻塞框架: ykq
                非阻塞: 不等待
                  异步: 执行完某个任务后自动调用我给他的函数.
            
            Python中开源 基于事件循环实现的异步非阻塞框架 Twisted

    总结:
      1. socket默认是否是阻塞的? 阻塞体现在哪里?
        默认是阻塞的,体现在等待连接和等待接收数据。

      2. 如何让socket变成非阻塞?
        通过设置client.setblocking(False)

      3. IO多路复用的作用?
        检测多个socket是否发生变化.

        操作系统检测socket是否发生变化, 有三种模式:
          select:最多监听1024个;循环去检测。
          poll:不限制监听socket个数;循环取检测(水平触发)。
          epoll:不限制监听个数;回调方式(边缘触发)。

        Python模块:
          select.select
          select.epoll(windows不支持)

      4. 提高并发方案:
        - 多进程
        - 多线程
        - 异步非阻塞模块(Twisted) scrapy框架(单线程完成并发)

      5. 什么是异步非阻塞?
        - 非阻塞,不等待。
          比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。
          如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可
        - 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。
          比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函数。

      6. 什么是同步阻塞?

        - 阻塞:等
        - 同步:按照顺序逐步执行,如下代码:

        key_list = ['alex','db','sb']
    
        for item in key_list:
        ret = requests.get('https://www.baidu.com/s?wd=%s' %item)
        print(ret.text)

    三、协程

    1、概念

      进程和线程都是操作系统中存在的,而协程是由程序员创造出来的一个不是真实存在的东西。

      协程: 是微线程, 对线程进行分片, 使得线程在代码块之间进行来回切换执行, 而不是原来的逐行执行.

      使用协程需要导入greenlet模块,才能使用协程.

        import greenlet
        
        def f1():
            print(11)
            gr2.switch()
            print(22)
            gr2.switch()
        
        def f2():
            print(33)
            gr1.switch()
            print(44)
        
        
        # 协程 gr1
        gr1 = greenlet.greenlet(f1)
        # 协程 gr2
        gr2 = greenlet.greenlet(f2)
        
        gr1.switch()
    协程示例

    2、协程有什么用?

      单纯的协程无意义,甚至会让性能降低,

      但是 协程 + 遇到IO就切换 就牛逼起来了

      假设我们执行一段代码遇到了IO操作,此时不需要等待,我们可以利用协程切换到另一段代码,然后遇到IO操作的时候再去切换,这样是不是也能提高性能,实现并发,但是greenlet只能做协程,不能实现遇到IO就切换,所以协程如果再加上遇到IO就切换,那么便能实现单线程并发了。

      那么谁能同时实现这两个要求呢? 这时候就用到了gevent模块,gevent模块内部也依赖协程,实现 greenlet+IO切换,所以gevent就很牛逼,写法如下:

        from gevent import monkey
        monkey.patch_all()  # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
        import requests
        import gevent
        
        def get_page1(url):
            ret = requests.get(url)
            print(url, ret.content)
        
        def get_page2(url):
            ret = requests.get(url)
            print(url, ret.content)
        
        def get_page3(url):
            ret = requests.get(url)
            print(url, ret.content)
        
        gevent.joinall([
            gevent.spawn(get_page1, 'https://www.python.org/'),  # 协程1
            gevent.spawn(get_page2, 'https://www.yahoo.com/'),   # 协程2
            gevent.spawn(get_page3, 'https://github.com/'),      # 协程3
        ])

    上面通过gevent模块的 协程+IO切换 实现了单线程并发,提高了效率。对比一下两种实现单线程并发的方法:

      第一种是 IO多路复用+不阻塞,通过程序不停的执行(IO不阻塞),来不断的发送请求,利用IO多路复用来监听,实现了单线程并发。

      第二种是利用gevent模块的 协程+遇到IO就切换 , 在代码段中不断切换, 实现了单线程并发.

    总结:
      1. 什么是协程?
        协程也可以成为"微线程", 就是开发者控制线程执行流程, 控制先执行某段代码然后再切换到另外函数执行代码...来回进行切换

      2. 协程可以提高并发吗?
        协程自己本身无法实现并发(甚至性能会降低).
        协程+IO切换性能提高

      3. 进程、线程、协程的区别? *****
        进程是cpu资源分配的最小单元,主要用来做数据隔离;线程是cpu工作的最小单元。一个应用程序可以有多个进程(默认有一个), 一个进程可以有多个线程(默认有一个),这是它们的一个简单区别

        但是他们的应用场景在其他语言中基本没有进程这个概念,大都用线程,而在Python中有GIL锁,它保证了一个进程中同一时刻只能用一个现场被cpu调度,为了利用多核优势就要使用多进程,多线程没有用,所以计算密集型用多进程,IO密集型用多线程,因为IO操作并不占用cpu。

        而协程是程序员人为创造出来的不真实存在的, 它可以让程序员控制代码执行顺序,在函数之间来回切换,本身协程存在没有意义,但是能跟IO切换放在一起就厉害了,相当于将线程切片,程序遇到IO就切换到其他代码,IO完成后再切回来,达到让线程不停去工作的效果,实现协程的模块是greenlet,实现协程+IO切换的模块是gevent,这就是三者的区别。

      4. 单线程提高并发:
        - 协程+IO切换: gevent  注意:不是异步,无回调函数,但本质也是基于事件循环
        - 基于事件循环的异步非阻塞框架: Twisted

    补充: 手动实现协程:利用yield生成器(没有意义,了解即可)

        def f1():
            print(11)
            yield
            print(22)
            yield
            print(33)
        
        def f2():
            print(55)
            yield
            print(66)
            yield
            print(77)
        
        v1 = f1()
        v2 = f2()
        
        next(v1)  # v.send(None)
        next(v2)  # v.send(None)
        next(v1)  # v.send(None)
        next(v2)  # v.send(None)
        next(v1)  # v.send(None)
        next(v2)  # v.send(None)
    基于yield实现协程(1)
        def f1():
            print(11)
            x1 = yield 1
            print(x1,22)
            x2 = yield 2
            print(33)
        
        def f2():
            print(55)
            yield
            print(66)
            yield
            print(77)
        
        v1 = f1()
        v2 = f2()
        
        ret = v1.send(None)
        print(ret)
        r2 = v1.send(999)
        print(r2)
    基于yield实现协程(2)
  • 相关阅读:
    SpringBoot实现原理
    常见Http状态码大全
    forward(转发)和redirect(重定向)有什么区别
    1094. Car Pooling (M)
    0980. Unique Paths III (H)
    1291. Sequential Digits (M)
    0121. Best Time to Buy and Sell Stock (E)
    1041. Robot Bounded In Circle (M)
    0421. Maximum XOR of Two Numbers in an Array (M)
    0216. Combination Sum III (M)
  • 原文地址:https://www.cnblogs.com/kangqi452/p/11858790.html
Copyright © 2011-2022 走看看