zoukankan      html  css  js  c++  java
  • python进程、线程、协程、IO多路复用

     线程进程介绍  

    1. 工作最小单元是线程
    2. 应用程序 -> 至少有一个进程 -> 至少有一个线程
    3. 应用场景:

          IO密集型:线程

          计算密集型:进程

      4. GIL,全局解释器锁。

          保证同一个进程中只有一个线程同时被调度

    线程

      1. 基本使用

    def task(arg):
        time.sleep(arg)
        print(arg)
    
    for i in range(5):
        t = threading.Thread(target=task,args=[i,])
        # t.setDaemon(True) # 主线程终止,不等待子线程
        # t.setDaemon(False)
        t.start()
        # t.join() # 一直等
        # t.join(1) # 等待最大时间

      2. 锁

    import threading
    import time
    v = 10
    # 1. 只能有一个人使用锁
    lock = threading.Lock() # 只能开一把
    lock = threading.RLock()# 可以开多把
    
    def task(arg):
        time.sleep(2)
        # 申请使用锁,其他人等
        lock.acquire()
        lock.acquire()
        global v
        v -= 1
        print(v)
        # 释放
        lock.release()
        lock.release()
    
    for i in range(10):
        t = threading.Thread(target=task,args=(i,))
        t.start()

      1. 只能有一个人使用锁
        # lock = threading.Lock() # 只能开一把
        # lock = threading.RLock()# 可以开多把
      2. 多个人同时使用锁
        # lock = threading.BoundedSemaphore(3)
      3. 所有的解脱锁的限制
        #lock = threading.Event()
      4. 肆意妄为
        #lock = threading.Condition()

    线程池

      在什么情况下使用线程池? 
     
        1.单个任务处理的时间比较短 
        2.将需处理的任务的数量大 
     
      使用线程池的好处: 
     
        1.减少在创建和销毁线程上所花的时间以及系统资源的开销 
        2.如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及”过度切换”。
     

      模式一:直接处理

    def task(url):
        """
        任务执行两个操作:下载;保存本地
        """
        # response中封装了Http请求响应的所有数据
        # - response.url            请求的URL
        # - response.status_code    响应状态码
        # - response.text           响应内容(字符串格式)
        # - response.content        响应内容(字节格式)
        # 下载
        response = requests.get(url)
        
        # 下载内容保存至本地
        f = open('a.log','wb')
        f.write(response.content)
        f.close()
    
    pool = ThreadPoolExecutor(2)
    url_list = [
        'http://www.oldboyedu.com',
        'http://www.autohome.com.cn',
        'http://www.baidu.com',
    ]
    for url in url_list:
        print('开始请求',url)
        # 去连接池中获取链接
        pool.submit(task,url)

      模式二:分步处理

    def save(future):
        """
        只做保存  # future中包含response
        """
        response = future.result()
        
        # 下载内容保存至本地
        f = open('a.log','wb')
        f.write(response.content)
        f.close()
    
    def task(url):
        """
        只做下载 requests
        """
        # response中封装了Http请求响应的所有数据
        # - response.url            请求的URL
        # - response.status_code    响应状态码
        # - response.text           响应内容(字符串格式)
        # - response.content        响应内容(字节格式)
        # 下载
        response = requests.get(url)
        return response
    
    pool = ThreadPoolExecutor(2)
    url_list = [
        'http://www.oldboyedu.com',
        'http://www.autohome.com.cn',
        'http://www.baidu.com',
    ]
    for url in url_list:
        print('开始请求',url)
        # 去连接池中获取链接
        # future中包含response
        future = pool.submit(task,url)
        # 下载成功后,自动调用save方法
        future.add_done_callback(save)

    进程   

      1. 基本使用

    from multiprocessing import Process
    import time
    def task(arg):
        time.sleep(arg)
        print(arg)
    
    if __name__ == '__main__':
        for i in range(10):
        p = Process(target=task,args=(i,))
        p.daemon = True
        # p.daemon = False
        p.start()
        # p.join(1)
    
    print('主进程最后...')


      2. 进程之间的数据共享
          特殊的东西
        - Array(‘类型’,长度)
        - Manager().list() / Manager().dict()

      3. 进程池

        跟线程池一样

    协程

      线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
      协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。
      协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;
    # 协程
    # from greenlet import greenlet
    #
    # def test1():
    #     print(12)
    #     gr2.switch()
    #     print(34)
    #     gr2.switch()
    #
    # def test2():
    #     print(56)
    #     gr1.switch()
    #     print(78)
    #
    # gr1 = greenlet(test1)
    # gr2 = greenlet(test2)
    # gr1.switch()
    
    # 根据协程二次开发:协程+IO
    from gevent import monkey; monkey.patch_all()
    import gevent
    import requests
    
    def f(url):
        response = requests.get(url)
        print(response.url,response.status_code)
    
    gevent.joinall([
            gevent.spawn(f, 'http://www.oldboyedu.com/'),
            gevent.spawn(f, 'http://www.baidu.com/'),
            gevent.spawn(f, 'http://github.com/'),
    ])

    IO多路复用

    首先什么是I/O:

    I/O(input/output),即输入/输出端口。每个设备都会有一个专用的I/O地址,用来处理自己的输入输出信息

    I/O分为磁盘io和网络io,这里说的是网络io

    IO多路复用:

    I/O多路复用指:通过一种机制,可以监视多个描述符(socket),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

    Linux

    Linux中的 select,poll,epoll 都是IO多路复用的机制。

    Linux下网络I/O使用socket套接字来通信,普通I/O模型只能监听一个socket,而I/O多路复用可同时监听多个socket.

    I/O多路复用避免阻塞在io上,原本为多进程或多线程来接收多个连接的消息变为单进程或单线程保存多个socket的状态后轮询处理.

    Python  

    Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

    下面利用IO多路复用的select模块模拟web框架

    import socket
    import select
    
    
    class Foo(object):
        def __init__(self, sock, callback, url, host):
            self.sock = sock
            self.callback = callback
            self.url = url
            self.host = host
    
    
    class NbIO(object):
        def __init__(self):
            self.fds = []
            self.connections = []
    
        def connect(self, url_list):
            for item in url_list:
                conn = socket.socket()
                conn.setblocking(False)
                # 1. 发送链接请求
                try:
                    conn.connect((item['host'],80))
                except BlockingIOError as e:
                    pass
                obj = Foo(conn, item['callback'], item['url'], item['host'])
                self.fds.append(obj)
                self.connections.append(obj)
    
        def send(self):
            while True:
                # wList,有对象;当前socket已经创建链接
                try:
                    if len(self.fds) == 0:
                        return
                    rList, wList, eList = select.select(self.fds, self.connections, [], 0.5)    # 这里只存有变化的值
                    print(rList)
                    for obj in rList:
                        # 4.有数据响应回来了
                        conn = obj.sock
                        data = bytes()
                        while True:
                            try:
                                d = conn.recv(1024)
                                data = data + d
                            except BlockingIOError as e:
                                d = None
                            if not d:
                                break
                        # print(data)
                        obj.callback(data)  # 自定义操作 f1  f2
                        self.fds.remove(obj)
                        # print(len(self.fds),len(self.connections))
                        # 执行当前请求 函数:f1  f2
                    # 【1,2,3,】
                    for obj in wList:
                        # 2.已经连接上远程
                        conn = obj.sock
                        # 3. 发送数据
                        # HTTP/1.0
    Host: %s
    
    
                        template = "GET %s HTTP/1.1
    Host: %s
    
    " %(obj.url,obj.host,)
    
                        # template = "POST %s HTTP/1.1
    Host: 127.0.0.1:8888
    
    k1=v1&k2=v2" %(obj.url,)
                        conn.sendall(template.encode('utf-8'))
                        self.connections.remove(obj)
                except OSError as e:
                    pass

    模拟游览器端

    import 上面代码
    
    
    def f1(data):
        print(data)
    
    
    def f2(data):
        print(data)
    
    url_list = [
        {'host': "www.baidu.com", 'url': '/', 'callback': f1},
        {'host': "www.bing.com", 'url': '/', 'callback': f2},
        {'host': "www.cnblogs.com", 'url': '/wangyufu', 'callback': f1},
        {'host': "www.oldboyedu.com", 'url': '/', 'callback': f1},
    ]
    obj = helei_new.NbIO()
    obj.connect(url_list)
    obj.send()
  • 相关阅读:
    Java虚拟机平台无关性
    全局变量维护
    linux free 打印机
    存储介质
    Linux 从手表到大型主机 硬件驱动
    queue_action
    queue — A synchronized queue class
    Spark Shuffle 中 JVM 内存使用及配置内幕详情
    JAVA中Stack和Heap的区别
    spark 33G表
  • 原文地址:https://www.cnblogs.com/wangyufu/p/6593515.html
Copyright © 2011-2022 走看看