zoukankan      html  css  js  c++  java
  • 进程 线程 协程

    进程

    进程 :
    什么是进程?
    是操作系统的发展过程中,为了提高CPU的利用率,在操作系统同时运行多个程序的时候,为了数据的安全代码不混乱而被创造出来的一个概念
    每一个程序运行起来都至少是一个进程.
    进程是计算机中最小的资源分配单位
    进程被操作系统调度的,有很多相关的算法 - 早期的操作系统
    进程之间是数据隔离的
    进程的三状态 就绪 运行 阻塞
    同步异步
        同步 : 一个任务的执行依赖另一个事务的结束
                join lock
        异步 : 一个任务的执行不依赖另一个事务的结束
                start terminate
    阻塞非阻塞
        阻塞   : accept recv recvfrom queue.get join
        非阻塞 : setblocking = False
    并发并行
        并行是特殊的并发
        并行就是 同一时刻 两个以上的程序同时在cpu上执行
        并发就是 同一时段 两个以上的程序看起来在同时执行
    IO概念 : 文件操作 数据库操作 网络传输 用户输入输出
        Input  得到bytes/str
        Output 发送数据/输出数据
    
    因为进程与进程之间本质上是异步且数据隔离
    守护进程 : 子进程等待主进程的代码结束就结束了
    同步控制
        join
        锁 - 互斥锁 : 多个进程同时对一个数据进行操作的时候 操作同一个文件/数据库/管道/Manager.dict
        信号量
        事件
    数据共享 - 数据不安全
        Manager
    IPC-进程之间通信
        管道
        队列 - 生产者消费者模型(为了解决数据的生产和处理的效率问题)
        第三方工具(消息队列,消息中间件)
    进程池
        解决大量任务 开启多个进程的开销过大的问题
        节省资源,提高并发效率的
        一般开进程数 cpu_count * 1 or 2
    进程? 时间片? 阻塞非阻塞,同步异步,进程三状态,并发并行,数据隔离,IO概念//进程池,IPC,锁,事件,信号量 ...
    Python并不支持真正意义上的多线程。Python中提供了多线程模块,但如果想通过多线程提高代码的速度,并不推荐使用多线程模块。Python中有一个全局锁Global Interpreter Lock(GIL),全局锁会确保任何时候多个线程中只有一个会被执行。线程的执行速度非常快,会误以为线程是并行执行的,但实际上都是轮流执行。经过GIL处理后,会增加线程执行的开销。
    全局锁 GIL(Global interpreter lock) 并不是 Python 的特性,而是在实现 Python 解析器(CPython)时所引入的一个概念。Python有CPython,PyPy,Psyco 等不同的 Python 执行环境,其中 JPython 没有GIL。CPython 是大部分环境下默认的 Python 执行环境,GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
    GIL 限制了同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。GIL 本质是互斥锁,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。在一个 Python 的进程内,不仅有主线程或者由主线程开启的其它线程,还有解释器开启的垃圾回收等解释器级别的线程。进程内,所有数据都是共享的,代码作为一种数据也会被所有线程共享,多个线程先访问到解释器的代码,即拿到执行权限,然后将 target 的代码交给解释器的代码去执行,解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,因此为了保证数据安全需要加锁处理,即 GIL。 
    由于GIL 的存在,同一时刻同一进程中只有一个线程被执行。多核 CPU可以并行完成计算,因此多核可以提升计算性能,但 CPU 一旦遇到 I/O 阻塞,仍然需要等待,所以多核CPU对 I/O 密集型任务提升不明显。根据执行任务是计算密集型还是I/O 密集型,不同场景使用不同的方法,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势
    GIL
    所有的程序 - 任务
    所有的任务 - 进程
    进程 ?  进行中的程序  PID进程ID
        进程是计算机中资源分配的最小单位
        进程间数据隔离   要通信的话运用socket
      进程调度:--先来先服务调度算法,短作业优先,时间片轮转法,多级反馈机制

    并发 资源有限的情况下,AB程序交替使用cpu目的是提高效率
    并行 同一时刻都在执行 多核  (是并发里的一种特殊情况)  更苛刻的条件
    
    同步:  程序顺序执行,多个任务之间串行执行 (洗衣完--做饭完--洗碗)
    异步:  多个任务同时运行  (在同一时间内洗衣做饭洗碗)
    
    阻塞: 程序由于不符合某个条件或者等待某个条件满足 而在某一个地方进入等待状态
    非阻塞: 程序正常执行
    同步阻塞
    一件事儿一件事儿的做
    中间还要被阻塞
    同步非阻塞 : 费力不讨好
    一件事儿一件事儿的做
    但是不阻塞
    异步阻塞
    同时进行的
    每一件事儿都会遇到阻塞事件
    异步非阻塞
    几个事情同时进行
    每一件事都不阻塞
     
    import os,time
    print(os.getpid())  #获取当前进程号
    print(os.getppid()) #获取当前父进程id
    
    time.sleep(5)
    print(os.getpid()) 
    查看进程号
    import multiprocessing  #是个包
    
    from multiprocessing import Process
    import os
    def son_process():
        # 这个函数的代码实在子进程执行的
        print('执行我啦',os.getpid(),os.getppid())
        print()
    
    if __name__ == '__main__':
        print('1-->',os.getpid()) #主进程
        p = Process(target=son_process) #实例化
        p.start()  #进程开始
    下面是打印结果
    #  1--> 6416
    #  执行我啦 7388 6416
    python 开启一个子进程
    import time
    from multiprocessing import Process
    # 通过并发实现一个有并发效果的socket server
    
    # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行)
    def son_process():
        print('son start')
        time.sleep(1)
        print('son end')
    
    if __name__ == '__main__':
        p = Process(target=son_process) #创建子进程
        p.start() #通知操作系统开启一个子进程  os响应需求 分配资源 执行进程中的代码
        print('主进程')
    并发的主/子进程
    import time
    from multiprocessing import Process
    # 通过并发实现一个有并发效果的socket server
    
    # 1.开启了一个子进程就已经实现了并发: 父进程(主进程)和子进程并发(同时执行)
    def son_process():
        print('son start')
        time.sleep(1)
        print('son end')
    
    if __name__ == '__main__':
        p = Process(target=son_process) #创建子进程
        p.start() #通知操作系统开启一个子进程  os响应需求 分配资源 执行进程中的代码
    
        for i in range(5):
            print('主进程')
            time.sleep(0.3)
    时间测试 主/子进程
    import time
    from multiprocessing import Process
    def son_process():
        print('son start')
        time.sleep(1)
        print('son end')
    
    if __name__ == '__main__':
        for i in range(3):
            p = Process(target=son_process)
            p.start()
    开启多个子进程
    import time
    from multiprocessing import Process
    def son_process(i):
        print('son start',i)
        time.sleep(1)
        print('son end',i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=son_process,args=(i,)) #args必须元组
            p.start()  # 通知操作系统 start并不意味着子进程已经开始了
    子进程中传参数
    import time
    from multiprocessing import Process
    def son_process(i):
        print('son start',i)
        time.sleep(1)
        print('son end',i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=son_process,args=(i,))  #子进程不支持返回值
            p.start()
        print('主进程的代码执行完毕')
    # 主进程会等待子进程结束之后才结束
    # 为什么?
    # 父进程负责创建子进程,也负责回收子进程的资源
    主进程和子进程之间的关系
     - server.py-
    import socket,time
    from multiprocessing import Process
    def talk(conn):
        conn, addr = sk.accept()
        print(conn)
        while True:
            msg = conn.recv(1024).decode()
            time.sleep(10)
            conn.send(msg.upper().encode())
    
    if __name__ == '__main__':
        # 这句话下面的所有代码都只在主进程中执行
        sk = socket.socket()
        sk.bind(('127.0.0.1',9000))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            Process(target=talk,args=(sk,)).start()
    
    # 卡 大量的while True 并且代码中并没有太多的其他操作
    # 如果我们使用socketserver,不会这么卡
    # 多进程确实可以帮助我们实现并发效果,但是还不够完美
    # 操作系统没开启一个进程要消耗大量的资源
    # 操作系统要负责调度进程 进程越多 调度起来就越吃力
    
     - clitent.py-
    import socket
    
    sk = socket.socket()
    
    sk.connect(('127.0.0.1',9000))
    while True:
        sk.send(b'hello')
        print(sk.recv(1024))
    多进程实现socketserver
    def son_process(i):
        while True:
            print('son start',i)
            time.sleep(0.5)
            print('son end',i)
    
    if __name__ == '__main__':
        p = Process(target=son_process, args=(1,))
        p.start()           # 开启一个子进程,异步的
        print('主进程的代码执行完毕')
        print(p.is_alive())  # 子进程还活着
        p.terminate()        # 结束一个子进程,异步的
        print(p.is_alive())  # 子进程还在活着
        time.sleep(0.1)
        print(p.is_alive()) # False
    主进程可不可以直接结束一个子进程?
    n = [100]
    def sub_n():  # 减法
        global n  # 子进程对于主进程中的全局变量的修改是不生效的
        n.append(1)
        print('子进程n : ',n)   #子进程n :  [100, 1]
    if __name__ == '__main__':
        p = Process(target = sub_n)
        p.start()
        p.join()     # 阻塞 直到子进程p结束
        print('主进程n : ',n)  #主进程n :  [100]
    数据隔离
    # 主进程里的print('主进程n : ',n)这句话在十个子进程执行完毕之后才执行
    n = [100]
    import random
    def sub_n():
        global n  # 子进程对于主进程中的全局变量的修改是不生效的
        time.sleep(random.random())
        n.append(1)
        print('子进程n : ',n)
    if __name__ == '__main__':
        p_lst = []  #进程添加进列表
        for i in range(10):
            p = Process(target = sub_n) #创建
            p.start() #通知os 开启
            p_lst.append(p)
        for p in p_lst:p.join()  # 阻塞 只有一个条件是能够让我继续执行 这个条件就是子进程结束
        print('主进程n : ',n)
    开启十个进程执行subn
    n = [100]
    def sub_n():
        global n  # 子进程对于主进程中的全局变量的修改是不生效的
        n.append(1)
        print('子进程n : ',n)
        time.sleep(10)
        print('子进程结束')
    
    if __name__ == '__main__':
        p = Process(target = sub_n)
        p.start()
        p.join(timeout = 5)     # 如果不设置超时时间 join会阻塞直到子进程p结束
    #     # timeout超时
    #     # 如果设置的超时时间,那么意味着如果不足5s子进程结束了,程序结束阻塞
    #     # 如果超过5s还没有结束,那么也结束阻塞
        print('主进程n : ',n)
        p.terminate()  # 也可以强制结束一个子进程
    join拓展 超时时间
    # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
    # 由于主进程要负责给所有的子进程收尸,所以主进程必须是最后结束,守护进程只能在主进程的代码结束之后就认为主进程结束了
    # 守护进程在主进程的代码结束之后就结束了,不会等待其他子进程结束
    #
    # 希望守护进程必须等待所有的子进程结束之后才结束
    # ????
    # import time
    # from multiprocessing import Process
    # def alive():
    #     while True:
    #         print('连接监控程序,并且发送报活信息')
    #         time.sleep(0.6)
    #
    # def func():
    #     '主进程中的核心代码'
    #     while True:
    #         print('选择的项目')
    #         time.sleep(1)
    #         print('根据用户的选择做一些事儿')
    #
    # if __name__ == '__main__':
    #     p = Process(target=alive)
    #     p.daemon = True   # 设置子进程为守护进程,守护进程会随着主进程代码的结束而结束
    #     p.start()
    #     p = Process(target=func)
    #     p.start()
    #     p.join()   # 在主进程中等待子进程结束,守护进程就可以帮助守护其他子进程了
    
    
    # 守护进程
    # 1.守护进程会等待主进程的代码结束而结束,不会等待其他子进程的结束
    # 2.要想守护进程等待其他子进程,只需要在主进程中加上join
    守护进程
    for i in range(5):
        pass
    print(i)  # i=4
    
    lst = []
    for i in range(5):
        p = Process()
        lst.append(p)
        p.start()
    for p in lst:
        p.join()
        p.terminate()
    操作多个子进程的结束terminate和join阻塞
    import os
    from multiprocessing import Process
    class MyProcess(Process): #继承
        def __init__(self,参数):
            super().__init__() #父类 初始化
            self.一个属性 = 参数 
    
        def run(self):
            print('子进程中要执行的代码')
    
    if __name__ == '__main__':
        conn = '一个链接'
        mp = MyProcess(conn)
        mp.start()
    面向对象 开启子进程的方法
    当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
    接下来,我们以  模拟抢票  为例,来看看数据安全的重要性。
    import json
    import time
    from multiprocessing import Process,Lock
    
    def search(name):
        '''查询余票的功能'''
        with open('ticket') as f:  # 'r'
            dic = json.load(f)  # 读取  dict
            print(name , dic['count'])
    
    def buy(name): # 买票
        with open('ticket') as f:
            dic = json.load(f)
        time.sleep(0.1)
        if dic['count'] > 0:
            print(name,'买到票了')
            dic['count'] -= 1
        time.sleep(0.1)
        with open('ticket','w') as f:
            json.dump(dic,f)  # 写进去
    
    def get_ticket(name,lock): #整个操作
        search(name) # 先查
        lock.acquire()  # 只有第一个到达的进程才能获取锁,剩下的其他人都需要在这里阻塞  上锁
        buy(name)  # 再买
        lock.release()  # 有一个人还锁,会有一个人再结束阻塞拿到钥匙  还锁
    
    if __name__ == '__main__':
        lock = Lock()  # 实例化锁
        for i in range(10):  # 10个进程
            p = Process(target=get_ticket,args=('name%s'%i,lock)) # 创建
            p.start() # 通知os 开启
    
    # tips : ticket 里面的数据结构 {"count": 0}
    
        # 模拟过程描述:
    # 第一个来的人 取钥匙 开门 进门 关门 带着钥匙反锁
    # 第一个拿到钥匙的人  开门 出门 锁门 挂钥匙
    锁 multiprocess.Lock 模拟抢票 例子
    进程 状态码 Z/z 僵尸进程 linux命令
    主进程中控制子进程的方法:
        p = Process(target,args) #创建这一刻 根本没有通知操作系统
        p.start() #通知os 开启子进程  异步非阻塞
        p.terminate() #通知os,关闭子进程,异步非阻塞
        p.is_alive() # 查看子进程是否还活着
        p.join(timeout=10) # 阻塞 直到子进程结束  超时时间理解
        
    # 守护进程
        # 守护进程是一个子进程
        # 守护进程会在主进程代码结束之后才结束
        # 为什么会这样?
            # 由于主进程必须要回收所有的子进程的资源
            # 所以主进程必须在子进程结束之后才能结束
            # 而守护进程就是为了守护主进程存在的
            # 不能守护到主进程结束,就只能退而求其次,守护到代码结束了
        # 守护到主进程的代码结束,意味着如果有其他子进程没有结束,守护进程无法继续守护
        # 解决方案 : 在主进程中加入对其他子进程的join操作,来保证守护进程可以守护所有主进程和子进程的执行
        # 如何设置守护进程
            # 子进程对象.daemon = True 这句话写在start之前
    
    #
        # 为什么要用锁?
        # 由于多个进程的并发,导致很多数据的操作都在同时进行
        # 所以就有可能产生多个进程同时操作 : 文件数据库 中的数据
        # 导致数据不安全
        # 所以给某一段修改数据的程序加上锁,就可以控制这段代码永远不会被多个进程同时执行
        # 保证了数据的安全
    # Lock 锁(互斥锁)
    # 锁实际上是把你的某一段程序变成同步的了,降低了程序运行的速度,为了保证数据的安全性
    # 没有数据安全的效率都是耍流氓
    进程 总结
    # 对于锁 保证一段代码同一时刻只能有一个进程执行
    # 对于信号量 保证一段代码同一时刻只能有n个进程执行
    # 流量控制
    
    import time
    import random
    from multiprocessing import Process,Semaphore
    def ktv(name,sem):
        sem.acquire() #拿锁
        print("%s走进了ktv"%name)
        time.sleep(random.randint(5,10))
        print("%s走出了ktv" % name)
        sem.release() #还锁
    
    if __name__ == '__main__':
        sem = Semaphore(4) # 同时只能有4个进程执行
        for i in range(25):
            p = Process(target=ktv,args = ('name%s'%i,sem))
            p.start()
    
    # 信号量原理 : 锁 + 计数器实现的
    # 普通的锁 acquire 1次
    # 信号量   acquire 多次
    # count计数
    # count = 4
    # acquire count -= 1
    # 当count减到0的时候 就阻塞
    # release count + = 1
    # 只要count不为0,你就可以继续acquire
    信号量 Semaphore
    # from multiprocessing import Event
    # Event 事件类
    # e = Event()
    # e 事件对象
    # 事件本身就带着标识 : False
    # wait 阻塞
    # 它的阻塞条件是 对象标识为False
    # 结束阻塞条件是 对象标识为True
    
    # 对象的标识相关的 :
    # set  将对象的标识设置为True
    # clear 将对象的标识设置为False
    # is_set 查看对象的标识是否为True
    
    import time
    import random
    from multiprocessing import Event,Process
    def traffic_light(e):
        print('33[1;31m红灯亮33[0m')
        while True:
            time.sleep(2)
            if e.is_set():   # 如果当前是绿灯
                print('33[1;31m红灯亮33[0m') # 先打印红灯亮
                e.clear()                        # 再把灯改成红色
            else :           # 当前是红灯
                print('33[1;32m绿灯亮33[0m') # 先打印绿灯亮
                e.set()                          # 再把灯变绿色
    #
    def car(e,carname):
        if not e.is_set():  # False
            print('%s正在等待通过'%carname)
            e.wait()  #阻塞
        print('%s正在通过'%carname)
    
    if __name__ == '__main__':
        e = Event()  #创建
        p = Process(target=traffic_light,args = (e,)) #创建进程
        p.start() #开始
        for i in range(100):  #100辆车
            time.sleep(random.randrange(0,3)) #随机
            p = Process(target=car, args=(e,'car%s'%i))
            p.start()
    
    # 太复杂了
    # 在我们进行并发操作的时候很少用到这么复杂的场景
    
    # Event事件
    # 放到进程中的代码一定不止一段
    # 这两个操作之间 存在同步关系
    # 一个操作去确认另一个操作的执行条件是否完成
    
    # 标识 控制wait是否阻塞的关键
    # 如何修改这个标识 : clear set
    # 如何查看这个标识 : is_set
    事件Event
    # 管道数据不安全 管道加锁就是队列 
    from multiprocessing import Pipe,Process
    
    def f(conn):  #接收子conn
        conn.send('hello world') #发消息
        conn.close()
    
    if __name__ == '__main__':
        parent_conn,child_conn = Pipe()
        p = Process(target=f,args=(child_conn,)) #传子conn
        p.start()
        print(parent_conn.recv()) #父conn接收
        p.join()
    Pipe 管道 IPC数据间通信 是队列的底层
    # 进程之间 数据隔离
    # 凭什么判断 子进程是否执行完毕了????
    # lock对象
    # a进程 acquire了 b进程在acquire的地方一直阻塞直到a release
    # 你在b进程 如何知道a进程release了?
    # 你之前学习的lock semaphore event实际上都用到了进程之间的通信
    # 只不过这些通信都是非常简单而固定的信号
    # 在你使用这些工具的过程中并感知不到
    
    # 对于用户来讲 : 就希望能够去进行一些更加复杂的 不固定的内容的交互
    # 这种情况下使用lock semaphore event就不可行了
    
    # 进程间通信 IPC
    # IPC Inter-Process Communication
    # 实现进程之间通信的两种机制:
        # 管道 Pipe  数据不安全
        # 队列 Queue  管道+锁
    
    # from multiprocessing import Queue,Process
    #
    # def consumer(q):
    #     print(
    #        '子进程 :', q.get()
    #     )
    #
    #
    # if __name__ == '__main__':
    #     q = Queue()
    #     p = Process(target=consumer,args=(q,))
    #     p.start()
    #     q.put('hello,world')
    
    # 生产者消费者模型
    import time
    from multiprocessing import Queue,Process
    
    def producer(name,food,num,q):
        '''生产者'''
        for i in range(num):
            time.sleep(0.3)
            foodi = food + str(i)
            print('%s生产了%s'%(name,foodi))
            q.put(foodi)
    
    def consumer(name,q):
        while True:
            food = q.get()   # 等待接收数据
            if food == None:break
            print('%s吃了%s'%(name,food))
            time.sleep(1)
    
    if __name__ == '__main__':
        q = Queue(maxsize=10)
        p1 = Process(target=producer,args = ('宝元','泔水',20,q))
        p2 = Process(target=producer,args = ('战山','鱼刺',10,q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('wusir', q))
        p1.start()   # 开始生产
        p2.start()   # 开始生产
        c1.start()
        c2.start()
        p1.join()    # 生产者结束生产了
        p2.join()    # 生产者结束生产了
        q.put(None)  # put None 操作永远放在所有的生产者结束生产之后
        q.put(None)  # 有几个消费者 就put多少个None
    
    # 为什么队列为空 为满 这件事情不够准确
    # q.qsize()  队列的大小
    # q.full()   是否满了 满返回True
    # q.empty()  是否空了 空返回True
    进程间通信 IPC
    import  time
    from multiprocessing import JoinableQueue,Process
    
    def consumer(name,q):
        while True:
            food = q.get()
            time.sleep(1)
            print('%s消费了%s'%(name,food))
            q.task_done()
    
    def producer(name,food,num,q):
        '''生产者'''
        for i in range(num):
            time.sleep(0.3)
            foodi = food + str(i)
            print('%s生产了%s'%(name,foodi))
            q.put(foodi)
        q.join()   # 消费者消费完毕之后会结束阻塞
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer, args=('宝元', '泔水', 20, q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('wusir', q))
        c1.daemon = True
        c2.daemon = True
        p1.start()
        c1.start()
        c2.start()
        p1.join()
    
    # 消费者每消费一个数据会给队列发送一条信息
    # 当每一个数据都被消费掉之后 joinablequeue的join阻塞行为就会结束
    # 以上就是为什么我们要在生产完所有数据的时候发起一个q.join()
    
    # 随着生产者子进程的执行完毕,说明消费者的数据都消费完毕了
    # 这个时候主进程中的p1.join结束
    # 主进程的代码结束
    # 守护进程也结束了
    JoinableQueue 消费者模型
    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        #     d['count']-=1
        ''' 等价于下面的代码 '''
        lock.acquire()
        d['count'] -= 1
        lock.release()
    
    if __name__ == '__main__':
        lock=Lock()
        m = Manager()
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
    
    # Manager是一个类 内部有一些数据类型能够实现进程之间的数据共享
    # dict list这样的数据 内部的数字进行自加 自减 是会引起数据不安全的,这种情况下 需要我们手动加锁完成
    # 因此 我们一般情况下 不适用这种方式来进行进程之间的通信
    # 我们宁可使用Queue队列或者其他消息中间件 来实现消息的传递 保证数据的安全
    进程之间的数据共享 Manager类
    import time
    from multiprocessing import Process
    
    def func(i):
        i -= 1
    
    if __name__ == '__main__':
        start = time.time()  #计时开始
        l = []
        for i in range(100):
            p = Process(target=func,args=(i,))
            p.start()
            l.append(p)
        for p in l:
            p.join()
        print(time.time() - start) #计时结束
    100个进程 计算时间
    计算时间差
    import time
    from multiprocessing import Pool  #
    def func(i):
        i -= 1
    
    if __name__ == '__main__':
        start = time.time()
        p = Pool(5)  #池  你的池中打算放多少个进程,个数cpu的个数 * 1/2
        p.map(func,range(100))  # 自动带join
        print(time.time()-start)
    Pool 进程池
    from multiprocessing import Pool
    def f(i):
        i -= 1
        return i**2
    
    if __name__ == '__main__':
        p = Pool(5)  #池  你的池中打算放多少个进程,个数cpu的个数 * 1/2
        ret = p.map(f,range(100))  # 自动带join
        print(ret)
    Pool 获取程序执行的返回值
    import time
    from multiprocessing import Pool  #
    def func(i):
        i -= 1
        time.sleep(0.5)
        return i**2
    
    if __name__ == '__main__':
        p = Pool(5) #你的池中打算放多少个进程,个数cpu的个数 * 1|2
        for i in range(100):
            ret = p.apply(func,args=(i,))  # 自动带join 串行/同步 apply就是同步提交任务
            print(ret)
    apply 同步方式向进程池提交任务
    import time
    from multiprocessing import Pool  #
    def func(i):
        i -= 1
        time.sleep(0.3)
        # print(i)
        return i**2
    if __name__ == '__main__':
        p = Pool(5)
        lst = []
        for i in range(100):
            ret = p.apply_async(func,args=(i,))  # 自动带join 异步的 apply_async异步提交任务
            lst.append(ret)
        # p.close()   # 关闭进程池的任务提交 从此之后不能再向p这个池提交新的任务
        # p.join()    # 阻塞 一直到所有的任务都执行完
        # print('结束')
        for i in lst :
            print(i.get())
    apply_async 异步方式向进程池提交任务
    什么是进程池? 有限的进程的池子
    为什么要用进程池?
        任务很多 cpu个数*5个任务以上
        为了节省创建和销毁进程的时间 和 操作系统的资源
    一般进程池中进程的个数:
        cpu的1-2倍
        如果是高计算,完全没有io,那么就用cpu的个数
        随着IO操作越多,可能池中的进程个数也可以相应增加
    向进程池中提交任务的三种方式
        map    异步提交任务 简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会作为参数被传递给子进程)
            能够传递的参数是有限的,所以比起apply_async限制性比较强
        apply  同步提交任务(你删了吧)
        apply_async 异步提交任务
            能够传递比map更丰富的参数,但是比较麻烦
            首先 apply_async提交的任务和主进程完全异步
            可以通过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成
            也可以通过get获取返回值的方式,来等待任务的返回值
                我们不能在apply_async提交任务之后直接get获取返回值
                   for i in range(100):
                        ret = p.apply_async(func,args=(i,))  # 自动带join 异步的 apply_async异步提交任务
                        l.append(ret)
                    for ret in l:
                        print(ret.get())
    进程池总结
    回调函数
    import os
    import time
    import random
    from multiprocessing import Pool  #
    def func(i):     # [2,1,1.5,0,0.2]
        i -= 1
        time.sleep(random.uniform(0,2))
        return i**2
    
    def back_func(args):
        print(args,os.getpid())
    
    if __name__ == '__main__':
        print(os.getpid())
        p = Pool(5)
        l = []
        for i in range(100):
            ret = p.apply_async(func,args=(i,),callback=back_func)  # 5个任务
        p.close()
        p.join()
    callback回调函数
    主动执行func,然后在func执行完毕之后的返回值,直接传递给back_func作为参数,调用back_func
    处理池中任务的返回值
    回调函数是由谁执行的? 主进程
    回调函数 进程池 回调函数没有返回值
    import re
    import json
    from urllib.request import urlopen  #请求页面包
    from multiprocessing import Pool
    
    def get_page(i):  #页面
        ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read()
        ret = ret.decode('utf-8')
        return ret
    
    def parser_page(s): #页面数据分析
        com = re.compile(
            '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>d+).*?<span class="title">(?P<title>.*?)</span>'
            '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)
    
        ret = com.finditer(s)
        with open('file','a',encoding='utf-8') as f:
            for i in ret:
                dic = {
                    "id": i.group("id"),
                    "title": i.group("title"),
                    "rating_num": i.group("rating_num"),
                    "comment_num": i.group("comment_num"),
                }
                f.write(json.dumps(dic,ensure_ascii=False)+'
    ')
    
    if __name__ == '__main__':
        p = Pool(5)
        count = 0
        for i in range(10):
            p.apply_async(get_page,args=(count,),callback=parser_page)
            count += 25
        p.close()
        p.join()
    进程池 爬虫 豆瓣 例子 处理中文
    import json
    with open('file2','w',encoding='utf-8') as f:
        json.dump({'你好':'alex'},f,ensure_ascii=False)
    ensure_ascii=False

    线程 - 

    线程
        轻量级 进程  解决并发 整体效率高于进程
        在进程中数据共享  资源共享
        是进程的一部分,不能独立存在的 
        被CPU调度的最小单位
    使用场景 socketserver web的框架 django flask tornado 多线程来接收用户并发的请求
    python里  同一个进程中的多个线程能不能同时使用多个cpu
    
    在整个程序界:
        如果你的程序需要数据隔离 : 多进程
        如果你的程序对并发的要求非常高 : 多线程
    
    python
    初期 面向单核的 一个cpu
    作为一门脚本语言  解释型语言
    
    线程锁这件事儿是由Cpython解释器完成
    对于python来说 同一时刻只能有一个线程被cpu访问
    彻底的解决了多核环境下的安全问题
    线程锁 : 全局解释器锁 GIL
        1.这个锁是锁线程的
        2.这个锁是解释器提供的
    
    多线程仍然有它的优势
        你的程序中用到cpu真的多么
        如果100% 90%的时间都消耗在计算上,那么cpython解释器下的多线程对你来说确实没用
        但是你写的大部分程序 的时间实际上都消耗在了IO操作上
    遇到高计算型
        开进程 4个进程  每个进程里开n个线程
        换个解释器
    线程概念
    import os
    import time
    from threading import Thread
    def func():
        print('start',os.getpid())
        time.sleep(1)
        print('end')
    
    if __name__ == '__main__':
        t = Thread(target=func)
        t.start()
        for i in range(5):
            print('主线程',os.getpid())
            time.sleep(0.3)
    Thread 线程类 初识
    import time
    from threading import Thread
    def func():
        n = 1 + 2 + 3
        n ** 2
    
    if __name__ == '__main__':
        start = time.time()
        lst = []
        for i in range(100):
            t = Thread(target=func)
            t.start()
            lst.append(t)
        for t in lst:
            t.join()
        print(time.time() - start)
    #
    
    import time
    from multiprocessing import Process as Thread
    def func():
        n = 1 + 2 + 3
        n**2
    
    if __name__ == '__main__':
        start = time.time()
        lst = []
        for i in range(100):
            t = Thread(target=func)
            t.start()
            lst.append(t)
        for t in lst:
            t.join()
        print(time.time() - start)
    进程与线程的效率对比
    from threading import Thread
    n = 100
    def func():
        global n
        n -= 1
    
    t = Thread(target=func)
    t.start()
    t.join()
    print(n)
    线程的数据共享
    from threading import Thread
    class Mythread(Thread):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
        def run(self):
            print('in son',self.arg)
    
    t = Mythread(123)
    t.start()
    开启线程的另一种方式 传参数
    import time
    from threading import Thread,currentThread,activeCount,enumerate
    class Mythread(Thread):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
        def run(self):
            time.sleep(1)
            print('in son',self.arg,currentThread())
    # t = Mythread(123)
    # t.start()
    # print('主',currentThread()) #当前线程
    #
    for i in range(10):
        t = Mythread(123)
        t.start()
        # print(t.ident) #当前线程id
    print(activeCount())  #几个活跃的线程  11
    print(enumerate()) # 一共几个线程 []
    threading模块中的其他功能
    - server.py - 
    import socket
    from threading import Thread
    
    def talk(conn):
        while True:
            msg = conn.recv(1024).decode()  #解码
            conn.send(msg.upper().encode())  #编码
    
    sk = socket.socket() #创建
    sk.bind(('127.0.0.1',9000)) #bind
    sk.listen() #监听
    while True:
        conn,addr = sk.accept() #接收
        Thread(target=talk,args = (conn,)).start() #创建线程并开启
    
    - client.py -
    import socket
    sk = socket.socket() #创建
    
    sk.connect(('127.0.0.1',9000)) #连接
    while True:
        sk.send(b'hello') #
        print(sk.recv(1024)) #
    多线程实现socketserver
    import time
    from threading import Thread
    
    def func():
        while True:
            print('in func')
            time.sleep(0.5)
    
    def func2():
        print('start func2')
        time.sleep(10)
        print('end func2')
    
    Thread(target=func2).start()
    t = Thread(target=func)
    t.setDaemon(True)  #线程 守护操作
    t.start()
    print('主线程')
    time.sleep(2)
    print('主线程结束')
    
    # 守护进程 只守护主进程的代码,主进程代码结束了就结束守护,守护进程在主进程之前结束
    # 守护线程 随着主线程的结束才结束,守护线程是怎么结束的  直到主子线程都结束 进程结束
    
    # 进程 terminate 强制结束一个进程的
    # 线程 没有强制结束的方法
    # 线程结束 : 线程内部的代码执行完毕 那么就自动结束了
    守护线程 线程结束问题
    import time
    from threading import Thread,currentThread
    def func():
        print(currentThread())
    
    print('开始')
    for i in range(10):
        Thread(target=func).start()
        # time.sleep(2)
    print('主线程')
    currentThread
    锁 用来保证数据安全
    有了GIL还是会出现数据不安全的现象,所以还是要用锁 
    import time
    from threading import Thread,Lock
    n = 100
    def func(lock): #
        global n  #用 全局的n
        # n -= 1
        with lock:
            tmp = n-1  # n-=1
            # time.sleep(0.1)
            n = tmp
    
    if __name__ == '__main__':
        l = []
        lock = Lock() #实例化
        for i in range(100):
            t = Thread(target=func,args=(lock,)) #创建
            t.start() #开启
            l.append(t)
        for t in l:
            t.join()
        print(n)
    锁 from threading import Thread,Lock
    import dis
    n = 1  #全局空间的 += -= 操作 都不是数据安全的
    def func():
        n = 100  #局部空间内 永远安全
        n -= 1
    
    dis.dis(func)
    
    # 会出现线程不安全的两个条件
    # 1.是全局变量
    # 2.出现 += -=这样的操作
    
    #下面是解析
    LOAD  仅有这个 数据安全
    STORE 有load store 就会不安全
    
    # 列表 字典
    # 方法 l.append l.pop l.insert dic.update 都是线程安全的
    # l[0] += 1  不安全
    # d[k] += 1  不安全
    dis 模块 cpu计算底层步骤
    # 科学家吃面问题
    import time
    from threading import Thread,Lock
    # noodle_lock = Lock()
    # fork_lock = Lock()
    # 死锁不是时刻发生的,有偶然的情况整个程序都崩了
    # 每一个线程之中不止一把锁,并且套着使用
    # 如果某一件事情需要两个资源同时出现,那么不应该将这两个资源通过两把锁控制
    # 而应看做一个资源
    #
    # def eat1(name):
    #     noodle_lock.acquire()
    #     print('%s拿到面条了'%name)
    #     fork_lock.acquire()
    #     print('%s拿到叉子了'%name)
    #     print('%s开始吃面'%name)
    #     time.sleep(0.2)
    #     fork_lock.release()
    #     print('%s放下叉子了' % name)
    #     noodle_lock.release()
    #     print('%s放下面了' % name)
    #
    # def eat2(name):
    #     fork_lock.acquire()
    #     print('%s拿到叉子了' % name)
    #     noodle_lock.acquire()
    #     print('%s拿到面条了' % name)
    #     print('%s开始吃面' % name)
    #     time.sleep(0.2)
    #     noodle_lock.release()
    #     print('%s放下面了' % name)
    #     fork_lock.release()
    #     print('%s放下叉子了' % name)
    #
    # Thread(target=eat1,args=('wei',)).start()
    # Thread(target=eat2,args=('hao',)).start()
    # Thread(target=eat1,args=('太',)).start()
    # Thread(target=eat2,args=('宝',)).start()
    
    lock = Lock()
    def eat1(name):
        lock.acquire()
        print('%s拿到面条了'%name)
        print('%s拿到叉子了'%name)
        print('%s开始吃面'%name)
        time.sleep(0.2)
        lock.release()
        print('%s放下叉子了' % name)
        print('%s放下面了' % name)
    
    def eat2(name):
        lock.acquire()
        print('%s拿到叉子了' % name)
        print('%s拿到面条了' % name)
        print('%s开始吃面' % name)
        time.sleep(0.2)
        lock.release()
        print('%s放下面了' % name)
        print('%s放下叉子了' % name)
    
    Thread(target=eat1,args=('alex',)).start()
    Thread(target=eat2,args=('wusir',)).start()
    Thread(target=eat1,args=('太白',)).start()
    Thread(target=eat2,args=('宝元',)).start()
    
    # 先临时解决 fork_lock=noodle_lock = Lock()
    # 然后再找到死锁的原因,再去修改  终极办法一把锁
    科学家吃面问题 死锁现象
    from threading import RLock,Lock,Thread
    
    # 互斥锁
    #     无论在相同的线程还是不同的线程,都只能连续acquire一次
    #     要想再acquire,必须先release
    # 递归锁
    #     在同一个线程中,可以无限次的acquire
    #     但是要想在其他线程中也acquire,
    #     必须现在自己的线程中添加和acquire次数相同的release
    rlock = RLock()  #每一次acquire都像进去一道门
    rlock.acquire()
    rlock.acquire()
    rlock.acquire()
    rlock.acquire()  #直到全都release 才能下个人进门
    print('锁不住')
    
    lock = Lock() #普通锁/互斥锁
    lock.acquire()
    print('1')  #到这里 hang住了 
    lock.acquire()
    print('2')
    递归锁 RLock 互斥锁Lock
    from threading import RLock
    rlock = RLock()
    def func(num):
        rlock.acquire() #
        print('aaaa',num)
        rlock.acquire()
        print('bbbb',num)
        rlock.release() #必须  还锁
        rlock.release() #必须
    
    Thread(target=func,args=(1,)).start()
    Thread(target=func,args=(2,)).start()
    # aaaa 1
    # bbbb 1
    # aaaa 2
    # bbbb 2
    递归锁 1
    import time
    from threading import RLock,Lock,Thread
    noodle_lock = fork_lock = RLock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到面条了'%name)
        fork_lock.acquire()
        print('%s拿到叉子了'%name)
        print('%s开始吃面'%name)
        time.sleep(0.2)
        fork_lock.release()
        print('%s放下叉子了' % name)
        noodle_lock.release()
        print('%s放下面了' % name)
    
    def eat2(name):
        fork_lock.acquire()
        print('%s拿到叉子了' % name)
        noodle_lock.acquire()
        print('%s拿到面条了' % name)
        print('%s开始吃面' % name)
        time.sleep(0.2)
        noodle_lock.release()
        print('%s放下面了' % name)
        fork_lock.release()
        print('%s放下叉子了' % name)
    
    Thread(target=eat1,args=('alex',)).start()
    Thread(target=eat2,args=('wusir',)).start()
    Thread(target=eat1,args=('太白',)).start()
    Thread(target=eat2,args=('宝元',)).start()
    递归锁 科学家吃面 会多用占用资源   建议互斥锁
    import time
    from threading import Semaphore,Thread
    
    def func(name,sem):
        sem.acquire()
        print(name,'start')
        time.sleep(1)
        print(name,'stop')
        sem.release()
    
    sem = Semaphore(5)
    for i in range(20):
        Thread(target=func,args=(i,sem)).start()
    信号量 线程 不太好 比池
    from threading import Event
    # 事件
    # wait() 阻塞 到事件内部标识为True就停止阻塞
    # 控制标识
        # set
        # clear
        # is_set
    
    # 连接数据库
    import time
    import random
    from threading import Thread,Event
    def connect_sql(e):
        count = 0
        while count < 3:
            e.wait(0.5)
            if e.is_set():
                print('连接数据库成功')
                break
            else:
                print('数据库未连接成功')
                count += 1
    
    def test(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    e = Event()
    Thread(target=test,args=(e,)).start() #测试
    Thread(target=connect_sql,args=(e,)).start() #连接
    事件Event 2个事情,一个事情要确认另一个事情才能开始做
    from threading import Timer
    
    def func():
        print('执行我啦')
    
    t = Timer(3,func)
    # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
    t.start()
    print('主线程的逻辑')
    t.join()
    print('ok ')
    定时器
    # wait      阻塞
    # notify(n) 给信号
    
    # 假如现在有20个线程
    # 所有的线程都在wait这里阻塞
    # notify(n) n传了多少
    # 那么wait这边就能获得多少个解除阻塞的通知
    
    # notifyall
    # acquire
    # release
    
    import threading
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition() #条件
        for i in range(10): #10个线程
            t = threading.Thread(target=run, args=(i,))
            t.start() #开启
    
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
    
            con.release()
            print('****')
    
    # 设置某个条件
    # 如果满足这个条件 就可以释放线程
    # 监控测试我的网速
    # 20000个任务
    # 测试我的网速 /系统资源
    # 发现系统资源有空闲,我就放行一部分任务
    条件 condition
    import queue
    #  线程队列 线程之间数据安全
    q = queue.Queue()
    # # 普通队列
    # q.put(1)
    # # print(q.get())
    # try:
    #     q.put_nowait(2)
    # except queue.Full:
    #     print('您丢失了一个数据2')
    # print(q.get_nowait()) # 如果有数据我就取,如果没数据不阻塞而是报错
    
    
    # 非阻塞的情况下
    q.put(10)
    print(q.get(timeout=2))
    #
    # # 算法里 栈
    # lfq = queue.LifoQueue()   # 栈
    # lfq.put(1)
    # lfq.put(2)
    # lfq.put(3)
    # print(lfq.get())
    # print(lfq.get())
    # print(lfq.get())
    #
    # # 优先级队列,是根据第一个值的大小来排定优先级的
    # # ascii码越小,优先级越高
    # q = queue.PriorityQueue()
    # q.put((2,'a'))
    # q.put((1,'c'))
    # q.put((1,'b'))
    #
    # print(q.get())
    
    # 线程+队列 实现生产者消费者模型
    线程队列
    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用
    
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor  #线程池
    # from concurrent.futures import ProcessPoolExecutor as Pool #进程池
    
    def func(num):
        print('in %s func'%num,currentThread())
        time.sleep(random.random())
        return num**2
    
    tp = ThreadPoolExecutor(5)  #5个线程
    ret_l = []
    for i in range(30):
        ret = tp.submit(func,i) #提交
        ret_l.append(ret)
    for ret in ret_l:  #取值
        print(ret.result())
    线程池1
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor as Pool
    import os
    def func(num):
        # print('in %s func'%num,currentThread())
        print('in %s func'%num,os.getpid())
        time.sleep(random.random())
        return num**2
    if __name__ == '__main__':
    
        # tp = ThreadPoolExecutor(5)
        tp = Pool(5)
        ret = tp.map(func,range(30))
        # print(list(ret))
        for i in ret:
            print(i)
    线程池 map 简单用法
    # 回调函数 add_done_callback
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor as Pool
    def func1(num):
        print('in func1 ',num,currentThread())
        return num*'*'
    
    def func2(ret):
        print('--->',ret.result(),currentThread())
    tp = Pool(5)
    print('主 : ',currentThread())
    for i in range(10):
        tp.submit(func1,i).add_done_callback(func2)
    # 回调函数收到的参数是需要使用result()获取的
    # 回调函数是由谁执行的? 主线程
    线程 回调函数
    import time
    import random
    from threading import currentThread
    from concurrent.futures import ThreadPoolExecutor as Pool 
    from urllib.request import urlopen
    def func(name,url):
        content = urlopen(url).read()
        return name,content
    
    def parserpage(ret):
        name,content = ret.result()
        with open(name,'wb') as f:
            f.write(content)
    
    urls = {
        # 'baidu.html':'https://www.baidu.com',
        # 'python.html':'https://www.python.org',
        # 'openstack.html':'https://www.openstack.org',
        'github.html':'https://help.github.com/',
        'sina.html':'http://www.sina.com.cn/'
    }
    
    tp = Pool(2)
    for k in urls:
        tp.submit(func,k,urls[k]).add_done_callback(parserpage)
    多线程爬虫
    线程
    锁
        为什么有了GIL之后还需要锁
            多个线程同时操作全局变量的时候
            当出现"非原子性操作",例如+= -= *= /=
        l.append(1) 原子性操作
        a += 1  a = a + 1
            tmp = a +1
            a = tmp
    死锁现象
        什么是死锁现象?
            两个以上的线程争抢同一把锁,
            其中一个线程获取到锁之后不释放
            另外的其他线程就都被锁住了
            比较容易出现问题的情况 : 两把锁套在一起用了
            死锁现象的本质 :代码逻辑问题
    递归锁
        一把锁在同一个线程中acquire多次而不被阻塞
        如果另外的线程想要使用,必须release相同的次数,
        才能释放锁给其他线程
    信号量
        控制几个线程同一时刻只能有n个线程执行某一段代码
        锁 + 计数器
    事件
        两件事情
        一件事情要想执行依赖于另一个任务的结果
    条件
        n个线程在某处阻塞
        由另一个线程控制这n个线程中有多少个线程能继续执行
    定时器
        规定某一个线程在开启之后的n秒之后执行
    队列栈优先级队列
        import queue
        线程之间数据安全
        多个线程get不可能同时取走一个数据,导致数据的重复获取
        多个线程put也不可能同时存入一个数据,导致数据的丢失
        队列 先进先出
        栈   先进后出
        优先级 优先级高的先出
    线程池
        concurrent.futrues
        ThreadPoolExcuter
        ProcessPoolExcuter
    submit 异步提交任务
    shutdown 等待池内任务完成
    result  获取进程函数的返回值
    map     异步提交任务的简便用法
    add_done_callback 回调函数
        进程 主进程执行
        线程
    线程总结

    协程 - 

    进程 计算机中最小的资源分配单位
    线程 计算机中能被CPU调度的最小单位
    线程是由操作系统创建的,开启和销毁仍然占用一些时间
    调度
        1.一条线程陷入阻塞之后,这一整条线程就不能再做其他事情了
        2.开启和销毁多条线程以及cpu在多条线程之间切换仍然依赖操作系统
    你了解协程 ?
        了解
        协程(纤程,轻型线程)
        对于操作系统来说协程是不可见的,不需要操作系统调度
        协程是程序级别的操作单位
    协程效率高不高
        和操作系统本身没有关系,和线程也没有关系
        而是看程序的调度是否合理
    协程指的只是在同一条线程上能够互相切换的多个任务
    遇到io就切换实际上是我们利用协程提高线程工作效率的一种方式
    协程 只和程序相关 代码级别的
    # 切换 + 状态保存  yield
    import time
    def consumer(res):
        '''任务1:接收数据,处理数据'''
        pass
    
    def producer():
        '''任务2:生产数据'''
        res=[]
        for i in range(100000):  #1亿次
            res.append(i)
        return res
    #
    start=time.time()
    res=producer()
    consumer(res) # 写成consumer(producer())会降低执行效率
    stop=time.time()
    print(stop-start)
    
    
    import time
    def consumer():
        while True:
            res = yield
    
    def producer():
        g = consumer()
        next(g)
        for i in range(100000):
            g.send(i)
    start =time.time()
    producer()
    print(time.time() - start)
    
    # yield这种切换 就已经在一个线程中出现了多个任务,这多个任务之前的切换 本质上就是协程,consumer是一个协程,producer也是一个协程
    # 单纯的切换还会消耗时间
    # 但是如果能够在阻塞的时候切换,并且多个程序的阻塞时间共享,协程能够非常大限度的提高效率
    原生协程
    协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的
    # greenlet  协程模块 在多个任务之间来回切换
    # gevent 基于greenlet实现的,多个任务交给gevent管理,遇到IO就使用greenlet进行切换
    
    import time
    from greenlet import greenlet
    
    def play():
        print('start play')
        g2.switch()  # 开关
        time.sleep(1)
        print('end play')
    def sleep():
        print('start sleep')
        time.sleep(1)
        print('end sleep')
        g1.switch()
    g1 = greenlet(play)
    g2 = greenlet(sleep)
    g1.switch()  # 开关
    代码 协程
    import time
    import gevent
    def play():   # 协程1
        print(time.time())
        print('start play')
        gevent.sleep(1)
        print('end play')
    def sleep():  # 协程2
        print('start sleep')
        print('end sleep')
        print(time.time())
    
    g1 = gevent.spawn(play)
    g2 = gevent.spawn(sleep)
    # g1.join()
    # g2.join()  # 精准的控制协程任务,一定是执行完毕之后join立即结束阻塞
    gevent.joinall([g1,g2])
    greelet 
    from gevent import monkey;monkey.patch_all()
    import time
    import gevent
    url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']
    
    def get_page(url):
        ret = urlopen(url).read()
        return ret.decode('utf-8')
    start = time.time()
    g_l = []
    for url in url_lst:
        g = gevent.spawn(get_page,url)
        g_l.append(g)
    #
    gevent.joinall(g_l)
    print(time.time()-start)
    协程 爬虫
    - server.py -
    from gevent import monkey;monkey.patch_all()
    import socket
    import gevent
    
    def talk(conn):
        while True:
            msg = conn.recv(1024).decode()
            conn.send(msg.upper().encode())
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',9000))
    sk.listen()
    
    while True:
        conn,addr = sk.accept()
        gevent.spawn(talk,conn)
    
    - client.py - 
    import socket
    import threading
    def task():
        sk = socket.socket()
        sk.connect(('127.0.0.1',9000))
        while True:
            sk.send(b'hello')
            print(sk.recv(1024))
    
    for i in range(500):
        threading.Thread(target=task).start()
    协程 socket 并发 
    协程
    一条线程在多个任务之间相互切换
    数据安全的
    不能利用多核
    能够规避一个线程上的IO阻塞
    
    一条线程能够起500个协程
    4c的机器
    5个进程
    每一个进程20个线程
    每一个线程500个协程
    5*20*500 = 50000
    协程总结 一条线程上的 规避IO
  • 相关阅读:
    android 54 播放音视频
    android 53 ContentProvider内容提供者
    android 52 粘滞广播
    android 51 有序广播
    android 50 进程优先级
    android 49 广播接收者中启动其他组件
    android 48 广播
    android 47 service绑定
    swap
    GetBulkRequest PDU的应用
  • 原文地址:https://www.cnblogs.com/zhangchen-sx/p/11019951.html
Copyright © 2011-2022 走看看