zoukankan      html  css  js  c++  java
  • Python之网络编程 进程 线程 协程

    进程介绍(理论部分)

    1. 进程与程序的区别

      进程是由CPU运行的程序,程序使磁盘中的文件

    2. 介绍几个概念

      串行: 所有的进程由cpu一个一个的解决.

      并发:单个cpu,同时执行多个进程(来回切换的),看起来像是同时运行.

      并行:多个cpu,真正的同时运行多个进程.

      阻塞:遇到IO才叫阻塞.

      一个cpu运行两个进程,其中一个进程完全没有阻塞,

      非阻塞: 没有IO.

    3. 进程的创建

      什么是开启多个进程: socket: server,client 两个进程.

      python中,如果一次想开启多个进程,必须是一个主进程,开启多个子进程.

      linux, windows: 由主进程开启子进程:

      相同点: 原则:主进程开启子进程两个进程都有相互隔离的独立的空间,互不影响.

      不同点:

      ​ linux: 子进程空间的初始数据完全是从主(父)进程copy一份.

      ​ windows: 子进程空间的初始数据完全是从主(父)进程copy一份,但是有所不同.

    process模块介绍

    process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

    由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

    强调:

    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

    1. group参数未使用,值始终为None

    2. target表示调用对象,即子进程要执行的任务

    3. args表示调用对象的位置参数元组,args=(1,2,'egon',)

    4. kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

    5. name为子进程的名称

    Process的方法

    1. p.start():启动进程,并调用该子进程中的p.run()

    2. p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

    3. p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

    4. p.is_alive():如果p仍然运行,返回True

    5. p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

    Process属性

    1. p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    2. p.name:进程的名称

    3. p.pid:进程的pid

    4. p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)

    5. p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

    在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if name ==‘main’ 判断保护起来,import 的时候 ,就不会递归运行了。

    Python并发 之 多进程

    开启进程的两种方式

    # from multiprocessing import Process
    # import time
    #
    #
    # def task(name):
    #
    #     print(f'{name} is running')
    #     time.sleep(3)
    #     print(f'{name} is done')
    #
    #
    # if __name__ == '__main__':  # windows环境下,开启多进程一定放在这个下面
    #
    #     p = Process(target=task,args=('怼哥',))  # args 一定是一个元组的形式.
    #     p.start()
    #     # 通知操作系统,你给我在内存中开辟一个空间,将p这个进程放进去,然后让cpu执行.
    #     print('===主进程')
    
    
    
    # 第二种方式 了解
    from multiprocessing import Process
    import time
    
    
    class MyProcess(Process):
    
        def __init__(self, name):
            super().__init__()  # 必须要继承父类的__init__
    
            self.name = name
    
        def run(self):  # 必须定义run名字.
    
            print(f'{self.name} is running')
            time.sleep(3)
            print(f'{self.name} is done')
    
    
    if __name__ == '__main__':  # windows环境下,开启多进程一定放在这个下面
    
        p = MyProcess('怼怼哥')
        p.start()
        # 通知操作系统,你给我在内存中开辟一个空间,将p这个进程放进去,然后让cpu执行.
        print('===主进程')
    

    线程

    线程的理论知识

    什么是线程

    线程就是进程中的一条流水线

    什么是进程?进程开启经历了什么?

    开启进程: 内存中开启空间, 加载数据, 调用CPU执行, 可能还会使用这个空间的资源.

    进程:主要任务;开启空间,加载数据.

    线程:流水线,执行代码

    1563956515042

    进程:划分空间,加载资源,静态的

    线程:执行代码.执行能力,动态的

    抽象概念

    开启qq:开启一个进程,在内存中,开启空间,加载数据.启动一个线程执行代码

    线程是依赖进程, 一个进程可以包含多个进程,但是一定有一个主线程.线程才是CPU执行的最小单元

    线程vs进程(理论)

    1. 开启多进程开销非常大,是线程的10-100倍,开启线程开销非常小
    2. 开启多进程的速度慢, 开启多线程的速度快
    3. 进程之间数据不能直接共享,通过队列可以.同一进程下的线程之间的数据可以共享

    多线程的应用场景介绍

    ​ 并发:一个CPU来回切换(线程之间的切换),多进程并发,多线程并发.

    ​ 多进程并发: 开启多个进程,每个进程里面的主线程执行任务

    ​ 多线程并发:
    开启一个进程,此进程里面多个线程执行任务

    ​ 什么时候使用多进程, 什么时候使用多线程?

    ​ 一个程序:
    三个不同任务

    ​ 如果以后工作中遇到并发:
    多线程居多

    开启线程的两种方式

    # 第一种方式
    # from threading import Thread
    #
    #
    # def task(name):
    #     print(f'{name} is running')
    #
    #
    #
    # if __name__ == '__main__':
    #     t = Thread(target=task,args=('mcsaoQ',))
    #     t.start()
    #
    #     print('主线程')
    
    
    
    
    from threading import Thread
    
    
    class MyThread(Thread):
    
    
        def run(self):
            print(f'{self.name} is running')
    
    
    if __name__ == '__main__':
        t = MyThread()
        t.start()
    
        print('主线程')
    

    线程与进程之间的对比

    1. 速度的对比

      from threading import Thread
      
      
      def task(name):
          print(f'{name} is running')
      
      
      
      if __name__ == '__main__':
          t = Thread(target=task,args=('mcsaoQ',))
          t.start()
      
          print('主线程')
      
      '''
      线程绝对要比进程要快:
          mcsaoQ is running
          主线程
      '''
      
    2. pid

      # pid 进程号
      from threading import Thread
      import os
      
      def task():
          print(f'子线程: {os.getpid()}')
      
      
      
      if __name__ == '__main__':
          t = Thread(target=task,)
          t.start()
      
          print(f'主线程: {os.getpid()}')
      
    3. 线程之间数据共享

      # from threading import Thread
      #
      # x = 1000
      #
      # def task():
      #     global x
      #     x = 0
      #
      #
      # if __name__ == '__main__':
      #     t = Thread(target=task)
      #     t.start()
      #     # t.join()
      #     print(f'主线程: {x}')
      
      
      
      
      # from threading import Thread
      # import time
      # x = 1000
      #
      # def task():
      #     time.sleep(3)
      #     print('子线程....')
      #
      #
      # def main():
      #     print('111')
      #     print('222')
      #     print('333')
      #
      # #
      # # task()
      # # main()
      #
      # if __name__ == '__main__':
      #     t = Thread(target=task)
      #     t.start()
      #     # t.join()
      #     main()
      

    线程的其他方法

    # from threading import Thread
    # import threading
    # import time
    #
    #
    # def task(name):
    #     time.sleep(1)
    #     print(f'{name} is running')
    #     print(threading.current_thread().name)
    #
    #
    #
    # if __name__ == '__main__':
    #     for i in range(5):
    #         t = Thread(target=task,args=('mcsaoQ',))
    #         t.start()
    #     # 线程对象的方法:
    #     # time.sleep(1)
    #     # print(t.is_alive())  # 判断子线程是否存活  ***
    #     # print(t.getName())  # 获取线程名
    #     # t.setName('线程111')
    #     # print(t.getName())  # 获取线程名
    #
    #     # threading模块的方法:
    #     # print(threading.current_thread().name)  # MainThread
    #     # print(threading.enumerate())  # 返回一个列表 放置的是所有的线程对象
    #     print(threading.active_count())  # 获取活跃的线程的数量(包括主线程)
    #     print('主线程')
    

    守护线程

    # 回顾守护进程
    # from multiprocessing import Process
    # import time
    #
    # def foo():
    #     print(123)  # 进程启动慢,来不及打印123 主进程就完事儿,
    #     time.sleep(1)
    #     print("end123")
    #
    # def bar():
    #     print(456)
    #     time.sleep(3)
    #     print("end456")
    #
    # if __name__ == '__main__':
    #
    #     p1 = Process(target=foo)
    #     p2 = Process(target=bar)
    #     p1.daemon = True
    #     p1.start()
    #     p2.start()
    #
    #     print("main-------")
    
    
    # 守护线程
    # from threading import Thread
    # import time
    #
    # def task(name):
    #     print(f'{name} is running')
    #     time.sleep(1)
    #     print(f'{name} is over')
    #
    #
    #
    # if __name__ == '__main__':
    #     t = Thread(target=task,args=('mcsaoQ',))
    #     t.daemon = True
    #     t.start()
    #
    #     print('主线程')
    
    
    # 分歧:
    
    # from threading import Thread
    # import time
    #
    # def foo():
    #     print(123)
    #     time.sleep(1)
    #     print("end123")
    #
    # def bar():
    #     print(456)
    #     time.sleep(3)
    #     print("end456")
    #
    # if __name__ == '__main__':
    #
    #     t1=Thread(target=foo)
    #     t2=Thread(target=bar)
    #
    #     t1.daemon = True
    #     t1.start()
    #     t2.start()
    #     print("main-------")
    
    # 守护: 子守护主, 只要主结束,子马上结束.
    # 主线程什么时候结束???
    # 多线程是同一个空间,同一个进程,进程代表 空间,资源. 静态的.
    # 主线程是进程空间存活在内存中的必要条件.
    # 主线程: 必须要等待所有的子线程全部结束之后,你在执行完毕,进程在消失.
    # 守护线程必须等待主线程结束才结束, 主线程必须等待所有的非守护线程结束才能结束.
    # 守护线程: 必须等待所有的非守护线程以及主线程结束之后才能够结束.
    
    # a = 1
    #
    # b = 2
    #
    # def func():
    #     time.sleep(3)
    #     print('in func')
    #
    # func()
    #
    
    
    
    from threading import Thread
    import time
    
    def foo():
        print(123)
        time.sleep(3)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(1)
        print("end456")
    
    if __name__ == '__main__':
    
        t1=Thread(target=foo)
        t2=Thread(target=bar)
    
        t1.daemon = True
        t1.start()
        t2.start()
        print("main-------")
    

    互斥锁

    # from threading import Thread
    # import time
    # x = 100
    #
    # def task():
    #     global x
    #     temp = x
    #     time.sleep(1)
    #     temp -= 1
    #     x = temp
    #
    #
    #
    # if __name__ == '__main__':
    #     t = Thread(target=task)
    #     t.start()
    #     t.join()
    #     print(f'主线程{x}')
    

    GIL锁

    GIL锁:全局解释器锁,就是一把互斥锁,将并发变成串行,同一时刻只能有一个线程使用共享资源,牺牲效率,保证数据安全.

    python并发不行,趁着python蹭热度

    1564046894839

    Ipython:交互式解释器,可以不全代码

    Jpython:Java字节码剩下的一样

    pypy:动态编译 JAT技术,技术有缺陷,bug

    1564046999812

    1564047011367

    设置全局锁:GIL

    1. 保证解释器里面的数据安全.当时开发Python是, 只有单核
    2. 强行加锁:减轻了开发人员的负担

    双刃剑:加了这把锁, 带来了什么问题

    1564047310803

    问题1:

    单进程的多线程不能利用多核.诟病之一

    多进程的多线程可以利用多核

    问题二:

    感觉上不能并发的执行问题??

    讨论:单核处理IO阻塞的多线程,与多核处理IO阻塞的多线程效率差不多

    1564047597413

    有了多核之后,GIL锁为什么不去掉:

    源代码太多,改不动了

    1564048323333

    1564048334970

    总结:

    多核的前提下: 如果任务Io密集型: 多线程并发.
    如果任务计算密集型: 多进程并发.

    验证CPython并发执行效率

    # 计算密集型
    # 开启四个进程,开启四个线程
    # from multiprocessing import Process
    # from threading import Thread
    # import time
    # import os
    # # print(os.cpu_count())
    #
    # def task1():
    #     res = 1
    #     for i in range(1, 100000000):
    #         res += i
    #
    #
    # def task2():
    #     res = 1
    #     for i in range(1, 100000000):
    #         res += i
    #
    #
    # def task3():
    #     res = 1
    #     for i in range(1, 100000000):
    #         res += i
    #
    #
    # def task4():
    #     res = 1
    #     for i in range(1, 100000000):
    #         res += i
    #
    # if __name__ == '__main__':
    #     # 四个进程 四个cpu 并行 效率
    #     start_time = time.time()
    #     p1 = Process(target=task1)
    #     p2 = Process(target=task2)
    #     p3 = Process(target=task3)
    #     p4 = Process(target=task4)
    #
    #     p1.start()
    #     p2.start()
    #     p3.start()
    #     p4.start()
    #
    #     p1.join()
    #     p2.join()
    #     p3.join()
    #     p4.join()
    #     print(f'主: {time.time() - start_time}')  # 7.53943133354187
    #
    #     # 一个进程 四个线程 1 cpu 并发  25.775474071502686
    #     # start_time = time.time()
    #     # p1 = Thread(target=task1)
    #     # p2 = Thread(target=task2)
    #     # p3 = Thread(target=task3)
    #     # p4 = Thread(target=task4)
    #     #
    #     # p1.start()
    #     # p2.start()
    #     # p3.start()
    #     # p4.start()
    #     #
    #     # p1.join()
    #     # p2.join()
    #     # p3.join()
    #     # p4.join()
    #     # print(f'主: {time.time() - start_time}')  # 25.775474071502686
    
    # 计算密集型:  多进程的并行  单进程的多线程的并发执行效率高很多.
    
    
    # 讨论IO密集型: 通过大量的任务去验证.
    #
    from multiprocessing import Process
    from threading import Thread
    import time
    import os
    
    
    # print(os.cpu_count())
    
    def task1():
        res = 1
        time.sleep(3)
    
    
    # if __name__ == '__main__':
        
        # 开启150个进程(开销大,速度慢),执行IO任务, 耗时 9.293531656265259
    
        # start_time = time.time()
        # l1 = []
        # for i in range(150):
        #     p = Process(target=task1)
        #     l1.append(p)
        #     p.start()
        # for i in l1:
        #     i.join()
        # print(f'主: {time.time() - start_time}')
    
    
        # 开启150个线程(开销小,速度快),执行IO任务, 耗时 3.0261728763580322
        # start_time = time.time()
        # l1 = []
        # for i in range(150):
        #     p = Thread(target=task1)
        #     l1.append(p)
        #     p.start()
        # for i in l1:
        #     i.join()
        # print(f'主: {time.time() - start_time}')  # 3.0261728763580322
    
    
    # 任务是IO密集型并且任务数量很大,用单进程下的多线程效率高.
    

    讨论GIL全局解释器锁与自定义互斥锁的关系

    # 互斥锁
    
    # 1. GIL 自动上锁解锁, 文件中的互斥锁Lock 手动上锁解锁.
    # 2. GIL锁 保护解释器的数据安全. 文件的互斥锁Lock 保护的文件数据的安全.
    
    # from threading import Thread
    # from threading import Lock
    # import time
    #
    # lock = Lock()
    # x = 100
    #
    # def task():
    #     global x
    #     lock.acquire()
    #     temp = x
    #     # time.sleep(1)
    #     temp -= 1
    #     x = temp
    #     lock.release()
    #
    #
    #
    # if __name__ == '__main__':
    #     t_l = []
    #     for i in range(100):
    #         t = Thread(target=task)
    #         t_l.append(t)
    #         t.start()
    #
    #     for i in t_l:
    #         i.join()
    #
    #     print(f'主线程{x}')
    #
    # # 线程全部是计算密集型:当程序执行,开启100个线程时,第一个线程先要拿到GIL锁,然后拿到lock锁,释放lock锁,最后释放GIL锁.
    
    
    
    from threading import Thread
    from threading import Lock
    import time
    
    lock = Lock()
    x = 100
    
    def task():
        global x
        lock.acquire()
        temp = x
        time.sleep(1)
        temp -= 1
        x = temp
        lock.release()
    
    
    
    if __name__ == '__main__':
        t_l = []
        for i in range(100):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
    
        for i in t_l:
            i.join()
    
        print(f'主线程{x}')
    '''
    线程IO密集型:当程序执行,开启100个线程时,第一个线程先要拿到GIL锁,然后拿到lock锁,
    运行,遇到....截图上有解释..
    
    总结: 自己加互斥锁,一定要加在处理共享数据的地方,加的范围不要扩大,
    
    '''
    # 释放lock锁,最后释放GIL锁.
    

    进程池线程池

    池': 容器, 进程池: 放置进程的一个容器, 线程池: 放置线程的一个容器.
    完成了一个简单的socket通信, 服务端必须与一个客户端交流完毕并且这个客户端断开连接之后,服务端才
    能接待下一个客户端.....
    不合理.

    import socket
    from threading import Thread
    
    def communication(conn):
        while 1:
            try:
                from_client_data = conn.recv(1024)  # 阻塞
                print(from_client_data.decode('utf-8'))
    
                to_client_data = input('>>>').strip()
                conn.send(to_client_data.encode('utf-8'))
            except Exception:
                break
        conn.close()
    
    
    def customer_service():
    
        server = socket.socket()
        server.bind(('127.0.0.1', 8080))
        server.listen()
    
        while 1:
            conn,addr = server.accept()  # 阻塞
            print(f'{addr}客户:')
            t = Thread(target=communication,args=(conn,))
            t.start()
        server.close()
    
    if __name__ == '__main__':
        customer_service()
    

    客户端:

    import socket
    
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    
    while 1:
        to_server_data = input('>>>').strip()
        client.send(to_server_data.encode('utf-8'))
        from_server_data = client.recv(1024)
        print(f'客服回信: {from_server_data.decode("utf-8")}')
    
    client.close()
    

    线程即使开销小,你的电脑不可以无限的开线程,我们应该对线程(进程)做数量的限制.在计算机的能满足的最大情况下,更多的创建线程(进程).

    线程池好,进程池好?

    多线程,多进程.: IO 计算

    同步是指:当程序1调用程序2时,程序1停下不动,直到程序2完成回到程序1来,程序1才继续执行下去。
    异步是指:当程序1调用程序2时,程序1径自继续自己的下一个动作,不受程序2的的影响。

    在进行网络编程时,我们常常见到同步、异步、阻塞和非阻塞四种调用方式。这些方式彼此概念并不好理解。下面是我对这些术语的理解。
    同步
    所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回。按照这个定义,其实绝大多数函数都是同步调用(例如sin, isdigit等)。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。最常见的例子就是 SendMessage。该函数发送一个消息给某个窗口,在对方处理完消息之前,这个函数不返回。当对方处理完毕以后,该函数才把消息处理函数所返回的 LRESULT值返回给调用者。
    异步
    异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。以CAsycSocket类为例(注意,CSocket从CAsyncSocket派生,但是起功能已经由异步转化为同步),当一个客户端通过调用 Connect函数发出一个连接请求后,调用者线程立刻可以朝下运行。当连接真正建立起来以后,socket底层会发送一个消息通知该对象。这里提到执行部件和调用者通过三种途径返回结果:状态、通知和回调。可以使用哪一种依赖于执行部件的实现,除非执行部件提供多种选择,否则不受调用者控制。如果执行部件用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一种很严重的错误)。如果是使用通知的方式,效率则很高,因为执行部件几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。
    阻塞
    阻塞调用是指调用结果返回之前,当前线程会被挂起。函数只有在得到结果之后才会返回。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。例如,我们在CSocket中调用Receive函数,如果缓冲区中没有数据,这个函数就会一直等待,直到有数据才返回。而此时,当前线程还会继续处理各种各样的消息。如果主窗口和调用函数在同一个线程中,除非你在特殊的界面操作函数中调用,其实主界面还是应该可以刷新。socket接收数据的另外一个函数recv则是一个阻塞调用的例子。当socket工作在阻塞模式的时候,如果没有数据的情况下调用该函数,则当前线程就会被挂起,直到有数据为止。
    非阻塞
    非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
    对象的阻塞模式和阻塞函数调用
    对象是否处于阻塞模式和函数是不是阻塞调用有很强的相关性,但是并不是一一对应的。阻塞对象上可以有非阻塞的调用方式,我们可以通过一定的API去轮询状态,在适当的时候调用阻塞函数,就可以避免阻塞。而对于非阻塞对象,调用特殊的函数也可以进入阻塞调用。函数select就是这样的一个例子。

    阻塞 非阻塞 异步 同步

    程序运行中表现得状态: 阻塞 运行 就绪

    阻塞:程序遇到IO阻塞,立马停止(挂起), CPU马上切换, 等到IO结束之后 再执行

    非阻塞: 程序没有遇到IO或者遇到IO通过某种手段让CPU去执行其他的任务,尽可能的占用CPU

    异步,同步:

    站在任务发布的角度:

    同步:任务发布之后,等待,直到这个任务执行完毕, 给我一个返回值,我再发布下一个任

    异步:所有的任务同时发出,我就继续执行下一行 之前的任务执行完毕 结果返回

    # from concurrent.futures import ProcessPoolExecutor
    # import os
    # import time
    # import random
    #
    # def task():
    #     print(f'{os.getpid()} is running')
    #     time.sleep(random.randint(0,2))
    #     return f'{os.getpid()} is finish'
    #
    # if __name__ == '__main__':
    #
    #     p = ProcessPoolExecutor(4)
    #     obj_l1 = []
    #     for i in range(10):
    #         obj = p.submit(task,)  # 异步发出.
    #         obj_l1.append(obj)
    #
    #     # time.sleep(3)
    #     p.shutdown(wait=True)
    #     # 1. 阻止在向进程池投放新任务,
    #     # 2. wait = True 十个任务是10,一个任务完成了-1,直至为零.进行下一行.
    #     # print(666)
    #     for i in obj_l1:
    #         print(i.result())
        # 异步回收任务的方式一: 我将所有的任务的结果统一收回.
    
    
    
    # 同步发布任务: 我要发布10个任务,先把第一个任务给第一个进程,等到第一个进程完成之后.
    # 我在将第二任务给了下一个进程,......
    
    # 异步发布任务: 我直接将10个任务抛给4个进程, 我就继续执行下一行代码了.等结果.
    
    
    
    
    # 同步:
    
    # from concurrent.futures import ProcessPoolExecutor
    # import os
    # import time
    # import random
    #
    # def task():
    #     print(f'{os.getpid()} is running')
    #     time.sleep(1)
    #     return f'{os.getpid()} is finish'
    #
    # if __name__ == '__main__':
    #
    #     p = ProcessPoolExecutor(4)
    #
    #     for i in range(10):
    #         obj = p.submit(task,)  # 异步发出.
    #         print(obj.result())
    

    异步 + 调用机制

    # # 爬虫.
    # # 1 简单认识一下requests模块
    # # 第一步: 爬取服务端的文件(IO阻塞).
    # # 第二步: 拿到文件,进行数据分析,(非IO,IO极少)
    # import requests
    # from concurrent.futures import ProcessPoolExecutor
    # from multiprocessing import Process
    # import time
    # import random
    # import os
    #
    # def get(url):
    #     response = requests.get(url)
    #     print(f'{os.getpid()} 正在爬取:{url}')
    #     time.sleep(random.randint(1,3))
    #     if response.status_code == 200:
    #         return response.text
    #
    #
    # def parse(text):
    #     '''
    #     对爬取回来的字符串的分析
    #     简单用len模拟一下.
    #     :param text:
    #     :return:
    #     '''
    #     print(f'{os.getpid()} 分析结果:{len(text)}')
    #
    # # get('http://www.taobao.com')
    # # get('http://www.baidu.com')
    # # get('http://www.JD.com')
    #
    # if __name__ == '__main__':
    #
    #     url_list = [
    #         'http://www.taobao.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.baidu.com',
    #         'https://www.cnblogs.com/jin-xin/articles/11232151.html',
    #         'https://www.cnblogs.com/jin-xin/articles/10078845.html',
    #         'http://www.sina.com.cn',
    #         'https://www.sohu.com',
    #         'https://www.youku.com',
    #     ]
    #     pool = ProcessPoolExecutor(4)
    #     obj_list = []
    #     for url in url_list:
    #         obj = pool.submit(get, url)
    #         obj_list.append(obj)
    #
    #     pool.shutdown(wait=True)
    #
    #     for obj in obj_list:
    #         parse(obj.result())
    '''
        串行
        obj_list[0].result()
        obj_list[1].result()
        obj_list[2].result()
        obj_list[3].result()
        obj_list[4].result()
    '''
    
    # 问题出在哪里?
    # 1. 分析结果的过程是串行,效率低.
    # 2. 你将所有的结果全部都爬取成功之后,放在一个列表中,分析.
    # 问题1解决:
    # 在开进程池,再开进程,耗费资源.
    
    '''
    爬取一个网页需要2s,并发爬取10个网页:2.多s.
    分析任务: 1s.    10s. 总共12.多秒.
    
    现在这个版本的过程:
        异步发出10个爬取网页的任务,然后4个进程并发(并行)的先去完成4个爬取网页的任务,然后谁先结束,谁进行下一个
        爬取任务,直至10个任务全部爬取成功.
        将10个爬取结果放在一个列表中,串行的分析.
        
    爬取一个网页需要2s,分析任务: 1s,总共3s,总共3.多秒(开启进程损耗).
    .    10s.
    下一个版本的过程:
        异步发出10个 爬取网页+分析 的任务,然后4个进程并发(并行)的先去完成4个爬取网页+分析 的任务,
        然后谁先结束,谁进行下一个 爬取+分析 任务,直至10个爬取+分析 任务全部完成成功.
    
        
    
    
    '''
    
    # 版本二:
    # 异步处理: 获取结果的第二种方式: 完成一个任务返回一个结果,完成一个任务,返回一个结果 并发的返回.
    
    # import requests
    # from concurrent.futures import ProcessPoolExecutor
    # from multiprocessing import Process
    # import time
    # import random
    # import os
    #
    # def get(url):
    #     response = requests.get(url)
    #     print(f'{os.getpid()} 正在爬取:{url}')
    #     time.sleep(random.randint(1,3))
    #     if response.status_code == 200:
    #         parse(response.text)
    #
    #
    # def parse(text):
    #     '''
    #     对爬取回来的字符串的分析
    #     简单用len模拟一下.
    #     :param text:
    #     :return:
    #     '''
    #     print(f'{os.getpid()} 分析结果:{len(text)}')
    #
    # if __name__ == '__main__':
    #
    #     url_list = [
    #         'http://www.taobao.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.baidu.com',
    #         'https://www.cnblogs.com/jin-xin/articles/11232151.html',
    #         'https://www.cnblogs.com/jin-xin/articles/10078845.html',
    #         'http://www.sina.com.cn',
    #         'https://www.sohu.com',
    #         'https://www.youku.com',
    #     ]
    #     pool = ProcessPoolExecutor(4)
    #     for url in url_list:
    #         obj = pool.submit(get, url)
    #
    #     # pool.shutdown(wait=True)
    #     print('主')
    
    
    # 版本三: 版本二几乎完美,但是两个任务有耦合性. 再上一个基础上,对其进程解耦.
    # 回调函数
    
    # import requests
    # from concurrent.futures import ProcessPoolExecutor
    # from multiprocessing import Process
    # import time
    # import random
    # import os
    #
    # def get(url):
    #     response = requests.get(url)
    #     print(f'{os.getpid()} 正在爬取:{url}')
    #     # time.sleep(random.randint(1,3))
    #     if response.status_code == 200:
    #         return response.text
    #
    #
    # def parse(obj):
    #     '''
    #     对爬取回来的字符串的分析
    #     简单用len模拟一下.
    #     :param text:
    #     :return:
    #     '''
    #     time.sleep(1)
    #     print(f'{os.getpid()} 分析结果:{len(obj.result())}')
    #
    # if __name__ == '__main__':
    #
    #     url_list = [
    #         'http://www.taobao.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.JD.com',
    #         'http://www.baidu.com',
    #         'https://www.cnblogs.com/jin-xin/articles/11232151.html',
    #         'https://www.cnblogs.com/jin-xin/articles/10078845.html',
    #         'http://www.sina.com.cn',
    #         'https://www.sohu.com',
    #         'https://www.youku.com',
    #     ]
    #     start_time = time.time()
    #     pool = ProcessPoolExecutor(4)
    #     for url in url_list:
    #         obj = pool.submit(get, url)
    #         obj.add_done_callback(parse)  # 增加一个回调函数
    #         # 现在的进程完成的还是网络爬取的任务,拿到了返回值之后,结果丢给回调函数add_done_callback,
    #         # 回调函数帮助你分析结果
    #         # 进程继续完成下一个任务.
    #     pool.shutdown(wait=True)
    #
    #     print(f'主: {time.time() - start_time}')
    
    # 回调函数是主进程帮助你实现的, 回调函数帮你进行分析任务. 明确了进程的任务: 只有一个网络爬取.
    # 分析任务: 回调函数执行了.对函数之间解耦.
    
    # 极值情况: 如果回调函数是IO任务,那么由于你的回调函数是主进程做的,所以有可能影响效率.
    
    # 回调不是万能的,如果回调的任务是IO,
    # 那么异步 + 回调机制 不好.此时如果你要效率只能牺牲开销,再开一个线程进程池.
    
    
    # 异步就是回调! 这个是错的!! 异步,回调是两个概念.
    
    # 如果多个任务,多进程多线程处理的IO任务.
    # 1. 剩下的任务 非IO阻塞.  异步 + 回调机制
    # 2. 剩下的任务 IO << 多个任务的IO  异步 + 回调机制
    # 3. 剩下的任务 IO >= 多个任务的IO  第二种解决方式,或者两个进程线程池.
    

    线程队列

    # 1 FIFO queue
    
    # import queue
    #
    # q = queue.Queue(3)
    # q.put(1)
    # q.put(2)
    # q.put('太白')
    # # q.put(666)
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    
    # LIFO 栈.
    # import queue
    #
    # q = queue.LifoQueue()
    # q.put(1)
    # q.put(3)
    # q.put('barry')
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    
    # 优先级队列
    # 需要元组的形式,(int,数据) int 代表优先级,数字越低,优先级越高.
    # import queue
    # q = queue.PriorityQueue(3)
    #
    # q.put((10, '垃圾消息'))
    # q.put((-9, '紧急消息'))
    # q.put((3, '一般消息'))
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    

    事件 Event

    # import time
    # from threading import Thread
    # from threading import current_thread
    #
    # flag = False
    #
    #
    # def task():
    #     print(f'{current_thread().name} 检测服务器是否正常开启....')
    #     time.sleep(3)
    #     global flag
    #     flag = True
    #
    #
    # def task1():
    #     while 1:
    #         time.sleep(1)
    #         print(f'{current_thread().name} 正在尝试连接服务器.....')
    #         if flag:
    #             print('连接成功')
    #             return
    #
    # if __name__ == '__main__':
    #     t1 = Thread(target=task1,)
    #     t2 = Thread(target=task1,)
    #     t3 = Thread(target=task1,)
    #
    #     t = Thread(target=task)
    #
    #
    #     t.start()
    #     t1.start()
    #     t2.start()
    #     t3.start()
    
    
    
    import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    
    event = Event()  # 默认是False
    def task():
        print(f'{current_thread().name} 检测服务器是否正常开启....')
        time.sleep(3)
        event.set()  # 改成了True
    
    def task1():
        print(f'{current_thread().name} 正在尝试连接服务器')
        # event.wait()  # 轮询检测event是否为True,当其为True,继续下一行代码. 阻塞.
        event.wait(1)
        # 设置超时时间,如果1s中以内,event改成True,代码继续执行.
        # 设置超时时间,如果超过1s中,event没做改变,代码继续执行.
        print(f'{current_thread().name} 连接成功')
    if __name__ == '__main__':
        t1 = Thread(target=task1,)
        t2 = Thread(target=task1,)
        t3 = Thread(target=task1,)
    
        t = Thread(target=task)
    
    
        t.start()
        t1.start()
        t2.start()
        t3.start()
    

    协程初识

    协程

    # # import time
    # #
    # # def func1():
    # #
    # #     for i in range(11):
    # #         yield
    # #         print('这是我第%s次打印啦' % i)
    # #         time.sleep(1)
    # #
    # #
    # # def func2():
    # #     g = func1()
    # #     #next(g)
    # #     for k in range(10):
    # #
    # #         print('哈哈,我第%s次打印了' % k)
    # #         time.sleep(1)
    # #         next(g)
    #
    # #不写yield,下面两个任务是执行完func1里面所有的程序才会执行func2里面的程序,
    # # 有了yield,我们实现了两个任务的切换+保存状态
    # # func1()
    # # func2()
    #
    #
    # # 计算密集型:串行与协程的效率对比
    # # import time
    # #
    # # def task1():
    # #     res = 1
    # #     for i in range(1,100000):
    # #         res += i
    # #
    # #
    # # def task2():
    # #     res = 1
    # #     for i in range(1,100000):
    # #         res -= i
    # #
    # # start_time = time.time()
    # # task1()
    # # task2()
    # # print(f'串行消耗时间:{time.time()-start_time}')  # 串行消耗时间:0.012000560760498047
    #
    #
    # import time
    #
    #
    # def task1():
    #     res = 1
    #     for i in range(1, 100000):
    #         res += i
    #         yield res
    #
    #
    # def task2():
    #     g = task1()
    #     res = 1
    #     for i in range(1, 100000):
    #         res -= i
    #         next(g)
    #
    #
    # start_time = time.time()
    # task2()
    # print(f'协程消耗时间:{time.time() - start_time}')  # 协程消耗时间:0.0260012149810791
    
    #
    # from greenlet import greenlet
    # import time
    #
    # # 不能自动切换,
    # # 遇到IO不切换
    # # 可以保持原来的状态.
    # def eat(name):
    #
    #     print('%s eat 1' %name)  #2
    #     g2.switch('alex')   #3
    #     time.sleep(3)
    #     print('%s eat 2' %name) #6
    #     g2.switch() #7
    #
    # def play(name):
    #     print('%s play 3' %name) #4
    #     g1.switch()      #5
    #     print('%s play 4' %name) #8
    #
    # g1 = greenlet(eat)
    # g2 = greenlet(play)
    #
    # g1.switch('太白')  # 1  第一次切换一定要传参
    #
    # # g2.switch('b1')
    #
    # # time.sleep(300)
    
    # 还没有做到真正遇到IO切换
    #
    # import gevent
    # import time
    # def eat(name):
    #     print('%s eat 1' %name)  # 1
    #     # gevent.sleep(2)
    #     time.sleep(300)
    #     print('%s eat 2' %name)
    #
    # def play(name):
    #     print('%s play 1' %name)  # 2
    #     # gevent.sleep(1)
    #     time.sleep(3)
    #     print('%s play 2' %name)
    #
    #
    # g1 = gevent.spawn(eat, 'alex')
    # g2 = gevent.spawn(play, name='taibai')
    # # g1.join()
    # # g2.join()
    # #或者gevent.joinall([g1,g2])
    # gevent.joinall([g1,g2])
    # print('主')
    
    
    
    import threading
    from gevent import monkey
    monkey.patch_all()  # 将你代码中的所有的IO都标识.
    
    import gevent  # 直接导入即可
    import time
    def eat():
        print(f'线程1:{threading.current_thread().getName()}')
        print('eat food 1')
        time.sleep(3)  # 加上mokey就能够识别到time模块的sleep了
        print('eat food 2')
    
    def play():
        print(f'线程2:{threading.current_thread().getName()}')
        print('play 1')
        time.sleep(1)  # 来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。
        print('play 2')
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play)
    gevent.joinall([g1,g2])
    print(f'主:{threading.current_thread().getName()}')
    
  • 相关阅读:
    List的Select 和Select().tolist()
    lambda中FirstOrDefault和First
    EF时,数据库字段和实体类不一致问题
    用户变量问题
    实验十二:字符串和结构
    实验十一:指针(2)
    实验十:指针(1)
    实验九:二维数组和字符数组的应用
    实验八:一维数组的应用
    实验七:函数及数组(1)
  • 原文地址:https://www.cnblogs.com/Jacob-yang/p/11228979.html
Copyright © 2011-2022 走看看