zoukankan      html  css  js  c++  java
  • Python6 线程与进程、网络编程、生成器与迭代器、协程、异步IO

    Python6 ---- 线程与进程、网络编程、生成器与迭代器、协程、异步IO

    线程与进程、网络编程、生成器与迭代器、协程、异步IO

    requests模块的介绍

    • requests的作用
      通过python来模拟请求网址

    • 一个模拟请求由以下四个部分组成

      • url
      • method
      • body
      • headers
    • 模拟请求百度

       当前python环境下执行以下语句安装第三方库,在Terminal安装requests库在
       pip install requests
      import requests
      
      def request_baidu():
          url = "https://www.baidu.com/"
          # body = ""
          headers = {
              "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
          }
          response = requests.get(url=url, headers=headers)
          print(response.text)

    理解多线程和多进程

    • 什么是进程?什么是线程?

      • 进程: 可以简单地认为是一个程序. 进程是操作系统分配资源的最小单位.
      • 线程: 一个进程可以有多个线程, 每个线程可以独立完成一些任务. 线程是操作系统进行运算调度的最小单位.
    • 多线程demo

      from threading import Thread    
      for i in range(10):
          # 只是创建了线程对象,target指定目标
          t = Thread(target=request_baidu)
          # 启动线程
          t.start()
    • 多进程demo

      from multiprocessing import Process   
      for i in range(10):
          # 只是创建了进程对象
          p = Process(target=request_baidu)
          # 启动进程
          p.start()
    • 多线程

      • 等待任务完成后回到主进程
        通过调用Thread对象的join方法

        # 保存当前thread对象
        thread_array = []
        for i in range(10):
            t = Thread(target=request_baidu, args=(i, ))
            thread_array.append(t)
            t.start()
        # 调用thread对象join接口, 等待任务完成后回到主进程
        for t in thread_array:
            t.join()
        print("done!")
      • 如何拿到返回结果

        • 赋值到全局变量当中, 添加到可变对象之中
          result = []
          def request_baidu(index):
          	...
              result.append(response)
              
          if __name__ == "__main__":
              thread_array = []
              for i in range(10):
                  t = Thread(target=request_baidu, args=(i, ))
                  thread_array.append(t)
                  t.start()
              for t in thread_array:
                  t.join()
              print("done!")
              print(result)
    • 多进程

      • 等待任务完成后回到主进程
        通过调用Process对象的join方法
      • 如何拿到返回结果
        无法通过全局变量存储返回结果,线程与进程返回的结果是不一样的
        多进程相当于启动了多个程序, 共同执行了同一份代码, 他们之间的内存地址完全不一样
        import requests
        import time
        from threading import Thread
        from multiprocessing import Process
        
        result = []
        print(f"主进程result内存地址: {id(result)}")
        
        def request_baidu(index):
            time.sleep(2)
            url = "https://www.baidu.com/"
            # body = ""
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
            }
            response = requests.get(url=url, headers=headers)
            print(f"当前请求序号: {index}, 返回结果状态码: {response.status_code}")
            print(f"子进程result内存地址: {id(result)}")
            result.append(response)
        
        # 如果没有判断入口代码段if __name__ == "__main__", 多进程程序会报错
        # 原因是windows和pycharm的进程阻塞带来的问题
        if __name__ == "__main__":
            process_array = []
            for i in range(10):
                p = Process(target=request_baidu, args=(i, ))
                process_array.append(p)
                p.start()
            for p in process_array:
                p.join()
            print("done!")
            print(result)
    • 多进程和多线程的异同点

      • 相同点
        • 都是对cpu工作时间段的描述, 只是颗粒度不同.
          简单地说就是多进程和多线程都会调用cpu资源的, 但是进程可以启动多个线程去执行.
        • linux内核态不区分进程和线程
      • 不同点
        • 进程有自己的独立地址空间, 建立数据表来维护代码段, 堆栈段和数据段, 而线程共享进程中的资源, 使用相同的地址空间, 所以线程间的切换快得多.
        • 因为线程共享进程的全局变量, 静态变量等对象, 线程间的通信更为方便, 而进程间的通信更加复杂, 需要以ipc的方式进行.
        • 多进程要比多线程要健壮. 进程之间一般不会相互影响, 而多线程有一条线程崩溃, 会导致整个进程跟着发生崩溃或者无法正常退出等.

    全局解释器锁(GIL)

    • 计算密集型
      主要占用cpu资源
    • IO密集型
      IO就是input output, 需要等待的一些任务
      • 网络请求会有网络延迟
      • 和数据库交互需要等待数据库查询事件
      • 读写硬盘
    • 多进程在处理计算密集型程序的时候比多线程块
      由于全局解释器锁的存在, 一个进程下, 只允许一个线程执行Python程序的字节码(当前代码文件的二进制表示).
      简单地说, 创建的10个线程其实在争夺一个cpu资源. 但是遇到io操作会让渡cpu资源.
    • 如何绕过GIL?
      • 将多线程方法改为多进程
      • 将计算密集型任务转移给C扩展
      • 分布式计算引擎spark, Apache
      • 使用PyPy解释器, 工业上几乎没人这么用, 因为PyPy并不成熟.

    课后作业

    • 利用Python实现一个多线程程序
    • 将多线程程序改为多进程程序
    import requests
    import time
    from threading import Thread
    from multiprocessing import Process
    
    
    def requests_baidu(index):
        time.sleep(1)
        url = "https://www.baidu.com"
        # body = ""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
        }
        response = requests.get(url=url, headers=headers)
        # print(f"请求序号:{index}, 返回状态码:{response.status_code}")
    
    
    def process():
        start_time = time.time()
        thread_array = []
        for i in range(10):
            p = Thread(target=requests_baidu, args=(i,))
            thread_array.append(p)
            p.start()
        for p in thread_array:
            p.join()
        end_time = time.time()
        print(f"process thread execute time: {end_time - start_time} s")
    
    
    def thread():
        start_time = time.time()
        thread_array = []
        for i in range(10):
            t = Thread(target=requests_baidu, args=(i,))
            thread_array.append(t)
            t.start()
        for t in thread_array:
            t.join()
        end_time = time.time()
        print(f"thread execute time: {end_time - start_time} s")
    
    
    if __name__ == "__main__":
        thread()
        process()

    进程间通信(IPC)

    • 文件
      通过读写文件来进行变量, 数据, 信息的传递

      • 读写冲突
        两个进程同时进行写, 或者一个写一个读, 造成了冲突.
      • 解决读写冲突
        • 互斥锁
          from multiprocessing import Process, Lock
          
          def save_to_file(index, lock):
              with lock:
                  with open("test.log", "a", encoding="utf-8") as f:
                      f.write(str(index) + "\n")
          
          if __name__ == "__main__":
              process_array = []
              lock = Lock()
              for i in range(10):
                  p = Process(target=save_to_file, args=(i, lock))
                  process_array.append(p)
                  p.start()
              for p in process_array:
                  p.join()
              print("done!")
    • 套接字(socket-插座)
      通过一个协议, 连接两个进程. 主要就是网络请求.

      进程A向百度云上传文件, 进程B向百度云下载文件, 不会有冲突.
      socket.png

    • 管道(了解)
      用文件的内存缓冲区作为管道, 实现进程间通信

      • 匿名管道
        主进程和子进程进行交互
      • 具名管道
        和匿名管道原理是一样的, 不是不相关的进程也可以互相访问
        ipc_pipeline.png
    • 消息队列
      就是一个存在内核内存空间中的列表

      redis就是消息队列+socket

      from multiprocessing import Process, Queue
      
      def save_to_queue(index, my_queue):
          my_queue.put(index)
      
      
      if __name__ == "__main__":
          process_array = []
          my_queue = Queue()
          for i in range(10):
              p = Process(target=save_to_queue, args=(i, my_queue))
              process_array.append(p)
              p.start()
          for p in process_array:
              p.join()
      
          while True:
              print(my_queue.get())
    • 共享内存(了解)
      进程访问内核态同一块内存

      from multiprocessing import Queue, Array, Value
    • 信号量(了解)
      不是用来传递数据的, 是用来传递消息

      进程B要等到进程A执行到某一步操作后, 才会启动
      进程A->发消息->内核->转发信息->进程B

    线程间通信

    线程间通信强调的是线程之间传递对象引用

    • 共享变量
      • 线程安全
        线程有GIL锁, 但是拿到GIL锁不代表可以一直执行下去
        现代计算机多线程也是A执行一会儿, B执行一会儿这样交替执行

        import requests
        import time
        from threading import Thread
        
        zero = 0
        
        def foo():
            global zero
            for i in range(10**7):
                zero += 1
                zero -= 1
        
        if __name__ == "__main__":
            process_array = []
            for i in range(2):
                p = Thread(target=foo)
                process_array.append(p)
                p.start()
            for p in process_array:
                p.join()
        
            print(zero)
      • 解决线程安全
        将重要指令包装成原子操作(不可分割的).

        • 加互斥锁
        import requests
        import time
        from threading import Thread,Lock
        
        zero = 0
        lock = Lock()
        
        def foo():
            global zero
            for i in range(10**6):
                with lock:
                    zero += 1
                    zero -= 1
        
        if __name__ == "__main__":
            process_array = []
            for i in range(2):
                p = Thread(target=foo)
                process_array.append(p)
                p.start()
            for p in process_array:
                p.join()
        
            print(zero)

    课后作业

    • 多进程锁, 多线程锁都要自己实现一遍
    • 多进程通过Queue来实现进程通信
    • 把上述概念熟记并理解

    迭代器和生成器

    • 迭代器
      概念上: 迭代器可以用来表示一个数据流, 提供了数据的惰性返回功能(只有我们主动去使用next方法调用, 才会返回值)
      实现上: 实现了__next__接口的对象

      传统声明一个列表, 里面的元素会立即写进内存当中, 占用大量内存.

      迭代器可以一次只返回一个元素, 占用内存非常小, 在读取大文件和大的数据集合的时候特别有用

      • 通过iter方法返回一个迭代器对象

        # 两者实现的功能是一摸一样的
        l = list(range(10**7))
        l2 = iter(range(10**7))
      • 通过next方法主动获取迭代器中的值

        # 当迭代器中没有值了以后, 会抛出StopIteration的异常, 需要大家自行处理一下
        l = iter(range(5))
        print(next(l))
        print(next(l))
        print(next(l))
        print(next(l))
        print(next(l))
        print(next(l))
    • 生成器
      生成器是一种特殊的迭代器, 在迭代器惰性返回数据的基础上, 提供了额外的功能, 实现了程序的暂停.

      • 声明一个生成器
        只要函数体中有yield关键词, 它就是一个生成器

        yield翻译为让渡, 我们可以简单理解为暂停并返回右边的值

        def my_range_gen(n):
            for i in range(n):
                yield i*i
                print(f"current index: {i}")
        
        my_range = my_range_gen(10)
        print(my_range)
        print(next(my_range))
        print(next(my_range))
        print(next(my_range))
        print(next(my_range))
    • 生成器和迭代器的区别?
      同样提供了惰性返回的功能, 迭代器侧重于提供数据的惰性返回功能, 生成器侧重于指令的惰性返回功能

    协程

    • 协程的原理
      协程的实现原理就是生成器的实现原理, 在生成器的基础上又提供了传递值的功能.
      • 通过send方法向生成器传递值, 以下例子中, b就是通过send方法赋值为2
        对生成器进行send操作一定要调用next方法预激, 使其停留在第一个yield位置

        def simple_coro(a):
            print("初始值 a=", a)
            b = yield a
            print("传递值 b=", b)
            c = yield a + b
            print("传递值 c=", c)
        
        coro = simple_coro(1)
        print(next(coro))
        print(coro.send(2))
        print(coro.send(3))

      • 用协程实现计算平均数的函数

        def coro_avg():
            total = 0
            length = 0
            while True:
                try:
                    value = yield total/length
                except ZeroDivisionError:
                    value = yield 0
                total += value
                length += 1
        
        my_avg = coro_avg()
        print(next(my_avg))
        print(my_avg.send(2))
        print(my_avg.send(3))
      • yieldyield from

        yield from实现的协程异步程序晦涩难懂, 在python3.4引用asyncio标准库之后被弃用
        yield from 用来驱动子程序中的循环并返回最终值

        def return_triple():
            while True:
                value = yield
                if value % 3 == 0:
                    return value
        
        
        def triple_recorder():
            while True:
                result = yield from return_triple()
                triple_array.append(result)
        
        triple_array = []
        coro = triple_recorder()
        next(coro)
        for i in range(100):
            coro.send(i)
        print(triple_array)

    异步I/O

    • asyncio(异步)

      Python3.4引入的标准库, 替换yield from实现协程异步IO, 可以更好地实现异步程序
      实现原理: 自动维护了一个事件队列, 然后循环访问事件来完成异步的消息维护.

      import asyncio
      import time
      
      class Response:
          staus_code = 200
      
      async def sim_request(index):
          print(f"模拟发送请求 Index: {index}")
          response = Response()
          # 模拟网络延迟
          # 当前是单线程运行的, 如果调用的是time.sleep(1), 那么这个线程会被阻塞
          # 当前线程被阻塞之后, 不会让渡cpu资源, 异步的效率就不会体现
          await asyncio.sleep(1)
          print(f"request index {index}, response status_code: {response.staus_code}")
          return response.staus_code
      
      # 获取消息队列
      loop = asyncio.get_event_loop()
      
      # 包装任务
      task_array = []
      for i in range(100):
          task_array.append(sim_request(i))
      
      # 循环访问事件来完成异步的消息维护
      loop.run_until_complete(asyncio.wait(task_array))
      
      # 关闭事件循环
      loop.close()
      • 当前异步实际上有没有提高效率, 也关乎到你调用的第三方是不是异步的.

        这也是当前python异步的一个痛点, 就是丰富的第三方库不是都支持asyncio的.

      • 小技巧: 获取异步完成之后的所有返回值
        result = loop.run_until_complete(asyncio.gather(*task_array))
        print(result)

    课后作业

    • 什么是迭代器?什么是生成器?两者有什么区别?
    • 协程的实现原理.
    • asyncio实现原理
    • 用协程实现一个计算平均数的函数
    def coro_avg():
        total = 0
        length = 0
        while True:
            try:
                value = yield total / length
            except ZeroDivisionError:
                value=yield 0
            total += value
            length += 1
    
    
    my_avg = coro_avg()
    print(next(my_avg))
    print(my_avg.send(2))
    • 编写一个asyncio异步程序
      ``` bash
      import asyncio
      import time

        # asyncio异步程序
        async def do_something(index):
        	print(f"第{index}个任务启动")
        	await asyncio.sleep(1)
        	print(f"第{index}个任务完成")
      
        # python 3.10才有问题,使用下面两行代码
        # https://blog.csdn.net/u013421629/article/details/100163014
        # loop = asyncio.get_event_loop()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        task_array = []
      
      
        for i in range(10):
        	task_array.append(do_something(i))
      
        loop.run_until_complete(asyncio.wait(task_array))
      
        loop.close()
    • 扩展: 了解aiohttp异步请求网址

  • 相关阅读:
    BiLiBiLi爬虫
    12-UE4-控件类型
    11-UE4-UMG UI设计器
    10-UE4-蓝图定义简介
    UE4-目录结构简介
    UE4-字符串
    UE4-基类
    Redis-事物
    Redis的主从配置
    Redis持久化-AOF
  • 原文地址:https://www.cnblogs.com/final233/p/15751897.html
Copyright © 2011-2022 走看看