zoukankan      html  css  js  c++  java
  • 并发编程

    目标:让服务端能够支持高并发+高性能

    一、 操作系统发展史
       多道技术(*****)
          产生背景:想要在单核下实现并发
          多道技术的核心:
             1、空间上的复用(具体指的是内存中同时读入多道程序,多道程序的内存空间是物理隔离)
             2、时间上的复用(复用cpu的时间)
              切换+保存状态=》并发
    
              切换:
              1、遇到IO切换(可以提升效率)
              2、运行时间过长或者有一个优先级更高的进程抢走了cpu(反而会降低效率)

     

    二、进程 (参考博客:https://www.cnblogs.com/linhaifeng/articles/7428874.html

    1、进程理论(*****1、进程与程序区别
               2、并发与并行
               并发:看起来同时运行,单核就可以实现并发,但是单核无法实现并行
               并行:真正意义上的同时运行,一个cpu同一时刻只能做一件事
                   只有多核才能同时做多件事,即并行的效果
               串行:按照固定的顺序一个个地执行
    
           3、不同操作系统开启子进程的区别
               4、一个进程的三种运行状态
    2、开启进程的两种方式(*****)
    #方式一
    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running' %name)
        time.sleep(3)
        print('%s is done' %name)
    
    
    if __name__ == '__main__':
        # 在windows系统之上,开启子进程的操作一定要放到这下面
        # Process(target=task,kwargs={'name':'egon'})
        p=Process(target=task,args=('egon',))
        p.start() # 向操作系统发送请求,操作系统会申请内存空间,然后把父进程的数据拷贝给子进程,作为子进程的初始状态
        print('======主')
    
    #打印结果
    # ======主
    # egon is running
    # egon is done
    
    
    #方式二
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess,self).__init__()     #继承父类功能
            self.name=name
    
        def run(self):
            print('%s is running' %self.name)
            time.sleep(3)
            print('%s is done' %self.name)
    
    
    if __name__ == '__main__':
        p=MyProcess('egon')
        p.start()
        print('')
    
    
    #打印结果
    #
    # egon is running
    # egon is done
    View Code
    了解:僵尸进程与孤儿进程(**):https://www.cnblogs.com/Anker/p/3271773.html

    3、守护进程(**)
    主进程创建守护进程
    
    

     其一:守护进程会在主进程代码执行结束后就终止

    
    

     其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    
    

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    
    

    4、互斥锁与信号量(**)

    互斥锁就将并发变成一个一个的执行,牺牲了效率保证了数据安全
    只有在多个任务修改共享的数据的时候才会考虑用互斥锁
    from multiprocessing import Process,Lock
    import time,random
    
    mutex=Lock()
    # 互斥锁:
    #强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()
    
    # 互斥锁vs join的区别一:
    # 大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序
    # 区别:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
    
    
    def task1(lock):
        lock.acquire() #
        print('task1:名字是egon')
        time.sleep(random.randint(1,3))
        print('task1:性别是male')
        time.sleep(random.randint(1,3))
        print('task1:年龄是18')
        lock.release()
    
    def task2(lock):
        lock.acquire()
        print('task2:名字是alex')
        time.sleep(random.randint(1,3))
        print('task2:性别是male')
        time.sleep(random.randint(1,3))
        print('task2:年龄是78')
        lock.release()
    
    
    def task3(lock):
        lock.acquire()
        print('task3:名字是lxx')
        time.sleep(random.randint(1,3))
        print('task3:性别是female')
        time.sleep(random.randint(1,3))
        print('task3:年龄是30')
        lock.release()
    
    
    if __name__ == '__main__':
        p1=Process(target=task1,args=(mutex,))
        p2=Process(target=task2,args=(mutex,))
        p3=Process(target=task3,args=(mutex,))
    
        # p1.start()
        # p1.join()
        # p2.start()
        # p2.join()
        # p3.start()
        # p3.join()
    
        p1.start()
        p2.start()
        p3.start()
    
    
    #打印结果
    task1:名字是egon
    task1:性别是male
    task1:年龄是18
    task2:名字是alex
    task2:性别是male
    task2:年龄是78
    task3:名字是lxx
    task3:性别是female
    task3:年龄是30
    互斥锁
    5、IPC机制:队列,管道(*)
    进程之间通信必须找到一种介质,该介质必须满足
      1、是所有进程共享的
      2、必须是内存空间
      附加:帮我们自动处理好锁的问题
     
    from multiprocessing import Queue
    
    q=Queue(3)
    
    #队列:
    #1、是内存空间
    #2、自动处理锁的问题
    
    #3、队列是先进先出,可以放任意的python数据类型
    #4、队列中不应该存放很大的数据,而是一些消息级的数据
    
    q.put('first')
    q.put('sencod')
    q.put('third')
    # q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    进程队列
    6、进程queue=管道+锁 (***)
    7、生产者消费者模型
    (*****)
    1、什么是生产者消费者模型?
        模型指的是解决问题的一种套路
        该模型中包含两种重要的角色:
        生产者:生产数据的任务
        消费者:处理数据的任务
    
    2、什么时候用生产者消费者模型?
         在程序中有明显地两类任务,一类负责生产数据,另外一个类则拿到生产的数据进行处理,此时就应该考虑使用生产者消费者模型来处理这种问题
    
    3、为什么要用生产者消费者模型?好处?
        1、将生产者与消费者解开耦合
        2、平衡了生产者的生产数据的能力和消费者处理数据的能力
            原理:
                          解耦和指的是生产者不与消费者直接打交道,
                          生产者可以不停地往队里里放数据
                          消费者可以不停地从队列里取走数据进行处理
    知识点
    #生产者消费者模型
    # 该模型中包含两类重要的角色:
    #1、生产者:将负责造数据的任务比喻为生产者
    #2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者
    
    
    # 实现生产者消费者模型三要素
    #1、生产者
    #2、消费者
    #3、队列
    
    # 什么时候用该模型:
    #程序中出现明显的两类任务,一类任务是负责生产,另外一类任务是负责处理生产的数据的
    
    # 该模型的好处:
    # 1、实现了生产者与消费者解耦和
    # 2、平衡了生产力与消费力,即生产者可以一直不停地生产,消费者可以不停地处理,因为二者
    #不再直接沟通的,而是跟队列沟通
    
    import time
    import random
    from multiprocessing import Process,Queue
    
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
    
    
    def producer(name,q,food):
        for i in range(5):
            time.sleep(random.randint(1,2))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
    
    
    if __name__ == '__main__':
        #1、共享的盆
        q=Queue()
    
        #2、生产者们
        p1=Process(target=producer,args=('egon',q,'包子'))
        p2=Process(target=producer,args=('刘清政',q,'泔水'))
        p3=Process(target=producer,args=('杨军',q,'米饭'))
    
        #3、消费者们
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('梁书东',q))
    
    
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    代码

    三、线程  (参考博客:https://www.cnblogs.com/linhaifeng/articles/7428877.html

    1、线程理论(*****)
    1、开一个进程内默认就有一个线程
    2、线程vs进程
    1、同一进程内的多个线程共享进程内的资源
    2、创建线程的开销要远小于进程
    2、开启线程的两种方式(*****)
       
    #方式一
    from threading import Thread
    import time
    
    def task(name):
        print('%s is running' %name)
        time.sleep(3)
    
    if __name__ == '__main__':
        t=Thread(target=task,args=('egon',))
        # t=Process(target=task,args=('egon',))
        t.start()
        print('主线程')
    
    #打印结果
    #egon is running
    #主线程
    
    
    #方式二
    from threading import Thread
    import time
    
    class MyThread(Thread):
    
        def run(self):
            print('%s is running' %self.name)
            time.sleep(3)
    
    if __name__ == '__main__':
        t=MyThread()
        t.start()
        print('主线程')
    
    
    #打印结果
    # Thread-1 is running
    # 主线程
    开启线程的两种方式
    3、守护线程(**)
    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕#后才能结束。  
     
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(5)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    if __name__ == '__main__':
    
        t1=Thread(target=foo)
        t2=Thread(target=bar)
    
        t1.daemon=True  #在start之前做守护线程
        t1.start()
        t2.start()
        print("main-------")
    
    #主线程在代码运行完毕并不算完,要等子线程运行完毕才算整体完毕
    #非守护的子线程全都死之后主线程才算完毕,以上t1为守护线程,t2为非守护线程
    #等t2运行完后主线程结束
    #所以打印结果为
        '''
        123
        456
        main-------
        end456
        '''
    守护线程
    4、互斥锁与信号量(**)
     
    from threading import Thread,Lock
    import time
    
    mutex=Lock()
    x=100
    
    def task():
        global x
        mutex.acquire()
        temp=x
        time.sleep(0.1)
        x=temp-1
        mutex.release()
    
    
    if __name__ == '__main__':
        start=time.time()
        t_l=[]
        for i in range(100):
            t=Thread(target=task)
            t_l.append(t)
            t.start()
        for t in t_l:
            t.join()
    
        print('',x)
        print(time.time()-start)
    
    #打印结果
    #主 0
    #10.13818883895874
    线程互斥锁
    5、GIL vs 互斥锁(*****)
    1、什么是GIL
    GIL是全局解释器锁,是加到解释器身上的,
    同一进程内的所有的线程,但凡执行,必须拿到解释器执行才能之心个,要拿到解释器必须先抢GIL
    所以GIL可以被当做执行权限

    2、GIL的影响
    GIl会限制同一进程的内的多个线程同一时间只能有一个运行,也就说说python一个进程内的多线线程
    无法实现并行的效果,即无法利用多核优势

    然后多核提供的优势是同一时刻有多个cpu参与计算,意味着计算性能地提升,也就是说我们的任务是
    计算密集型的情况下才需要考虑利用多核优势,此时应该开启python的多进程

    在我们的任务是IO密集型的情况下,再多的cpu对性能的提升也用处不大,也就说多核优势在IO密集型程序面前
    发挥的作用微乎其微,此时用python的多线程也是可以的


    3、GIL vs 互斥锁
    GIL保护的是解释器级别的数据
    本质就是一个互斥锁,然而保护不同的数据就应该用不同的互斥锁,保护我们应用程序级别的数据必须自定义互斥锁

    运行流程?
    6、死锁现象与递归锁(**)
        
    from threading import Thread,Lock,RLock
    import time
    
    mutexA=mutexB=RLock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            time.sleep(0.1)
    
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    
        print('')
    View Code

          

    Thread-1 拿到了A锁
    Thread-1 拿到了B锁
    Thread-1 拿到了B锁
    主
    Thread-1 拿到了A锁
    Thread-2 拿到了A锁
    Thread-2 拿到了B锁
    Thread-2 拿到了B锁
    Thread-2 拿到了A锁
    Thread-4 拿到了A锁
    Thread-4 拿到了B锁
    Thread-4 拿到了B锁
    Thread-4 拿到了A锁
    Thread-6 拿到了A锁
    Thread-6 拿到了B锁
    Thread-6 拿到了B锁
    Thread-6 拿到了A锁
    Thread-8 拿到了A锁
    Thread-8 拿到了B锁
    Thread-8 拿到了B锁
    Thread-8 拿到了A锁
    Thread-10 拿到了A锁
    Thread-10 拿到了B锁
    Thread-10 拿到了B锁
    Thread-10 拿到了A锁
    Thread-5 拿到了A锁
    Thread-5 拿到了B锁
    Thread-5 拿到了B锁
    Thread-5 拿到了A锁
    Thread-9 拿到了A锁
    Thread-9 拿到了B锁
    Thread-9 拿到了B锁
    Thread-9 拿到了A锁
    Thread-7 拿到了A锁
    Thread-7 拿到了B锁
    Thread-7 拿到了B锁
    Thread-7 拿到了A锁
    Thread-3 拿到了A锁
    Thread-3 拿到了B锁
    Thread-3 拿到了B锁
    Thread-3 拿到了A锁
    打印结果
    7、线程queue(***)
    8、Event事件(**)

    学完多进程多线程以后,可以把套接字变为并发的效果:
     
    #服务端
    from concurrent.futures import ThreadPoolExecutor
    import socket
    
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)  # 1024 接收数据的最大限制
                if not data: break  # 针对linux系统
                conn.send(data.upper())  # 注意:收发都是以bytes为单位
            except ConnectionResetError:
                break
        conn.close()
    
    def serve_forever(ip,port,func):
        server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        # print(server)
    
        #服务端和客户端都需要有ip和port,但只有服务端才绑定ip和port
        server.bind((ip,port))
        server.listen(5) # 半连接池:限制的是请求数,而不是连接数
        tpool=ThreadPoolExecutor(3)  #造线程池
        while True:
            conn,client_addr=server.accept() # 等待客户端发来连接请求
            print(conn)
            # t=Thread(target=talk,args=(conn,))
            # t.start()
            tpool.submit(func,conn)  #向线程池提交任务(异步提交),可以不停的往线程池里提交任务,但同一时间内容有三个线程在干活
        server.close()
    
    
    if __name__ == '__main__':
        serve_forever('127.0.0.1',8080,talk)
    
    
    #客户端
    import socket
    
    client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
    client.connect(('127.0.0.1',8080))
    
    while True:
        msg=input('>>: ').strip()
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    View Code
    四、池(*****)
    为何要用池:
    操作系统无法无限开启进程或线程
    池作用是将进程或线程控制操作系统可承受的范围内
    什么时候用池:
    当并发的任务数要远超过操作系统所能承受的进程数或
    线程数的情况应该使用池对进程数或线程数加以限制

    如何用池?
    池内装的东西有两种:
    装进程:进程池
    装线程:线程池

    进程线程池的使用
    #进程池
    from concurrent.futures import ProcessPoolExecutor
    import time,os,random
    
    def task(x):
        print('%s 接客' %os.getpid())
        time.sleep(random.randint(2,5))
        return x**2
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor() # 默认开启的进程数是cpu的核数
    
        for i in range(20):
            p.submit(task,i)
    
    
    
    #线程池
    from concurrent.futures import ThreadPoolExecutor
    import time,os,random
    
    def task(x):
        print('%s 接客' %x)
        time.sleep(random.randint(2,5))
        return x**2
    
    if __name__ == '__main__':
        p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
    
        for i in range(20):
            p.submit(task,i)    #丢任务,不会等任务的执行
    进程池与线程池
    提交的两种方式:
    同步调用
    异步调用+回调机制
    同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才继续执行下一行代码
    异步调用:提交完任务后,不在原地等待,直接执行下一行代码,结果?
    from concurrent.futures import ThreadPoolExecutor
    import time,os,random
    
    def task(x):
        print('%s 接客' %x)
        time.sleep(random.randint(1,3))
        return x**2
    
    if __name__ == '__main__':
        # 异步调用(等提交完全部任务以后再拿结果)
        p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
    
        obj_l=[]
        for i in range(10):
            obj=p.submit(task,i)  #往池子里丢活,i为参数
            obj_l.append(obj)    #列表里存放的是一个个对象obj,也就是提交一个个的任务,
                                 # obj下有一个result方法,可以查看结果
    
        # p.close()
        # p.join()
        p.shutdown(wait=True)  #替代上面close和join,不容许再往线程池里提交任务,保证手里拿的任务数是准确的,然后等着,做完一个任务数目减一,直到全部干完以后此步操作才结束,然后运行其他代码
    
        print(obj_l[3].result())    #等所有任务都结束之后,调对象下面的rusult方法,查看任务四执行的结果,也就是上面的i=3,响应的i^2=9
        print('')
    异步调用代码

         

        
    0 接客
    1 接客
    2 接客
    3 接客
    4 接客
    5 接客
    6 接客
    7 接客
    8 接客
    9 接客
    9
    打印结果
    from concurrent.futures import ThreadPoolExecutor
    import time,os,random
    
    def task(x):
        print('%s 接客' %x)
        time.sleep(random.randint(1,3))
        return x**2
    
    if __name__ == '__main__':
    
        # 同步调用(提交完任务以后等结果)
        p=ThreadPoolExecutor(4) # 默认开启的线程数是cpu的核数*5
    
        for i in range(10):
            res=p.submit(task,i).result()
            print(res)
    
        print('')
    同步调用代码
        
    0 接客
    0
    1 接客
    1
    2 接客
    4
    3 接客
    9
    4 接客
    16
    5 接客
    25
    6 接客
    36
    7 接客
    49
    8 接客
    64
    9 接客
    81
    打印结果
    任务执行的三种状态:
    阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源
       非阻塞(就绪、运行):没有遇到IO操作,或者通过某种手段让程序即便是遇到IO操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU

    五、单线程下实现并发(***)https://www.cnblogs.com/zh-xiaoyuan/p/11779595.html
    协程:在应用程序级别实现多个任务之间切换+保存状态

    高性能:
    单纯地切换,或者说么有遇到io操作也切换,反而会降低效率
    检测单线程下的IO行为,实现遇到IO立即切换到其他任务执行

    gevent :https://www.cnblogs.com/zh-xiaoyuan/p/11774298.html

    六、网络IO模型(主要掌握理论***)https://www.cnblogs.com/zh-xiaoyuan/p/11784190.html
    阻塞IO
    非阻塞IO
    IO多路复用
    异步IO
  • 相关阅读:
    IE678下,select 诡异的样式
    跟着我一步一步的搭建一个基于springcloud的微服务实例
    关于Future踩过的坑
    Apache下的SocketClient的使用
    Jaxb处理泛型,转化成xml字符串
    Linux Centos虚拟机扩容
    docker 搭建zookeeper集群和kafka集群
    sysbench 数据库性能测试工具的使用
    docker 容器技术
    自己手写实现Dubbo
  • 原文地址:https://www.cnblogs.com/zh-xiaoyuan/p/11842027.html
Copyright © 2011-2022 走看看