zoukankan      html  css  js  c++  java
  • python-study-36

    python并发编程之协程

    1、协程:
        单线程实现并发
        在应用程序里控制多个任务的切换+保存状态
        优点:
            应用程序级别速度要远远高于操作系统的切换
        缺点:
            多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地
            该线程内的其他的任务都不能执行了
    
            一旦引入协程,就需要检测单线程下所有的IO行为,
            实现遇到IO就切换,少一个都不行,以为一旦一个任务阻塞了,整个线程就阻塞了,
            其他的任务即便是可以计算,但是也无法运行了
    
    2、协程的目的:
        想要在单线程下实现并发
        可利用的cpu只有一个
        并发本质=切换+保存状态
        并发指的是多个任务看起来是同时运行的
    介绍
    from gevent import monkey,spawn;monkey.patch_all()
    from threading import current_thread
    import time
    
    def eat():
        print('%s eat 1' %current_thread().name)
        time.sleep(3)
        print('%s eat 2' %current_thread().name)
    
    def play():
        print('%s play 1' %current_thread().name)
        time.sleep(1)
        print('%s play 2' %current_thread().name)
    
    g1=spawn(eat,)
    g2=spawn(play,)
    
    print(current_thread().name)
    g1.join()
    g2.join()
    
    
    
    并发套接字通信
    server:
    from gevent import spawn,monkey;monkey.patch_all()
    from socket import *
    from threading import Thread
    
    def talk(conn):
        while True:
            try:
                data=conn.recv(1024)
                if len(data) == 0:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port,backlog=5):
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip, port))
        server.listen(backlog)
    
        print('starting...')
        while True:
            conn, addr = server.accept()
            spawn(talk, conn,)
    
    if __name__ == '__main__':
        g=spawn(server,'127.0.0.1',8080)
        g.join()
    
    client:
    from threading import Thread,current_thread
    from socket import *
    import os
    
    def task():
        client=socket(AF_INET,SOCK_STREAM)
        client.connect(('127.0.0.1',8080))
    
        while True:
            msg='%s say hello' %current_thread().name
            client.send(msg.encode('utf-8'))
            data=client.recv(1024)
            print(data.decode('utf-8'))
    
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=task)
            t.start()
    代码演示

    python并发编程之IO模型

    1 阻塞IO(blocking IO)

    2 非阻塞IO(non-blocking IO)

    3 多路复用IO(IO multiplexing)

    4 异步IO(Asynchronous I/O)

    1 阻塞IO(blocking IO)

    '''
    网络IO:
        recvfrom:
            wait data:等待客户端产生数据——》客户端OS--》网络--》服务端操作系统缓存
            copy data:由本地操作系统缓存中的数据拷贝到应用程序的内存中
    
        send:
            copy data
    
    '''
    
    # conn.recv(1024) ==>OS
    
    
    在linux中,默认情况下所有的socket都是blocking
        当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
    
        而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
        所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
    
        几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图
    
        ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。
    
        实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
    阻塞IO

    2 非阻塞IO(non-blocking IO)  代码演示

    Linux下,可以通过设置socket使其变为non-blocking
    
    从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
    
        也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
    
      所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
    
        但是非阻塞IO模型绝不被推荐。
    
        我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
    
        但是也难掩其缺点:
    
    #1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
    #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
        此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
    介绍
    from socket import *
    import time
    
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    server.setblocking(False)
    
    conn_l=[]
    while True:
        try:
            print('总连接数[%s]' % len(conn_l))
            conn,addr=server.accept()
            conn_l.append(conn)
        except BlockingIOError:
            del_l=[]
            for conn in conn_l:
                try:
                    data=conn.recv(1024)
                    if len(data) == 0:
                        del_l.append(conn)
                        continue
                    conn.send(data.upper())
                except BlockingIOError:
                    pass
                except ConnectionResetError:
                   del_l.append(conn)
    
            for conn in del_l:
                conn_l.remove(conn)
    server
    from socket import *
    import os
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    while True:
        msg='%s say hello' %os.getpid()
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    client

    上节课复习

    上节课复习:
        1、GIL全局解释器锁
            什么是?
                GIL本质就是一个把互斥锁,是将多个并发的线程对共享数据的修改
                变成“串行”
    
            为何有?
               Cpython解释器的垃圾回收机制不是线程安全的
    
    
            如何用?
                有GIL的存在,导致同一个进程内的多个线程同一时刻只能有一个运行
                即同一进程的多个线程无法实现并行=》无法利用多核优势
                但是可以实现并发的效果
    
            什么是多核优势?
                多核即多个cpu,多个cpu带来的优势是计算性能的提升,所以
    
                IO密集型:
                    多线程
                计算密集型
                    多进程
    
            GIL vs 自定义互斥锁
                GIL相当于执行权限,意思是在一个进程内的多个线程想要执行,必须
                先抢GIL,这把锁的特点是,当一个线程被剥夺走cpu的执行权限的同时会被
                解释器强行释放GIL
    
        2、进程池与线程池
            什么是?
                池:池用来限制进程/线程个数的一种机制
            为何用?
                当并发的任务数远大于计算机的承受能力,应该用池的概念
                将并发的进程数/线程数控制在计算机能承受的数目
    
            如何用?
                from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
                def task(n):
                    return n**2
                def func(future):
                    print(future.result())
    
                if __name__ == '__main__':
                    p=ProcessPoolExecutor(4)
                    p.submit(task,10).add_done_callback(func)
    
                    p.shutdown(wait=True)
                    # pool.close()
                    # pool.join()
                    print('')
    
            提交任务的两种方式:
                1、同步:提交完任务后就在原地等待,直到任务运行完毕并且拿到返回值后,才运行下一行代码
                2、异步:提交完任务(绑定一个回调函数)后不原地等待,直接运行下一行代码,等到任务运行有返回值自动触发回调的函数的运行
    
            程序的运行状态(阻塞,非阻塞)
                1、阻塞:
                    IO阻塞
                2、非阻塞:
                    运行
                    就绪
    
    今日内容:
        1、协程
        2、IO模型
            阻塞IO
            非阻塞IO
            IO多路复用
            异步IO
    View Code
  • 相关阅读:
    XCTF-crypto---转轮机加密
    [Audio processing] wav音频文件读取int和double数组的关系
    [已解决问题] An error occurred while automatically activating bundle com.android.ide.eclipse.adt
    [C++关键字] alignof & alignas 内存对齐 sizeof 占内存大小
    [C++关键字] 内置类型
    [Git] MAC上Git初探
    [基础] 各种分类器比较
    [基础] 虚函数
    [工具] Numpy
    [基础] 模板+友元类外定义
  • 原文地址:https://www.cnblogs.com/xujinjin18/p/9325821.html
Copyright © 2011-2022 走看看