zoukankan      html  css  js  c++  java
  • 网络编程并发 多进程 进程池,互斥锁,信号量,IO模型

    进程:程序正在执行的过程,就是一个正在执行的任务,而负责执行任务的就是cpu

    操作系统:操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。

    操作系统的作用:

    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口

    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序。

    多道技术产生的背景:针对单核,实现并发。

    多道技术:多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。,

    时间复用:当一个程序在等待I/O时,另一个程序可以使用cpu。提高CPU的利用率。

    空间复用:同一时间内存可以缓存更多的程序。

    进程

    主进程和子进程的关系

    关于资源:子进程得到的是除了代码段是与父进程共享的意外,其他所有的都是得到父进程的一个副本,子进程的所有资源都继承父进程,得到父进程资源的副本,既然为副本,也就是说,二者并不共享地址空间。,两个是单独的进程,继承了以后二者就没有什么关联了,子进程单独运行。(采用写时复制技术)
    关于文件描述符:继承父进程的文件描述符时,相当于调用了dup函数,父子进程共享文件表项,即共同操作同一个文件,一个进程修改了文件,另一个进程也知道此文件被修改了。

    子进程回收机制:进程由谁开,由谁回收,等子程序完成后,才由父进程回收

    创建进程的方法

    方法一:利用subprocess 模块,具体方法见网络编程模块。subprocess模块有很大的局限性。) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。2) 进程间只通过管道进行文本交流。以上限制了我们将subprocess包应用到更广泛的多进程任务。

    方法二:利用multiprocessing模块,这个模块叫多进行模块管理包,multiprocessing中的Process中的类可以创建一个子进程。

    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
    一般需要指定的参数为target,表示为一个函数,这个函数必须是可调用的,
    args表示为target的参数,是一个元组的格式。注意在只有一个参数的时候,必须写上逗号,表示为一个tuple
    kwargs表示为target的参数,是一个字典的格式。

    注意:在Windows系统中开子进程的代码必须要写在if __name__="__main__" 下。mac笔记本不需要因为这两种系统开启子进程的方式不一样

    范例

    from multiprocessing import Process
    import time
    def task(name):
        print("%s is runing"%name)
        time.sleep(2)
        print("%s is done"%name)
    if __name__=="__main__":
        p=Process(target=task,args=("egon",))
        p.start()#只是给操作系统发了个信号,让操作系统开进程(同时开辟一块内存空间,拷贝父进程的地址空间)
        print("你好,中国")

    结果:

    你好,中国
    egon is runing
    egon is done

    子进程回收机制:进程由谁开,由谁回收,等子程序完成后,才由父进程回收

    p.start()#只是给操作系统发了个信号,让操作系统开一个进程(同时开辟一块内存空间,拷贝父进程的地址空间),同时我们还要知道start方法是一个异步非阻塞,
    因为当执行start方法的时候,并不会阻塞当前进程去等待子进程执行完,
    而是会继续执行主进程的代码 p.join() :#阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。也就是说主进程等子进程完了,自己再进行进程。join()方法是一个同步阻塞 p.pid #子进程id p.terminate() #强制结束进程,这个方法不能自己杀死自己 p.is_alive() #查看该进程是否活着 # 进程自杀 import os import signal os.kill(os.getpid(), signal.SIGKILL)

    python代码由python解释器解释运行,你启用的python进程其实是运用了python exe的进行。

    os .getppid()查看父进程的ID

    os.getpid()获取当前的进程的ID。

    方法三:

    from multiprocessing import Process
    
    
    class MyProcess(Process):
        def __init__(self, n):
            self.n = n
            super(MyProcess, self).__init__()  # 使用父类的方法才能传参
    
        def run(self):
            """一定要有这个函数"""
            print(self.n)
            print(os.getppid(), os.getpid())
    
    
    print("主进程", os.getppid(), os.getpid())
    my = MyProcess("小红")
    my.start()

    结果:

    主进程 241 6452
    小红
    6452 6454

     

    进程必须要知道的事情

     1.进程之间内存相互隔离

    举例

    from multiprocessing import Process
    n=1
    def f():
    global n
    n=13
    if __name__ == '__main__':
    p =Process(target=f)
    p.start()
    p.join()
    print(n)

    结果:

     1

    结论:子进程不能修改主进程的变量,用global的作用就是看是否会修改全局变量,因为在单进程中可以修改。

    2.子进程用主进程的值来作为自己的初始值。

    n=1
    def f():
        print(n)
    if __name__ == '__main__':
    p =Process(target=f)
    p.start()

    结果:

    1
    1

    3.不能获得子进程的返回值,因为进程之间是隔离的 

    创建并发进程服务器

    进程之间的通信(IPC)

    由于我们知道进程之间内存是隔离的,但是可以通过别的方法实现进程间的通信

    大体有两个方面:

    1. 基于文件 适合于同一台机器上多进程进行通信.基于socket的文件级别的通信
    2. 基于网络 消息中间件(redis,rabbitmq,memcach)
    以下是基于文件的实现的进程之间的通信
    1.消息队列 from multiprocessing import Queue ,一个进程向队列里放东西,另一个进程向队列中拿东西
      特点:
    1. 数据安全.因为管道加锁
    2. 可以实现进程之间的通信
    3. 先进先出

    实现原理:管道+加锁

    2.管道是一种半双工的通信方式,管道是指用于连接一个读进程和一个写进程以实现她们之间通信的一个文件。写进程将数据以字符流的形式放入管道,读进程则从管道中接收数据。要实现双向通信必须要创建两个管道.数据不安全,需要加锁
     

    第一: 消息队列

    在进程中引用队列的方式:  

    from multiprocessing import Queue

    在线程中引用队列的方式:

    import queue

     举例子:

    from multiprocessing import Process,Queue
    import os,time,random
    def write(q):
        """
        一个进程往队列中放东西
        :param q: 
        :return: 
        """
        for value in ["A","B","C"]:
            print('put %s to queue...'%value)
            q.put(value)#往里面放东西
            time.sleep(random.random())
    def read(q):
        """
        另一个进程从队列中拿数据
        :param q: 
        :return: 
        """
        while True:
            if not q.empty():
                value=q.get(True)#从里面拿东西
                print('GET %s from queue'%value)
                time.sleep(random.random())
            else:
                break
    if __name__ == '__main__':
        q=Queue()#可以不设置大小,受限于内存的大小
        pw=Process(target=write,args=(q,))
        pr=Process(target=read,args=(q,))
        pw.start()
        pw.join()
        pr.start()
        pr.join()
        print('所有的数据都拿完了')

    结果:

    put A to queue...
    put B to queue...
    put C to queue...
    GET A from queue
    GET B from queue
    GET C from queue
    所有的数据都完了
    View Code

    2管道

    import multiprocessing
    
    
    def processFun(conn, name):
        print(multiprocessing.current_process().pid, "进程发送数据:", name)
        conn.send(name)
    
    
    if __name__ == '__main__':
        # 创建一个管道,一个管道有两个口,一个发一个收
        conn1, conn2 = multiprocessing.Pipe()
        # 创建子进程
        process = multiprocessing.Process(target=processFun, args=(conn1, "http://c.biancheng.net/python/"))
        # 启动子进程
        process.start()
        process.join()
        print(multiprocessing.current_process().pid, "接收数据:")
        print(conn2.recv())

    结果:

    7840 进程发送数据: http://c.biancheng.net/python/
    7839 接收数据:
    http://c.biancheng.net/python/

    详情见:

    https://www.cnblogs.com/Nicholas0707/p/10787945.html

    进程间数据共享

    共享内存,正常情况下,每个进程都拥有自己的内存空间,因此进程间的内存是无法共享的它。在内存中划出了一块共享内存取,进程可以通过对该共享区的读或写交换信息,实现通信
    允许多个进程访问相同的内存,一个进程改变其中的数据后,其他的进程都可以看到数据的变化.python中提供了两种方法,也就是multiprocessiing中的两个类 array和Manger
    代价就是运行时间慢,数据不安全,需要加锁

    第二种共享内存

    from multiprocessing import Manager, Lock, Process
    
    
    def change_dict(dic, lock):
        with lock:  # 加锁,时间慢
            dic["count"] -= 1
    
    
    if __name__ == '__main__':
        m = Manager()
        lock = Lock()
        dic = m.dict({"count": 100})
        p_l = []
        for i in range(100):
            p = Process(target=change_dict, args=(dic, lock))
            p.start()
            p_l.append(p)
    
        for k in p_l: k.join() #为了使所有的进程运行完
        print(dic['count'])

    结果:

     0

    守护进程 

    见:https://www.cnblogs.com/sticker0726/p/7943412.html

    僵死进程和孤儿进程

    出现的背景:

    在linux系统中,当用ps命令观察进程的执行状态时,经常看到某些进程的状态栏为defunct,这就是所谓的“僵尸”进程。“僵尸”进程是一个早已死亡的进程,但在进程表(processs table)中仍占了一个位置(slot)。由于进程表的容量是有限的,所以,defunct进程不仅占用系统的内存资源,影响系统的性能,而且如果其数目太多,还会导致系统瘫痪。

     好的博客:

    https://blog.csdn.net/LEON1741/article/details/78142269

    出现的理论原因:

    僵尸进程和孤儿进程

    进程和现实与众不同的是,进程的世界通常是“白发人送黑发人“,父进程在调用子进程之后,通常会在子进程结束之后进行一些后续处理。

    但我们知道,父进程的运行和子进程的结束通常是不可能同时进行的,那父进程也不可能知道子进程是如何结束的,那么,父进程如何为子进程”收尸“呢。

    原来每个进程在结束自己之前通常会调用exit()命令,资源即使早就全部释放了,但进程号,运行时间,退出状态却会因此命令而保留,等到父进程调用了waitpid()时,才会释放这些内容。如果父进程不调用waitpid(),则子进程的信息永远不会释放,这就是所谓的僵尸进程

    除非,父进程在子进程exit之前就已经关闭,子进程便不会变为僵尸进程,这是因为,每次一个进程结束时,系统都会自动扫描一下这个进程的子进程,如果这个进程有子进程,此时这些子进程被称作孤儿进程,便会把这些进程转交给init接管。这些子进程结束后,自然init作为”继父“进程会以某种机制waitpid()(收尸)的。



    作者:姜乐衣
    链接:https://www.jianshu.com/p/d0a3b4f65b1a
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    在项目中我使用了下面这个语句来杀死进程,后来发现杀死的进程会成为僵死进程

    os.kill(os.getpid(), signal.SIGKILL)

    当时看了下边的这个文章,使用了第二种方法,解决了问题

    https://blog.csdn.net/god_yutaixin/article/details/103444586 

    在主进程中添加这句代码

    # 父进程忽略子进程退出信号,让操作系统来处理,父进程不会阻塞
    # signal.signal(signal.SIGCHLD, signal.SIG_IGN)

    进程池

    开多进程是为了并发,通常有几个cpu核心就开几个进程,但是进程开多了会影响效率,主要体现在切换的开销,所以引入进程池限制同时开进程的数量。

    注意:比如说你有10个任务,pool(4)表示:我只开4个进程,10个任务由这4个进程执行,而不是开10个不同的进程,这点要注意

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

    有两个可以实现进程池

    1.from multiprocessing import Pool     这个不推荐用,不好用,太麻烦了,在这里只作为了解

    2.from concurrent.futures import ProcessPoolExecutor   推荐使用这个

    from multiprocessing import Pool
    import time,os,random
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__ == '__main__':
        print('parent process %s...'%os.getpid())
        p=Pool(4) #Pool的默认大小时CPU的核数
        for i in range(12):
            p.apply_async(long_time_task,args=(i,))
        print('等待所有子进程运行')
        p.close() #线程池用完后别忘记关闭
        p.join()
        print('所有的进程都已经运行完毕')

    结果:

    parent process 11448...
    等待所有子进程运行
    Run task 0 (6368)...
    Run task 1 (7564)...
    Run task 2 (10244)...
    Run task 3 (18544)...
    Task 2 runs 0.19 seconds.
    Run task 4 (10244)...
    Task 1 runs 0.66 seconds.
    Run task 5 (7564)...
    Task 0 runs 0.82 seconds.
    Run task 6 (6368)...
    Task 4 runs 1.48 seconds.
    Run task 7 (10244)...
    Task 3 runs 2.37 seconds.
    Run task 8 (18544)...
    Task 6 runs 1.98 seconds.
    Run task 9 (6368)...
    Task 5 runs 2.24 seconds.
    Run task 10 (7564)...
    Task 9 runs 0.93 seconds.
    Run task 11 (6368)...
    Task 7 runs 2.45 seconds.
    Task 10 runs 2.05 seconds.
    Task 11 runs 1.28 seconds.
    Task 8 runs 2.68 seconds.
    所有的进程都已经运行完毕
    View Code

    concurrent.futures模块

     concurrent.futures模块是python3提供的一个异步并发模块,主要用来实现多进程和多线程的异步并发.

     提交内容的方式有两种:

    同步调用:提交/调用一个任务,然后就在原地等着,等到该任务执行完毕拿到返回结果,再执行下一步代码。老版本才有,一般不用。同步提交任务会导致程序串行,也就是开多线程就没有意义了.

    异步调用:提交/调用一个任务,不在原地等着前一个任务执行完,就直接执行下一个任务。可以实现并发的效果。其实异步的前提是两个任务没有关系,下一个任务不需要上一个任务的结果,就可以用异步。

    同步调用范例:

    from concurrent.futures import ProcessPoolExecutor
    import time,random,os
    
    def piao(i):
        print('线程%s is 正在运行'%os.getpid())
        time.sleep(random.randint(1,4))
        return i
    if __name__ == '__main__':
        p=ProcessPoolExecutor(4) #创建进程池
        for i in range(10):
            a=p.submit(piao,i) #把进程放到进程池,这是同步提交任务的方式,即拿到结果后才执行下一行代码
            print('任务结果为:%s'%(a.result())) #获取返回值的方式p.result
        p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程
        print('主进程%s'%os.getpid())

    结果体现了同步提交的方式,你可以发现他是顺序执行的

    进程13128 is 正在运行
    任务结果为:0
    进程6008 is 正在运行
    任务结果为:1
    进程5108 is 正在运行
    任务结果为:2
    进程8280 is 正在运行
    任务结果为:3
    进程13128 is 正在运行
    任务结果为:4
    进程6008 is 正在运行
    任务结果为:5
    进程5108 is 正在运行
    任务结果为:6
    进程8280 is 正在运行
    任务结果为:7
    进程13128 is 正在运行
    任务结果为:8
    进程6008 is 正在运行
    任务结果为:9
    主进程11900
    结果

    异步调用范例;

    from concurrent.futures import ProcessPoolExecutor
    import time,random,os
    
    def piao(i):
        print('线程%s is 正在运行'%os.getpid())
        time.sleep(random.randint(1,4))
        return i
    if __name__ == '__main__':
        p=ProcessPoolExecutor(4) #创建进程池
        objs=[]
        for i in range(10):
            print('该进程是>>>%s' % (os.getpid()))
            a=p.submit(piao,i) #把进程放到进程池,这是同步提交任务的方式,即拿到结果后才执行下一行代码
            objs.append(a)
        for obj in objs:
            print('该进程是----------%s' % (os.getpid()))
            print('任务结果为:%s' % (obj.result()))  # 获取返回值的方式p.result
        p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程
        print('主进程%s'%os.getpid())
        

    结果:

    该进程是----------18908
    进程14424 is 正在运行
    进程2612 is 正在运行
    进程5076 is 正在运行
    进程17496 is 正在运行
    进程5076 is 正在运行
    进程2612 is 正在运行
    进程17496 is 正在运行
    进程14424 is 正在运行
    任务结果为:0
    该进程是----------18908
    任务结果为:1
    该进程是----------18908
    任务结果为:2
    该进程是----------18908
    任务结果为:3
    该进程是----------18908
    任务结果为:4
    进程5076 is 正在运行
    该进程是----------18908  ####这里和主进程的id一样
    进程17496 is 正在运行
    任务结果为:5
    该进程是----------18908
    任务结果为:6
    该进程是----------18908
    任务结果为:7
    该进程是----------18908
    任务结果为:8
    该进程是----------18908
    任务结果为:9
    主进程18908    
    View Code

    研究了半天终于明白了, 这个异步,其实把取结果的这步放在了主进程中,这样看起来就是异步调用了

    异步提交的回调机制

    异步调用真正的概念:  提交完任务(为该任务绑定一个回调函数)后,不用在原地等待该任务执行完拿到结果,就直接提交下一个任务.一个任务一旦执行完任务就会自动调用回调函数. 

    我们把上边异步的例子修改一下,把取结果这一步,写成一个函数get_value,当一个进程得到结果后会自动调用add_done_calkback()函数用来通知get_value函数让它取对象

    from concurrent.futures import ProcessPoolExecutor
    import time,random,os
    
    def piao(i):
        print('进程%s is 正在运行'%os.getpid())
        time.sleep(random.randint(1,4))
        return i
    def get_value(obj):
      #这里的obj就等于p.submit(piao,i)的返回值
    print('任务结果为:%s' % (obj.result())) if __name__ == '__main__': p=ProcessPoolExecutor(4) #创建进程池 for i in range(10): p.submit(piao,i).add_done_callback(get_value) #当该进程有结果了,就会自动调用回调函数,回调函数会通知get_value函数,让它取结果,注意被通知的函数的参数就是回调函数前边的结果. p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程 print('主进程%s'%os.getpid())

    结果:

    进程9904 is 正在运行
    进程14028 is 正在运行
    进程19360 is 正在运行
    进程7572 is 正在运行
    进程14028 is 正在运行
    任务结果为:1
    进程7572 is 正在运行
    任务结果为:3
    进程19360 is 正在运行
    任务结果为:2
    进程9904 is 正在运行
    任务结果为:0
    进程14028 is 正在运行
    任务结果为:4
    进程7572 is 正在运行
    任务结果为:5
    任务结果为:7
    任务结果为:8
    任务结果为:6
    任务结果为:9
    主进程11352
    View Code

    互斥锁

    临界资源:一次仅允许一个进程使用的资源,比如打印机

    互斥:并发执行的多个进程由于竞争同一资源(临界资源)而产生的互相排斥的现象,

    为什么要加这把锁:

    当并发执行的进程取修改某一项任务时,其中一个线程执行时,遇到io切换到另外的近程,

    互斥锁的特点:

    1.每个线程或进程在不释放锁的情况下只能accquire()一次锁,如果想要再次accqure就会卡在原地
    2.互斥锁把并发变为串行,降低程序运行的速度

    如下代码:

    买火车票的例子

    注意在多进程中不要使用全局变量,因为进程之前的内容不共享,可以使用json文件

    json文件

    {"count": 1}
    from multiprocessing import Process, Lock
    import os, time, random
    
    
    def buy_ticket():
    
        with open('ticket_count.json', 'r') as f:
            ticket_count = json.load(f)
        if ticket_count["count"] > 0:
            print('进程%s 买到票了 ' % os.getpid())
            ticket_count['count'] -= 1
            time.sleep(random.randint(1, 2))  # 出现io切换到下一个近程中,但是这个近程还没有打印完,其它线程开始执行,这样所有的数据都会乱了.
        else:
            print('进程%s 没有票了' % os.getpid())
        with open('ticket_count.json', 'w') as f:
            json.dump(ticket_count, f)
    
    
    def read_ticket():
        """
        查票
        :return:
        """
        with open('ticket_count.json', 'r') as f:
            ticket_count = json.load(f)
            print("余票还剩%s" % ticket_count["count"])
    
    
    def check_and_buy_ticket():
        read_ticket()
    
        buy_ticket()
    
    
    if __name__ == '__main__':
            for i in range(3):
                t1 = Process(target=check_and_buy_ticket)
                t1.start()

    结果:

    进程6664 买票开始
    进程6665 买票开始
    主进程6661
    进程6666 买票开始
    进程6665 买到票了 
    进程6666 买到票了 
    进程6664 买到票了 

    命名只有一张票,为啥3人都买到了,这就是数据不安全想象

    解决方法

    加互斥锁

    注意以上方法虽然解决了数据不安全的缺点,但是运行效率变慢了,因为并发变成了串行

    仅演示互斥锁额例子

    加互斥锁的例子:

    from multiprocessing import Process, Lock
    import os, time, random
    
    
    
    def buy_ticket():
        time.sleep(random.randint(1, 2))  # 出现io切换到下一个近程中,但是这个近程还没有打印完,其它线程开始执行,这样所有的数据都会乱了.
        with open('ticket_count.json', 'r') as f:
            ticket_count = json.load(f)
        if ticket_count["count"] > 0:
            print('进程%s 买到票了 ' % os.getpid())
            ticket_count['count'] -= 1
        else:
            print('进程%s 没有票了' % os.getpid())
        with open('ticket_count.json', 'w') as f:
            json.dump(ticket_count, f)
    
    
    def read_ticket():
        """
        查票
        :return:
        """
        with open('ticket_count.json', 'r') as f:
            ticket_count = json.load(f)
            print("余票还剩%s" % ticket_count["count"])
    
    
    def check_and_buy_ticket(lock):
        read_ticket()
    
        # lock.acquire()  # 加锁 阻塞
        # buy_ticket()
        # lock.release()  # 释放锁
        with lock:    # with lock 能代替acquire()release()和自动处理
            buy_ticket()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(3):
            t1 = Process(target=check_and_buy_ticket, args=(mutex,))
            t1.start()

    结果:

    余票还剩1
    余票还剩1
    余票还剩1
    进程7144 买到票了 
    进程7145 没有票了
    进程7146 没有票了

    那么互斥锁和join有什么区别呢?

    join是真正的串行,必须等第一个任务执行完,第二个任务才能start()

    而互斥锁可以让局部代码(指的是修改共享数据的代码)串行起来

    下面我们来举例抢票的例子

    使用join

    from multiprocessing import Process,Lock
    import time ,os ,random,json
    
    def search():
        """
        查询剩余票的函数
        :return:
        """
        with open('ticket.txt',encoding='utf-8') as f:
            dic=json.load(f)
            print('进程%s 剩余票数%s'%(os.getpid(),dic['count']))
    
    def get():
        """
        抢票的函数
        :return:
        """
        with open('ticket.txt',encoding='utf-8') as read_f:
            dic=json.load(read_f)
            if dic['count']>0:
                dic['count'] -= 1
                time.sleep(random.randint(1,2))#模拟手速
                with open('ticket.txt','w',encoding='utf-8') as write_f:
                    json.dump(dic,write_f)
                    print('%s购买成功'%(os.getpid()))
            else:
                print('%s失败' % (os.getpid()))
    
    
    def task():
        search()
        get()
    
    
    if __name__ == '__main__':
        for i in range(20):
            p=Process(target=task)
            p.start()
            p.join()
    使用join

    结果:

    进程19244 剩余票数1
    19244购买成功
    进程14476 剩余票数0
    14476失败
    进程17520 剩余票数0
    17520失败
    进程18664 剩余票数0
    18664失败
    进程18164 剩余票数0
    18164失败
    进程18348 剩余票数0
    18348失败
    进程2232 剩余票数0
    2232失败
    进程15708 剩余票数0
    15708失败
    进程11236 剩余票数0
    11236失败
    进程11488 剩余票数0
    11488失败
    进程10448 剩余票数0
    10448失败
    进程3956 剩余票数0
    3956失败
    进程964 剩余票数0
    964失败
    进程11696 剩余票数0
    11696失败
    进程12732 剩余票数0
    12732失败
    进程5920 剩余票数0
    5920失败
    进程13520 剩余票数0
    13520失败
    进程16968 剩余票数0
    16968失败
    进程13592 剩余票数0
    13592失败
    进程18344 剩余票数0
    18344失败
    View Code

    问题:只要第一个查看余票不为0,就一定能抢到票,但是假设这个人查看月后,半天都没有付款,怎么?所有人都要等他.

    使用互斥锁

    from multiprocessing import Process,Lock
    import time ,os ,random,json
    
    def search():
        """
        查询剩余票的函数
        :return:
        """
        with open('ticket.txt',encoding='utf-8') as f:
            dic=json.load(f)
            print('进程%s 剩余票数%s'%(os.getpid(),dic['count']))
    
    def get():
        """
        抢票的函数
        :return:
        """
        with open('ticket.txt',encoding='utf-8') as read_f:
            dic=json.load(read_f)
            if dic['count']>0:
    
                dic['count'] -= 1
                time.sleep(random.randint(0,3))#模拟手速
                with open('ticket.txt','w',encoding='utf-8') as write_f:
                    json.dump(dic,write_f)
                    print('%s购买成功'%(os.getpid()))
    
            else:
                print('%s失败' % (os.getpid()))
    
    
    def task(mutex):
        search()
        mutex.acquire()  # 加锁的意思
        get()
        mutex.release()  # 释放的意思
    
    if __name__ == '__main__':
        mutex=Lock()
        for i in range(20):
            p=Process(target=task,args=(mutex,))
            p.start()
    使用互斥锁

    结果:

    进程10088 剩余票数1
    进程16132 剩余票数1
    10088购买成功
    16132失败
    进程884 剩余票数0
    884失败
    进程7940 剩余票数0
    7940失败
    进程14548 剩余票数0
    14548失败
    进程16104 剩余票数0
    16104失败
    进程12956 剩余票数0
    12956失败
    进程10264 剩余票数0
    10264失败
    进程19972 剩余票数0
    19972失败
    进程10740 剩余票数0
    10740失败
    进程14028 剩余票数0
    14028失败
    进程2392 剩余票数0
    2392失败
    进程11732 剩余票数0
    11732失败
    进程3468 剩余票数0
    3468失败
    进程11192 剩余票数0
    11192失败
    进程15936 剩余票数0
    15936失败
    进程5712 剩余票数0
    5712失败
    进程19732 剩余票数0
    19732失败
    进程9864 剩余票数0
    9864失败
    进程7316 剩余票数0
    7316失败
    View Code

    互斥锁关于线程

    我们在双十一的时候,要并发的购买同一商品时,如果不加锁

    代码如下:

    from threading import Thread ,Lock
    
    import time,random
    
    n = 100 #商品的总数为100件
    def task():
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1
    
    if __name__ == '__main__':
        objs=[]
        for i in range(100):
            t=Thread(target=task)
            objs.append(t)
            t.start()
        for obj in objs: #添加这步的目的是为了让上边的子线程执行完,再执行主线程
            obj.join()
    
    print(n)

    结果:

    99

    应该是多少呢?应该为0,这就发生了数据安全的问题了

    所以我们这时候要加互斥锁

    from threading import Thread ,Lock #线程的互斥锁导入的方式不一样
    import time,random
    
    n = 100 #商品的总数为100件
    def task():
        global n
        with mutex: #这里是锁的简化写法和with open一样
            temp=n
            time.sleep(0.1)
            n=temp-1
    
    if __name__ == '__main__':
        mutex=Lock() #这里不用给每个线程传mutex了,因为线程之间是共享内存的
        objs=[]
        for i in range(100):
            t=Thread(target=task)
            objs.append(t)
            t.start()
        for obj in objs: #添加这步的目的是为了让上边的子线程执行完,再执行主线程
            obj.join()
    
    print(n)

    结果为:

     0

    互斥锁的简单写法

     with mutex: #这里是锁的简化写法和with open一样
            temp=n
            time.sleep(0.1)
            n=temp-1

    相当于:

        mutex.acquire()
        temp = n
        time.sleep(0.1)
        n = temp - 1
        mutex.release()

    信号量(semaphore)

    信号量:关于信号量操作系统中是这样解释的:信号量是一种数据结构,信号量的值与相应的资源的使用情况有关.

      理解的白话文这样解释: 

    1. 信号量是一个变量,控制着对公共资源或者临界区的访问。信号量维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。
    2.  每次有一个线程获得信号量时,计数器-1。若计数器为0,其他线程就停止访问信号量,直到另一个线程释放信号量

    你可能会有疑问这不和进程池和线程池的概念一样吗? 有很大不一样

    和进程池的区别

    进程池:pool(4):代表最大只允许有4个进程同时运行. 由这4个进程把所有的任务都完成.

    信号量: semaphore(4) 代表最大只允许四个进程同时访问该4资源,但是可以允许存在多个进程

    总的来说:进程池控制开启进程的数量,而信号量控制进程访问某一资源的个数.

    代码实现

    信号量:

    from multiprocessing import Process,Semaphore
    import os, random,time
    #from threading import Thread,Semaphore   线程使用时信号量时
    
    def task(sem):
        with sem:
            print("线程%s正在上厕所" % os.getpid())
            time.sleep(random.randint(0, 2))
            print("线程%s上完厕所了" % os.getpid())
    
    if __name__ == '__main__':
        sem=Semaphore(4)#同一时间只允许四个进程访问该资源
        for i in range(10):
            p=Process(target=task,args=(sem,))
            p.start()

    结果:可以看到开启了10个不同的进程

    线程6104正在上厕所
    线程10908正在上厕所
    线程6104上完厕所了
    线程10908上完厕所了
    线程11220正在上厕所
    线程14176正在上厕所
    线程14176上完厕所了
    线程7152正在上厕所
    线程7152上完厕所了
    线程1520正在上厕所
    线程20264正在上厕所
    线程16440正在上厕所
    线程20264上完厕所了
    线程6444正在上厕所
    线程11220上完厕所了
    线程14668正在上厕所
    线程6444上完厕所了
    线程1520上完厕所了
    线程16440上完厕所了
    线程14668上完厕所了

    进程池的代码:

    from concurrent.futures import ProcessPoolExecutor
    import os, random,time
    
    
    def task2():
        print("线程%s正在上厕所" % os.getpid())
        time.sleep(random.randint(0, 2))
        print("线程%s上完厕所了" % os.getpid())
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor()
        for i in range(10):
            a=p.submit(task2)

    结果:

    进程8156正在上厕所
    进程4912正在上厕所
    进程1384正在上厕所
    进程14264正在上厕所
    进程14264上完厕所了
    进程14264正在上厕所
    进程8156上完厕所了
    进程8156正在上厕所
    进程8156上完厕所了
    进程8156正在上厕所
    进程8156上完厕所了
    进程8156正在上厕所
    进程4912上完厕所了
    进程4912正在上厕所
    进程1384上完厕所了
    进程1384正在上厕所
    进程1384上完厕所了
    进程14264上完厕所了
    进程8156上完厕所了
    进程4912上完厕所了
    View Code

    从结果上看出:只开了4个进程: 4912,8156,14264,1384

    死锁现象

    死锁现象也叫顶牛:各个并发线程彼此等待对方拥有的资源,且在得到对方资源前不释放自己占有的资源,就会发生死锁现象

    出现原因:

    1.有多个线程

    2.每个线程中交替使用多把锁

    from threading import Thread ,Lock,current_thread
    
    import time , random
    
    mutexA=Lock()
    mutexB=Lock()
    def f1():
            with mutexA:
                print('%s在f1拿到了A锁'%current_thread().getName())
                with mutexB:
                    print('%s在f1拿到了B锁' % current_thread().getName())
    
    
    def f2():
        with mutexB:
            print('%s在f2中拿到了B锁' % current_thread().getName())
            time.sleep(0.1)#模拟线程阻塞
            with mutexA:
                print('%s在f2拿到了A锁' % current_thread().getName())
    
    def task():
        f1()
        f2()
    if __name__ == '__main__':
        for i in range(10):
            t=Thread(target=task)
            t.start()

    结果:

    Thread-1在f1拿到了A锁
    Thread-1在f1拿到了B锁
    Thread-1在f2中拿到了B锁
    Thread-2在f1拿到了A锁

    线程1在f2中拿到了B锁,想要申请A锁,这时候A锁被线程2在f1中占有它想申请B锁,这时线程1不释放B锁,线程2不释放A锁,就会发生死锁现象.

    这时候,我们会想既然用两把锁会发生死锁,那么干脆用1把锁不就行了

    mutexA=mutexB=Lock()

    别忘了互斥锁的特点每个线程只能再不释放锁的情况下只能accquire()一次,如果想要再次accqure就会发生下面的现象

    Thread-1在f1拿到了A锁
    ....

    线程1卡在这里不动了,这是因为一个线程只能获得一次锁,不能重复获得。要解决这个问题只要能让一把锁获得多次不就行了。

    这时候来解决死锁的锁来了叫递归锁

    递归锁

    出现背景:

      1.在互斥锁中一个线程在不释放锁的情况下,无法获得另一把锁

      2.解决死锁现象

    我们分析了死锁现象,那么在python中是如何解决这个死锁现象的呢?

    ,python提供了可重入锁RLock,也叫递归锁

    递归锁特点:

    1. 该线程可以多次获得(accquire)这把锁。
    2. 直到该线程所有的锁都被释放了,别的线程才可以抢这把锁。
    3. 速度慢于互斥锁.

    递归锁本质:把多把锁变成了一把锁

    例子:假设该线程中有两把锁.A,B,只要该线程获得A锁,就相当于获得了AB锁,直到该线程所有的锁都被释放了,别的线程才可以抢AB锁

    原理:递归锁内部维护了Lock和counter变量,counter记录了accquire()的次数,当线程释放一个lock。counter就减去1,直到counter=0,也就是该线程所有的lock都被释放,别的线程才可以抢这个锁。

    from threading import Thread ,Lock,current_thread,RLock
    
    import time , random
    
    mutexA=mutexB=RLock()
    
    def f1():
            with mutexA:
                print('%s在f1拿到了A锁'%current_thread().getName())
                with mutexB:
                    print('%s在f1拿到了B锁' % current_thread().getName())
                    print('%s在f1释放了B锁' % current_thread().getName())
                print('%s在f1拿释放了A锁' % current_thread().getName())
    
    def f2():
        with mutexB:
            print('%s在f2中拿到了B锁' % current_thread().getName())
            time.sleep(0.1)
            with mutexA:
                print('%s在f2拿到了A锁' % current_thread().getName())
                print('%s在f2释放了了A锁' % current_thread().getName())
            print('%s在f2拿释放了A锁' % current_thread().getName())
    
    def task():
        f1()
        f2()
    if __name__ == '__main__':
        for i in range(10):
            t=Thread(target=task)
            t.start()

    结果:

    Thread-1在f1拿到了A锁
    Thread-1在f1拿到了B锁
    Thread-1在f1释放了B锁
    Thread-1在f1拿释放了A锁
    Thread-1在f2中拿到了B锁
    Thread-1在f2拿到了A锁
    Thread-1在f2释放了了A锁
    Thread-1在f2拿释放了A锁
    Thread-2在f1拿到了A锁
    Thread-2在f1拿到了B锁
    Thread-2在f1释放了B锁
    Thread-2在f1拿释放了A锁
    Thread-2在f2中拿到了B锁
    Thread-2在f2拿到了A锁
    Thread-2在f2释放了了A锁
    Thread-2在f2拿释放了A锁
    Thread-4在f1拿到了A锁
    Thread-4在f1拿到了B锁
    Thread-4在f1释放了B锁
    Thread-4在f1拿释放了A锁
    Thread-4在f2中拿到了B锁
    Thread-4在f2拿到了A锁
    Thread-4在f2释放了了A锁
    Thread-4在f2拿释放了A锁
    Thread-6在f1拿到了A锁
    Thread-6在f1拿到了B锁
    Thread-6在f1释放了B锁
    Thread-6在f1拿释放了A锁
    Thread-6在f2中拿到了B锁
    Thread-6在f2拿到了A锁
    Thread-6在f2释放了了A锁
    Thread-6在f2拿释放了A锁
    Thread-8在f1拿到了A锁
    Thread-8在f1拿到了B锁
    Thread-8在f1释放了B锁
    Thread-8在f1拿释放了A锁
    Thread-8在f2中拿到了B锁
    Thread-8在f2拿到了A锁
    Thread-8在f2释放了了A锁
    Thread-8在f2拿释放了A锁
    Thread-10在f1拿到了A锁
    Thread-10在f1拿到了B锁
    Thread-10在f1释放了B锁
    Thread-10在f1拿释放了A锁
    Thread-10在f2中拿到了B锁
    Thread-10在f2拿到了A锁
    Thread-10在f2释放了了A锁
    Thread-10在f2拿释放了A锁
    Thread-5在f1拿到了A锁
    Thread-5在f1拿到了B锁
    Thread-5在f1释放了B锁
    Thread-5在f1拿释放了A锁
    Thread-5在f2中拿到了B锁
    Thread-5在f2拿到了A锁
    Thread-5在f2释放了了A锁
    Thread-5在f2拿释放了A锁
    Thread-9在f1拿到了A锁
    Thread-9在f1拿到了B锁
    Thread-9在f1释放了B锁
    Thread-9在f1拿释放了A锁
    Thread-9在f2中拿到了B锁
    Thread-9在f2拿到了A锁
    Thread-9在f2释放了了A锁
    Thread-9在f2拿释放了A锁
    Thread-7在f1拿到了A锁
    Thread-7在f1拿到了B锁
    Thread-7在f1释放了B锁
    Thread-7在f1拿释放了A锁
    Thread-7在f2中拿到了B锁
    Thread-7在f2拿到了A锁
    Thread-7在f2释放了了A锁
    Thread-7在f2拿释放了A锁
    Thread-3在f1拿到了A锁
    Thread-3在f1拿到了B锁
    Thread-3在f1释放了B锁
    Thread-3在f1拿释放了A锁
    Thread-3在f2中拿到了B锁
    Thread-3在f2拿到了A锁
    Thread-3在f2释放了了A锁
    Thread-3在f2拿释放了A锁
    View Code

    我们分析下:

    线程1在f1中获得了AB锁并释放后,会出现两种情况,进程中的其他线程会抢到这把递归锁来执行f1,第二种情况是:线程1继续抢到锁来执行f2.

     IO模型

    在学io模型之前我们要清楚同步和异步,阻塞和非阻塞的概念

    阻塞和非阻塞指的是 当前线程或进程是不是被阻塞住了,
    
    异步和同步:指的是任务调用需不需要等待前一个任务完成。

    IO模型是为了能实现和gevent模块遇到io就切换的功能。

    io模型的种类

    1. 阻塞io模型
    2. 非阻塞io模型
    3. io多路复用模型
    4. 异步io模型
    5. 信号驱动io(不常用,了解)

      再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

    #1)等待数据准备 (Waiting for the data to be ready)
    #2)数据来了后,将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

        记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

    阻塞io模型

    阻塞io的特点:一旦阻塞了,任务就会卡在原地,比如我们写的服务端: 程序会在accept 和recv  这两步卡住,程序就会不动知道等到不阻塞了,才继续执行

      所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

     代码:

    import socket
    
    server=socket.socket()
    
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    print('start....')
    while True:
        conn, addr = server.accept()
        while True:
            try:
                date=conn.recv(1024)
                if not date: break  #这步是给linux准备的
                conn.send(date.upper())
            except Exception as e:
                break
        conn.close()
        server.close()
    服务端
    import  socket
    
    client=socket.socket()
    client.connect(('127.0.0.1',8081))
    
    while True:
        date = input('>>>>')
        client.send(date.encode())
        data = client.recv(1024)
        print('>>>', data.decode())
    
    client.close()
    客户端

    一个简单的解决方案:

    #在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接

       该方案的问题是:

    #开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

        改进方案:    

    #很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,
    尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

        改进后方案其实也存在着问题:

    #“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。
    所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

      对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

    非阻塞io模型

    非阻塞io: 当我们提交任务后,系统会自动返回结果,到底有没有结果,如果没有结果,线程可以去干别的事情,每过一段时间在来询问操作系统有没有结果.

    注意:非阻塞指的是io操作中的第一阶段即wait数据阶段不堵塞了, copy数据阶段还是阻塞状态

    在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

    非阻塞io实现单线程下的并发

    import socket
    
    server=socket.socket()
    
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    print('start....')
    server.setblocking(False) #默认是True, False变为非阻塞了
    conn_list=[]
    del_conn=[]
    while True:
        try:
            print(conn_list)
            conn,addr=server.accept()
            conn_list.append(conn)
        except BlockingIOError : #在非阻塞的时候,accept收不到数据会发生BlockingIOError错误
            for conn in conn_list:
                try:
                    data=conn.recv(1024)
                    conn.send(data.upper())
                except BlockingIOError :
                    continue
                except ConnectionResetError : #这步是为了关闭某个客户端的时候不发生异常
                    del_conn.append(conn) #不能在循环的时候删列表,把要删除的连接放到列表中统一删除
    
    
            for conn1 in del_conn:
                conn_list.remove(conn1)
                conn1.close()
            del_conn=[]
            continue
    server.close()
    服务端
    import  socket
    
    client=socket.socket()
    client.connect(('127.0.0.1',8081))
    
    while True:
        date = input('>>>>')
        client.send(date.encode())
        data = client.recv(1024)
        print('>>>', data.decode())
    
    client.close()
    客户端

    但是我们不推荐非io阻塞模型

    我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

        但是也难掩其缺点:

    #1. 循环调用recv(),会一直调用操作系统,将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
    #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
    白话应该应该这么说:我们可以在等待任务的时候做其他的事,但是必须要等待其他事做完后才能做我们该做的事,这样影响了,我们的执行效率

        此外,在这个方案中我们多次使用try 来捕捉异常来检测到底有没有返回结果没有结果则返回异常,我们只能手动检测一个套接字,不能检测多个套接字,实际上python提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否有数据产生.

    io多路复用(同步io)模型

    I/O多路复用,I/O就是指的我们网络I/O,多路指多个TCP连接(或多个Channel),复用指复用一个介质。串起来理解就是io多路复用就是:多个请求用一个介质来处理io阻塞问题.其实这个介质就是(select/epoll).

    io多路复用的是用来检测多个套接字是否有变化以便不同的变化做出不同的处理. 我认为比较好的解释:IO多路复用是指内核一旦发现线程指定的一个或者多个IO条件准备读取,它就通知该进程。

    select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间

    IO多路复用优点:

    与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

    它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
        这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

    调:

        1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

        2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

        结论: select的优势在于可以处理多个连接,不适用于单个连接  

     该模型的优点:

    #相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

        该模型的缺点:

    #首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,
    如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,
    所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。 #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

     单线程下 使用select实现并发:

    import socket
    import select
    
    server=socket.socket()
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    print('start....')
    server.setblocking(False) #默认是True, False变为非阻塞了
    reads_l=[server,] #监听列表
    del_l=[]#删除列表
    while True:
       r_l,_,_= select.select(reads_l,[],[])#rlist 检测套接字,返回值是三个列表组成的元组
       # print('r_l',r_l)
       for obj in r_l: #由于r_L是个列表,所以要for 循环取值
           if obj==server: #因为我们要检测多个套接字,所以要做判断
               coon,addr=obj.accept()
               reads_l.append(coon)
               print(addr)
           else:
                   date = obj.recv(1024)
                   obj.send(date.upper())
    
    server.close()
    服务端

    客户端:

    import  socket
    
    client=socket.socket()
    client.connect(('127.0.0.1',8081))
    
    while True:
        date = input('>>>>')
        client.send(date.encode())
        data = client.recv(1024)
        print('>>>', data.decode())
    
    client.close()
    客户端

     epoll poll select 的区别?

    select,poll,epoll都是i/o多路复用的具体实现方式
    ###select 
        select会修改传入的参数的数组,这个对于一个需要调用很多次的函数是非常不友好的.
        select只能监视1024个连接
        select 当任何一个socket对象出现了数据,select不会告诉你哪个socket有数据,需要你for循环来问,效率低
        select不是线程安全,
     #线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据污染
    ###poll修复了select的很多问题
        1.去掉了1024个连接的限制
        2.poll不再修改传入的参数的数组
        3.但是还不是线程安全的
        4.不能准确的知道哪个select有数据,效率低
    ###epoll 修复了poll和select绝大多数的问题
        1.线程安全
        2.epoll可以准确的告诉你哪个socket里有数据,具体实现方式:当哪个socket有数据时会通知你,你不用for 循环来一一个问效率高

    selectors

    该模块会自动根据当前的操作系统来选择合适的io多路复用模型,比如linux就用epoll

    #服务端
    from socket import *
    import selectors
    
    sel=selectors.DefaultSelector()
    def accept(server_fileobj,mask):
        conn,addr=server_fileobj.accept()
        sel.register(conn,selectors.EVENT_READ,read)
    
    def read(conn,mask):
        try:
            data=conn.recv(1024)
            if not data:
                print('closing',conn)
                sel.unregister(conn)
                conn.close()
                return
            conn.send(data.upper()+b'_SB')
        except Exception:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    
    
    server_fileobj=socket(AF_INET,SOCK_STREAM)
    server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server_fileobj.bind(('127.0.0.1',8088))
    server_fileobj.listen(5)
    server_fileobj.setblocking(False) #设置socket的接口为非阻塞
    sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
    
    while True:
        events=sel.select() #检测所有的fileobj,是否有完成wait data的
        for sel_obj,mask in events:
            callback=sel_obj.data #callback=accpet
            callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
    
    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8088))
    
    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))
    selectors

      

    异步io模型

        用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

    这种模型的效率是最高的。

  • 相关阅读:
    模电电路分析
    正式答辩提问
    Dockerfile
    Docker常用命令
    docker镜像与容器
    Docker容器与容器数据
    docker命令自动安装
    Docker与虚拟机
    Java Lambda表达式 Stream
    Java Lambda表达式
  • 原文地址:https://www.cnblogs.com/sticker0726/p/7931573.html
Copyright © 2011-2022 走看看