zoukankan      html  css  js  c++  java
  • 并发编程

    并发编程

    计算机发展史

    #早期计算机的结构为	
    	输入设备 ->人工-> 处理设备 ->人工-> 输出设备
    	穿孔卡片 ->人工-> 处理设备 ->人工-> 穿孔卡片
        
    #批处理
        脱机批处理
        	输入设备 ->人工-> 处理设备 ->人工-> 输出设备
            
    	联机批处理
    			输入设备 -> 处理设备 -> 输出设备
    
    #多道程序设计(实现并发)
    	CPU有切换、保存两种状态,可以同时处理,多程序I/O请求
        进程长时间占用CPU的情况下,CPU的利用率较高
        进程短时间占用CPU的情况下,CPU的利用率较低,因为CPU状态的切换需要时间
        #空间上的复用
        	多个程序共用一套计算机硬件
        #时间上的复用
        	切换+保存状态
            	当一个程序要进行IO操作,CPU会保存该程序的状态
            	当一个程序进行IO操作,操作系统会剥夺该程序的CPU权限
                CPU去处理别的进程,之后再处理该进程
                #当一个程序长时间占用CPU,操作系统也会剥夺该程序的CPU执行权限
        #进程调度
        	时间片轮转法+多级反馈队列
    
    #并发
    #并行
    	单核计算机不能实现并行,但是可以实现并发
        多核计算机才能实现并行
        
    #阻塞(I/O)
    	input、sleep、accept、recv
        日常生活中使用的软件通常都有阻塞态
    

    程序与进程

    img

    #程序 
    	程序就是一堆代码
    #进程
    	进程就是运行的程序
        程序创建进程,该进程不会立即进入运行态,而是先进入就绪态,等待CPU的资源分配
        
    #同一个程序可以同时创建两个进程,我们可以同时用不同的进程做不同的事
    #程序经过用户的操作,创建一个进程,这个进程有三种状态:就绪态、运行态、阻塞态,进程运行结束之后释放CPU,进程运行结束
    	就绪态:进程刚创建出来,等待使用CPU资源
        运行态:进程使用CPU进行数学运算、逻辑运算
    	阻塞态:进程请求IO操作,IO操作不使用CPU,此时进程的状态就是阻塞态
        
    #同步、异步
    	表示的是任务的提交方式(从进程角度看)
        #同步
        	任务提交之后,原地等待任务的执行,并拿到返回结果然后才走,期间不做任何事(程序层面的表现就是卡住)
        #异步
        	任务提交之后,不再原地等待返回结果,而是继续执行下一行代码(通过异步回调回得到任务执行结果)
    #阻塞、非阻塞
    	表示的是程序的运行状态
    #同步、异步与阻塞、非阻塞是两对儿概念,不能混为一谈
    	#异步非阻塞态是进程运行的理想状态
        
    #例
    #进程进入就绪态
    #运行态
    import time
    def test():
        #阻塞态
        time.sleep(3)
        #就绪态
        #运行态
        return 123
    res = test()        #同步
    print('hello')
    #程序退出
    

    创建进程的两种方式

    #windows创建进程,会将代码以模块的方式,从上往下执行一遍
    	Windows创建进程,一定要在 if __name__ == '__main__':内,否则会报错(无限循环)
    #linux创建进程,会直接将代码完完整整的拷贝一份(不会出现无限循环的现象)
    
    #创建进程就是在内存中重新开辟一块内存空间,将请求创建的代码丢进去
    	一个进程对应在内存中,就是一块独立的内存空间
        进程与进程之间数据是隔离的,无法直接交互,但是可以通过某些技术实现间接交互
    
    #创建进程方式1
    	通过Process类创建进程
    from multiprocessing import  Process
    import time
    def test(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))      #创建一个进程对象
        p.start()       #告诉操作系统,帮你创建一个进程,异步
        print('主')		#打印与创建进程是2个进程的不同操作
        
    主				#因为创建进程的时间远大于打印时间,所以CPU先执行'打印操作',再执行'创建进程'
    syy is running
    syy is over
    
    #创建进程方式2
    	通过自定义类创建进程
    from multiprocessing import  Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            print('%s is running'%self.name)
            time.sleep(3)
            print('%s is over'%self.name)
    if __name__ == '__main__':
        p = MyProcess('syy')      #创建一个进程对象
        p.start()       		  #告诉操作系统,帮你再创建一个进程
        print('主')
        
    主
    syy is running
    syy is over
    

    join方法

    #怎么控制主进程与子进程代码的运行顺序
    	1.通过代码的执行时间控制CPU执行代码的顺序
        2.使用join方法
        	主进程等待指定的子进程运行结束,不影响其他子进程的运行
    
    from multiprocessing import  Process
    import time
    def test(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))
        p.start()
        p.join()
        print('主')
        
    syy is running
    syy is over
    主
    

    多进程执行顺序

    #多进程执行顺序1
    	进程执行顺序随机
    from multiprocessing import  Process
    import time
    def test(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is over'%name)
    if __name__ == '__main__':
        p1 = Process(target=test,args=('syy',))
        p2 = Process(target=test,args=('yyy',))
        p3 = Process(target=test,args=('zyy',))
        p1.start()		#仅仅是告诉操作系统帮你创建一个进程,这三个请求可以认为是同时发出的(也就是说,操作系统会随机创建进程)
        p2.start()
        p3.start()
        # p.join()
        print('主')
        
    主
    syy is running		#CPU处理这三个请求的顺序是随机的
    yyy is running
    zyy is running
    syy is over
    yyy is over
    zyy is over
    
    #多进程执行顺序2
    	进程执行顺序固定
    from multiprocessing import  Process
    import time
    def test(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    if __name__ == '__main__':
        for i in range(3):
            p = Process(target=test,args=('进程%s'%i,i))
            p.start()
            p.join()
        print('主')
        
    进程0 is running
    进程0 is over
    进程1 is running
    进程1 is over
    进程2 is running
    进程2 is over
    主
    
    #多进程执行顺序3
    	先执行子进程,子进程执行顺序随机
    from multiprocessing import  Process
    import time
    def test(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    if __name__ == '__main__':
        p_list = []
        for i in range(3):
            p = Process(target=test,args=('进程%s'%i,i))
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()
        print('主')
        
    进程0 is running
    进程0 is over
    进程1 is running
    进程2 is running
    进程1 is over
    进程2 is over
    主 
    
    #多进程代码的执行时间
    from multiprocessing import  Process
    import time
    def test(name,i):
        print('%s is running'%name)
        time.sleep(i)
        print('%s is over'%name)
    if __name__ == '__main__':
        p_list = []
        start_time = time.time()
        for i in range(3):
            p = Process(target=test,args=('进程%s'%i,i))
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()
        print('主')
        print('代码执行时间: ',time.time() - start_time)
        
    进程0 is running
    进程0 is over
    进程1 is running
    进程2 is running
    进程1 is over
    进程2 is over
    主
    代码执行时间:  2.151885509490967
    

    进程之间数据是隔离的

    #同一个程序可以运行多个进程,多个进程之间数据是隔离的
    
    from multiprocessing import  Process
    money = 100
    def test():
        global money 
        money = 999
    if __name__ == '__main__':
        p = Process(target=test)
        p.start()
        p.join()
        print(money)	#100
    

    获取进程PID的两种方式

    #windows中,通过PID进程号,找到指定的进程
    C:Users17575>tasklist | findstr 12920
    conhost.exe                  12920 Console                    4     50,076 K
    
    #使用current_process模块查看当前进程的PID
    from multiprocessing import Process,current_process
    import os
    import time
    def test(name):
        print('%s is running'%name,current_process().pid)
        time.sleep(10)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))
        p.start()
        p.join()
        print('主',current_process().pid)
        time.sleep(10)
    
    syy is running 10624
    syy is over
    主 13628
    
    #使用tasklist命令查找进程,使用findstr命令查找进程号
    	#多进程对应着多个python解释器代码
    C:Users17575>tasklist| findstr 10624
    python.exe                   10624 Console                    4     12,424 K
    
    C:Users17575>tasklist| findstr 13628
    python.exe                   13628 Console                    4     12,552 K
    
    #使用OS模块查看当前进程的PID
    from multiprocessing import Process
    import os
    import time
    def test(name):
        print('%s is running'%name,os.getpid())
        time.sleep(10)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))
        p.start()
        p.join()
        print('主',os.getpid())
        time.sleep(10)
        
    #使用OS模块查看父进程的PID
    from multiprocessing import Process
    import os
    import time
    def test(name):
        print('%s is running'%name,'子进程PID: %s'%os.getpid(),'父进程: %s'%os.getppid())
        time.sleep(10)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))
        p.start()
        p.join()
        print('主','当前进程PID: %s'%os.getpid(),'当前进程的父进程: %s'%os.getppid())
        
    syy is running 子进程PID: 7556 父进程: 11036
    syy is over
    主 当前进程PID: 11036 当前进程的父进程: 12272
            
    #查看当前进程的父进程
    	当代码在pycharm中运行,当前进程是pycharm的子进程
    C:Users17575>tasklist| findstr 12272
    pycharm64.exe                12272 Console                    4  1,053,480 K
    
    	当代码在CMD中运行,当前进程的父进程是CMD
    C:Users17575>python E:python_testa.py
    syy is running 子进程PID: 19312 父进程: 10500
    syy is over
    主 当前进程PID: 10500 父进程: 9616
    C:Users17575>tasklist | findstr 9616
    cmd.exe                       9616 Console                    4      5,064 K
    

    杀死进程

    #
    from multiprocessing import Process
    import os
    import time
    def test(name):
        print('%s is running'%name)
        time.sleep(10)
        print('%s is over'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('syy',))
        p.start()
        p.terminate()            #告诉操作系统杀死子进程(此时该进程是否存活不确定)
        print(p.is_alive())     #判断进程当前是否存活
        p.join()
        print('主')
        
    True
    主
    

    僵尸进程与孤儿进程

    #僵尸进程
    	僵尸进程是一种状态,也是一种过程,即进程运行结束,所占用的资源(CPU、PID等)被回收的过程的状态
        所有的进程最后都会进入僵尸进程的状态
        
    #父进程回收子进程资源的两种方式
        1.join方法
        2.父进程正常死亡
        
    #孤儿进程
        在linux中,如果父进程意外死亡,其名下的子进程将会进入孤儿状态,成为孤儿进程
        在linux中,孤儿进程统一被进程init管理,也就是说,在linux中,所有孤儿进程的父进程都是init
        
    

    守护进程

    #守护进程
    	守护进程指的是子进程,而且是特殊的子进程,即主进程死亡,子进程也立即死亡
    
    from multiprocessing import Process
    import os
    import time
    def test(name):
        print('总管%s正常活着'%name)
        time.sleep(0.6)
        print('总管%s正常死亡'%name)
    if __name__ == '__main__':
        p = Process(target=test,args=('ji',))
        p.daemon = True     #将该子进程设置为守护进程,该设置必须放在start之前
        p.start()
        time.sleep(0.5)
        print('皇帝ji正常死亡,总管...')
    

    互斥锁

    #互斥锁
    	当多个进程操作同一份数据的时候,会造成数据的错乱
        这个时候必须加锁处理,将并发变成串行
        这样虽然降低了代码运行的效率,但是提高了数据的安全性
        #锁不要轻易使用,容易造成死锁现象
        #只在处理数据的部分加锁,不要在全局加锁
        #锁必须在主进程中创建,然后交给子进程去使用
        
    #什么时候使用互斥锁
    	当多个进程操作同一份数据的时候,必须使用互斥锁,来保证数据的安全性
    
    #多进程的并发造成的数据混乱
    from multiprocessing import Process
    import json
    import time
    #查票
    def search(i):
        with open('data','r',encoding='utf-8') as f:
            data = f.read()
        ticket_d = json.loads(data)
        print('用户%s,查询余票为: %s'%(i,ticket_d.get('ticket')))
    #买票
    def buy(i):
        with open('data', 'r', encoding='utf-8') as f:
            data = f.read()
        ticket_d = json.loads(data)
        time.sleep(3)
        if ticket_d.get('ticket') > 0:
            #票数减一
            ticket_d['ticket'] -= 1
            #更新票数
            with open('data','w',encoding='utf-8') as f:
                json.dump(ticket_d,f)
                print('用户%s,抢票成功'%i)
        else:
            print('没票了')
    def run(i):
        search(i)
        buy(i)
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=run,args=(i,))
            p.start()
            
    用户0,查询余票为: 1		#多进程同时执行代码,造成数据错乱
    用户1,查询余票为: 1
    用户3,查询余票为: 1
    用户4,查询余票为: 1
    用户2,查询余票为: 1
    用户5,查询余票为: 1
    用户6,查询余票为: 1
    用户7,查询余票为: 1
    用户8,查询余票为: 1
    用户9,查询余票为: 1
    用户0,抢票成功
    用户1,抢票成功
    用户4,抢票成功
    用户3,抢票成功
    用户2,抢票成功
    用户5,抢票成功
    用户6,抢票成功
    用户7,抢票成功
    用户8,抢票成功
    用户9,抢票成功
    
    #互斥锁
    from multiprocessing import Process,Lock
    import json
    import time
    #查票
    def search(i):
        with open('data','r',encoding='utf-8') as f:
            data = f.read()
        ticket_d = json.loads(data)
        print('用户%s,查询余票为: %s'%(i,ticket_d.get('ticket')))
    #买票
    def buy(i):
        with open('data', 'r', encoding='utf-8') as f:
            data = f.read()
        ticket_d = json.loads(data)
        time.sleep(1)
        if ticket_d.get('ticket') > 0:
            #票数减一
            ticket_d['ticket'] -= 1
            #更新票数
            with open('data','w',encoding='utf-8') as f:
                json.dump(ticket_d,f)
                print('用户%s,抢票成功'%i)
        else:
            print('没票了')
    def run(i,mutex):
        search(i)
        mutex.acquire()     #抢锁,只要有一个进程抢到了锁,其他进程必须等待这个进程释放锁,然后再抢锁
        buy(i)
        mutex.release()     #释放锁
    if __name__ == '__main__':
        mutex = Lock()      #生成一把锁
        for i in range(10):
            p = Process(target=run,args=(i,mutex))
            p.start()
            
    用户0,查询余票为: 1
    用户1,查询余票为: 1
    用户3,查询余票为: 1
    用户2,查询余票为: 1
    用户5,查询余票为: 1
    用户6,查询余票为: 1
    用户4,查询余票为: 1
    用户7,查询余票为: 1
    用户9,查询余票为: 1
    用户8,查询余票为: 1
    用户0,抢票成功
    没票了
    没票了
    没票了
    没票了
    没票了
    没票了
    没票了
    没票了
    没票了
    

    API

    百度api参考

    科大讯飞api参考

    图灵机器人参考

    API的使用

    队列

    #队列
    	队列就是管道+锁,队列中的数据只有一份,无法重复获取同一份数据
        队列解决了供需不平衡的问题
        先进先出
    #堆栈
    	先进后出
        
    #队列代码
    	full()、get_nowait()、empty()都不适用于多进程的情况,只能在单进程中使用
    from multiprocessing import Queue
    q = Queue(5)    #生成一个队列对象。括号内可以传参数,表示这个队列的最大存储数,默认2147483647
    #向队列中添加值
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    print(q.full())
    q.put(5)
    print(q.full())   #判断队列是否存满
    # q.put(6)        #当队列满了之后,再存入数据的话,不会报错,会一直等待(阻塞),直到队列中有数据取出
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty())
    print(q.get())
    print(q.empty())	 #判断队列是否为空
    # print(q.get())     #当队列中的数据被取完之后,如果再取的话,程序会一直等待,直到队列中有新的数据存入
    print(q.get_nowait())   #取值,如果队列中没有值的话,程序不会等待,直接报错,有值的话就正常取出
    

    进程间通信 (IPC机制)

    #使用队列来实现两个进程之间通信的两种方式
    
    #子进程放数据,主进程获取数据
    from multiprocessing import Queue,Process
    def producer(q):
        q.put('hello baby')
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        p.start()
        print(q.get())
        
    #两个子进程相互放、取数据
    from multiprocessing import Queue,Process
    def producer(q):
        q.put('hello baby')
    def consumer(q):
        print(q.get())
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        p.start()
        c = Process(target=consumer,args=(q,))
        c.start()
    

    生产者、消费者模型

    #生产者
    	生产/制造数据的
    #消费者
    	消费/处理数据的
        
    #使用队列来实现生产者、消费者之间的数据通信
        
    #生产者、消费者模型1
    	有几个消费者就要加几个None
    from multiprocessing import Process,Queue
    import random
    import time
    def producer(name,food,q):
        for i in range(1,11):
            data = '%s生产了第%s份%s'%(name,i,food)
            time.sleep(random.random())
            q.put(data)
            print(data)
    def consumer(name,q):
        while True:
            data = q.get()
            if data == None:break
            print('%s吃了"%s"'%(name,data))
            time.sleep(random.random())
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=('大厨syy','满汉全席',q))
        p2 = Process(target=producer,args=('跟班ji','小青菜',q))
        c1 = Process(target=consumer,args=('灰太狼',q))
        c2 = Process(target=consumer,args=('蕉太狼',q))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
        
    #生产者、消费者模型2
    from multiprocessing import Process,Queue,JoinableQueue
    import random
    import time
    def producer(name,food,q):
        for i in range(1,11):
            data = '%s生产了第%s份%s'%(name,i,food)
            time.sleep(random.random())
            q.put(data)
            print(data)
    def consumer(name,q):
        while True:
            data = q.get()
            if data == None:break
            print('%s吃了"%s"'%(name,data))
            time.sleep(random.random())
            q.task_done()   #告诉队列,你已经从队列中取出一个数据,并且处理完毕了
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer,args=('大厨syy','满汉全席',q))
        p2 = Process(target=producer,args=('跟班ji','小青菜',q))
        c1 = Process(target=consumer,args=('灰太狼',q))
        c2 = Process(target=consumer,args=('蕉太狼',q))
        p1.start()
        p2.start()
        c1.daemon = True    #主进程死亡,子进程立即死亡
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        q.join()    #等待队列中数据全部取出
    

    线程

    #什么是线程
    	进程是资源单位
        线程才是执行单位、工作单位
        如果把'内存'比作工厂,那么'进程'就相当于工厂里面的车间,'线程'就相当于车间里面的流水线
        每个进程'最少有一个线程',线程才是真正的执行单位,进程只是在线程运行的时候提供代码所需要的资源
        进程、线程其实都是虚拟单位,都是用来帮助我们形象的描述某种事物
        一个进程可以开多个线程,并且线程与线程之间的数据是'共享的'
    
    #进程与线程的区别
    	开进程
        	1.申请名称空间	消耗资源
            2.拷贝代码		 消耗资源
        开线程
        	节约资源,开启线程的开销要远远小于开启线程的开销
            开启线程的速度与代码运行的速度在同一级别
    

    创建线程的两种方式

    #方式一
    from threading import Thread
    import time
    def task(name):
        print('%s is runing'%name)
        time.sleep(1)
        print('%s is over'%name)
    #开启线程不需要在__main__代码块内,但是也可以写在__main__内
    #创建一个线程
    t = Thread(target=task,args=('syy',))
    #告诉操作系统,开启线程,线程的开销要远远小于进程,小到代码执行完线程就已经开启了
    t.start()
    print('主')
    
    #方式二
    from threading import Thread
    import time
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):
            print('%s is runing'%self.name)
            time.sleep(1)
            print('%s is over'%self.name)
    #Windows开启线程不需要在__main__代码块内,但是也可以写在__main__内
    #创建一个线程
    t = MyThread('syy')
    #告诉操作系统,开启线程,线程的开销要远远小于进程,小到代码执行完线程就已经开启了
    t.start()
    #主进程/线程代码
    print('主')
    

    创建线程及其他方法

    #
    from threading import Thread,current_thread,active_count
    import time
    import os
    def task(name,i):
        print('线程: %s,%s is runing,子线程线程号: %s,子线程进程号: %s'%(i,name,current_thread().name,os.getpid()))
        time.sleep(1)
        print('线程: %s,%s is over'%(i,name))
    #创建线程
    t1 = Thread(target=task,args=('syy',1))
    t2 = Thread(target=task,args=('syy',2))
    t1.start()
    t2.start()
    t1.join()       #主线程等待子线程运行完毕
    t2.join()
    print('当前活跃的线程数: %s'%active_count())
    print('主线程线程号: %s,主线程进程号: %s'%(current_thread().name,os.getpid()))
    
    线程: 1,syy is runing,子线程线程号: Thread-1,子线程进程号: 18504
    线程: 2,syy is runing,子线程线程号: Thread-2,子线程进程号: 18504
    线程: 2,syy is over
    线程: 1,syy is over
    当前活跃的线程数: 1
    主线程线程号: MainThread,主线程进程号: 18504
    

    守护线程

    #主线程运行结束,也就意味着该进程的结束,所以主线程即便是运行完毕,也要等待子线程(非守护线程)运行结束
    
    from threading import Thread,current_thread,active_count
    import time
    import os
    def task(i):
        print('线程: %s,runing'%(i))
        time.sleep(1)
        print('线程: %s,over'%(i))
    #创建线程
    for i in range(3):
        t = Thread(target=task,args=(i,))
        t.start()
    print('主线程/进程')
    
    线程: 0,runing
    线程: 1,runing
    线程: 2,runing
    主
    线程: 2,over
    线程: 0,over
    线程: 1,over
    

    线程之间通信

    # 线程之间的数据可以共享
    
    from threading import Thread
    money = 666
    def task(i):
        global money
        money = 999
    #创建线程
    t = Thread(target=task,args=(1,))
    t.start()
    t.join()
    print('主线程/进程,%s'%money)
    
    999
    

    线程互斥锁

    #多线程操作同一份数据导致数据混乱
    from threading import Thread
    import time
    n = 100
    def task():
        global n
        tem = n
        time.sleep(1)
        n = tem -1
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    #主线程等待100个子线程运行结束之后再运行
    for t in t_list:
        t.join()
    print(n)
    
    99	#100个线程从主线程拿到的n都是 n=100,然后再各自减一 n-1=99,所以结果为99
    
    #锁/互斥锁
    from threading import Thread,Lock
    import time
    n = 100
    def task(mutex):
        global n
        #抢锁
        mutex.acquire()
        tem = n
        time.sleep(0.1)
        n = tem -1
        #释放锁
        mutex.release()
    t_list = []
    #主线程加锁
    mutex = Lock()
    for i in range(100):
        t = Thread(target=task,args=(mutex,))
        t.start()
        t_list.append(t)
    #主线程等待100个子线程运行结束之后再运行
    for t in t_list:
        t.join()
    print(n)
    
    0	#多线程从主线程依次拿到数据,抢锁,依次修改数据
    
    #针对不同的数据,要加不同的锁处理
    

    小例子

    #
    
    from threading import Thread
    import time
    def foo():
        print('1running')
        time.sleep(1)
        print('1end')
    def bra():
        print('2running')
        time.sleep(3)
        print('2end')
    if __name__ == '__main__':
        t1 = Thread(target=foo)
        t2 = Thread(target=bra)
        t1.daemon = True		#线程1设置为守护线程,主线程结束则线程1必须立即结束
        t1.start()
        t2.start()
        print('主')
        
    1running		#线程的创建所需资源少,所以先打印running再打印'主'
    2running
    主
    1end
    2end			#线程2sleep 3秒,主线程必须等待子线程(非守护进程)运行结束再结束,等线程2运行结束的时候,线程1已经运行结束,所以会打印出1end
    

    TCP服务端实现并发

    #服务端
    	1.有固定的IP和端口
        2.24小时不间断提供服务
        3.支持并发
    
    #server
    import socket
    from threading import Thread
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0:break
                print(data.decode('utf-8'))
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
            conn.close()
    while True:
        conn,addr = server.accept()
        print(addr)
        t = Thread(target=talk,args=(conn,))
        t.start()
    

    GIL全局解释器锁

    #python有很多种,最常用的就是CPython,用C语言写出来的
    #GIL
    	GIL本质上也是一把互斥锁,将并发编程串行,牺牲效率保证数据的安全
    	GIL还可以用来阻止同一个进程下多个线程的同时执行(只能依次执行),同一个进程中,多个线程无法实现并发,但是可以实现并行(无论单核还是多核)
        #GIL的存在是因为CPython解释器的内存管理不是线程安全的
        	系统创建一个进程,就分配一个python解释器线程,也分配一个垃圾回收机制线程
            如果没有GIL的话,Cpython代码就会利用多核优势实现并发,这也就意味着可能一串代码正在执行(使用一核),此时垃圾回收机制运行,那么垃圾回收机制就可能清理掉正在运行的代码,这种情况也就是'CPython的内存管理不安全'
            GIL全局解释器锁,是对python加锁,那么同一个进程内的多个线程(CPython解释器线程、垃圾回收机制线程、代码线程)将会抢锁、代码运行、释放锁、再抢锁、运行代码...,这样就不会出现内存不安全的情况了
        GIL不是python解释器的特点,它只是CPython解释器的特点
        单进程下,多个线程无法使用多核优势,这种情况是所有解释型语言的通病(Cpyhon、PHP),编译型语言不会出现这种情况
        #针对不同的数据,应该加不同的锁进行处理
        	GIL是对CPython解释器进行加锁,是用来保证线程的安全的(如果正在运行的代码线程需要进行IO,那么锁将会释放,此时代码的运行安全问题使用异步回调解决),但是代码线程之间的数据交互仍需要加锁
    	
    #垃圾回收机制
    	1.引用计数
        	判断变量名、函数名有没有值绑定,有的话计数就是非零
        2.标记清除
        	内存使用率较高的时候,自动进行全内存扫描,把引用计数为0的进程清理掉
        3.分代回收
    		把进程按不同的等级分配,不用的等级有不同的扫描频率
    

    GIL锁与普通锁

    #
    from threading import Thread
    import time
    n = 100
    def task():
        global n
        tmp = n
        time.sleep(1)	#该进程释放锁,其他进程强锁,此时别的进程得到n=100,可以再加普通锁来保证数据的安全
        n = tmp - 1
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    print(n)		#99
    
    from threading import Thread
    import time
    n = 100
    def task():
        global n
        tmp = n
        # time.sleep(1)		#该进程不进行IO操作,知道代码运行结束才会释放锁,其他进程再抢锁(串行)
        n = tmp - 1
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t.start()		#该进程与其他进程抢锁
        t_list.append(t)
    for t in t_list:
        t.join()
    print(n)		#0
    

    CPython的多线程是否有用

    #python的多线程是否有用,需要分情况讨论
    	1.计算密集型
        2.IO密集型
    
    #例
    	四个任务,计算密集型
    		1.单核,开一个进程,四个线程,较短
    		2.多核,开四个进程,再分别开一个线程,较长
    	四个任务,IO密集型,每个任务的执行时间是1秒
    		1.单核,开一个进程,四个线程,4秒
    		2.多核,开四个进程,再分别开一个线程,1秒+开线程的时间 > 4秒
            
    #进程与线程的使用
    	一般进程与线程组合使用
    
    #计算密集型
    from multiprocessing import Process
    from threading import Thread
    import time,os
    def work():
        res = 0
        for i in range(80000):
            res *=i
    if __name__ == '__main__':
        l = []                        #列表是一种队列,管道+锁
        # print(os.cpu_count())       #本机CPU核心数为8
        start = time.time()
        for i in range(8):
            # p = Process(target=work)    #耗时0.26081156730651855
            p = Thread(target=work)     #耗时0.03786516189575195
            l.append(p)
            p.start()
        for p in l:
            p.join()
        print(time.time()-start)
    
    #IO密集型
    from multiprocessing import Process
    from threading import Thread
    import time,os
    def work():
        time.sleep(0.1)
    if __name__ == '__main__':
        l = []                        #列表是一种队列,管道+锁
        # print(os.cpu_count())       #本机CPU核心数为8
        start = time.time()
        for i in range(8):
            p = Process(target=work)    #耗时0.3523292541503906
            # p = Thread(target=work)     #耗时0.10327386856079102
            l.append(p)
            p.start()
        for p in l:
            p.join()
        print(time.time()-start)
    

    死锁现象

    run方法与start方法

    #类与对象
    	只要类加括号实例化对象,无论传入的参数是否一样,生成的对象肯定不一样,单例模式除外
        
    #不要轻易的处理锁的问题,因为逻辑性太强
        
    from threading import Thread,Lock
    import time
    mutexA = Lock()
    mutexB = Lock()
    class MyThread(Thread):
        def run(self):			#类实例化产生对象的时候执行
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()     #self.name等价于current_thread().name
            print('%s抢到A锁'%self.name)       #线程2抢到A锁
            mutexB.acquire()
            print('%s抢到B锁'%self.name)       #线程2抢不到B锁,因为线程1卡死,线程2等线程1
            mutexB.release()
            print('%s释放了B锁'%self.name)
            mutexA.release()
            print('%s释放了A锁'%self.name)
        def func2(self):
            mutexB.acquire()
            print('%s抢到B锁'%self.name)  #线程1很容易就抢到B锁,因为别的线程此时正在抢A锁
            time.sleep(0.1)
            mutexA.acquire()    #线程1抢不到A锁,因为线程2还没释放A锁,线程1等线程2
            print('%s抢到A锁'%self.name)
            mutexA.release()
            print('%s释放了A锁'%self.name)
            mutexB.release()
            print('%s释放了B锁'%self.name)
    for i in range(10):
        t = MyThread()
        t.start()
    

    递归锁

    #RLock
    	RLock可以被第一个抢到锁的人连续的acquire和release
    	每acquire一次,锁身上的计数加一
    	每release一次,锁身上的计数减一
    	只要锁的计数不为0,其他人都不能抢
    	
    from threading import Thread,RLock
    import time
    mutexA = mutexB = RLock()
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('%s抢到A锁'%self.name)
            mutexB.acquire()
            print('%s抢到B锁'%self.name)
            mutexB.release()
            print('%s释放了B锁'%self.name)
            mutexA.release()
            print('%s释放了A锁'%self.name)
        def func2(self):
            mutexB.acquire()
            print('%s抢到B锁'%self.name)
            time.sleep(0.1)
            mutexA.acquire()
            print('%s抢到A锁'%self.name)
            mutexA.release()
            print('%s释放了A锁'%self.name)
            mutexB.release()
            print('%s释放了B锁'%self.name)
    for i in range(10):
        t = MyThread()
        t.start()
    

    信号量

    #信号量
    	信号量在不同的领域中,对应不同的知识点
        多个线程抢多个锁
        
    #互斥锁
    	多个线程抢一把锁
        
    from threading import Semaphore,Thread
    import time
    import random
    sm = Semaphore(5)   #造一个含有5个坑位的厕所
    def task(name):
        sm.acquire()
        print('%s占了一个坑位'%name)
        time.sleep(random.randint(1,3))
        sm.release()
        # print(f'%s释放了一个坑位'%(name))
    for i in range(40):
        t = Thread(target=task,args=(i,))
        t.start()
    

    event事件

    #join()方法
    	join方法实现了主进程、主线程等待子进程、子线程的执行,然后再执行
    #event()
    	event()方法实现了,子进程、子线程等待另一个子进程、子线程的执行,然后再执行
        
    from threading import Event,Thread
    import time
    e = Event()     #生成一个Event对象
    def light():
        print('红灯')
        time.sleep(3)
        print('绿灯')
        e.set()     #发信号
    def car(name):
        print('%s正在等红灯'%name)
        e.wait()    #等待信号
        print('%s加油门冲过路口'%name)
    t = Thread(target=light)
    t.start()
    for i in range(10):
        t = Thread(target=car,args=('伞兵%s号'%i,))
        t.start()
    

    线程与队列

    #线程与队列
    	同一个进程下的多个线程,数据本来就是共享的,为什么还要使用队列呢?
        因为队列是管道+锁,使用队列的话就不用手动操作锁的问题(避免产生死锁现象)
        #queue不能基于网络传输
        
    import queue
    q = queue.Queue()   #创建队列
    q.put('ha?')
    q.put('hei~')
    print(q.get())
    
    q = queue.LifoQueue()      #创建堆栈
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    
    q = queue.PriorityQueue()     #创建管道,数字最小优先级越高
    q.put((10,'ha?'))
    q.put((0,'he!'))
    q.put((-10,'hei~'))
    print(q.get())
        
    ha?
    3
    (-10, 'hei~')  
    

    进程池与线程池

    参考网站

    #池
    	硬件的发展速度跟不上软件的发展速度
    	池的出现就是为了计算机硬件的安全,保证计算机硬件安全的情况下,最大限度的利用计算机
        计算机内部有多种池,进程池、线程池、数据库连接池
        池其实是降低了程序的运行效率,但是保证了计算机硬件的安全 
        
    #提交任务的方式
    	1.同步,提交任务之后,原地等待任务的返回结果,期间不做任何事
        2.异步,提交任务之后,不等待任务的返回结果,直接执行下一行代码
        
    #进程池    
        from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
        import time
        import os
        #创建进程池
            #括号内可以传参,指定进程池内线程的个数,不传参的话默认为当前计算机CPU的个数5
            #池子中创建的进程、线程,创建一次就不会在创建了,以后用的进程、线程都是前面创建的,节省资源
        pool = ProcessPoolExecutor(5)
        def task(n):
            print(n,os.getpid())       #查看当前进程的进程号
            time.sleep(2)
            return n**2
        # pool.submit(task,'哈?')     #向进程池中提交任务,异步提交
        if __name__ == '__main__':
            t_list = []
            for i in range(20):
                res = pool.submit(task,i)
                # print(res)        #res为对象,异步提交,打印结果无规律
                # print(res.result()) #原地等待任务的返回结果,1.将并发变成串行,2.打印结果为任务的返回值
                t_list.append(res)
            pool.shutdown()     #1.关闭池子,2.等待池子中所有的任务执行完毕之后,在执行下面的代码
            for p in t_list:
                print('===',p.result())
    
    #线程池
        from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
        import time
        #创建线程池
            #括号内可以传参,指定线程池内线程的个数,不传参的话默认为当前计算机CPU的个数*5
        pool = ThreadPoolExecutor(5)
        def task(n):
            print(n)
            time.sleep(2)
            return n**2
        # pool.submit(task,'哈?')     #向线程池中提交任务,异步提交
        t_list = []
        for i in range(20):
            res = pool.submit(task,i)  
            # print(res)        #res为对象,异步提交,打印结果无规律
            # print(res.result()) #原地等待任务的返回结果,1.将并发变成串行,2.打印结果为任务的返回值
            t_list.append(res)
        pool.shutdown()     #1.关闭池子,2.等待池子中所有的任务执行完毕之后,在执行下面的代码
        for p in t_list:
            print('===',p.result())
    
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    === 0
    === 1
    === 4
    === 9
    === 16
    === 25
    === 36
    === 49
    === 64
    === 81
    === 100
    === 121
    === 144
    === 169
    === 196
    === 225
    === 256
    === 289
    === 324
    === 361
    
    #异步回调机制
    	当异步提交的任务有返回结果的时候,会自动触发回调函数的执行
        
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import os
    pool = ProcessPoolExecutor(5)
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**2
    def call_back(n):
        print('拿到了异步提交任务的返回结果',n.result())  #n为Future函数,n.result()为任务的返回值
    if __name__ == '__main__':
        t_list = []
        for i in range(20):
            #1。异步提交,2.提交任务之后,绑定一个回调函数,一旦该任务有结果,立即执行对应的回调函数
            res = pool.submit(task,i).add_done_callback(call_back)
            t_list.append(res)
        
    0 19944
    1 8800
    2 21084
    3 23852
    4 19920
    5 19944
    拿到了异步提交任务的返回结果 0
    6 8800
    拿到了异步提交任务的返回结果 1
    7 21084
    拿到了异步提交任务的返回结果 4
    8 23852
    拿到了异步提交任务的返回结果 9
    9 19920
    拿到了异步提交任务的返回结果 16
    10 19944
    拿到了异步提交任务的返回结果 25
    11 8800
    拿到了异步提交任务的返回结果 36
    12 21084
    拿到了异步提交任务的返回结果 49
    13 23852
    拿到了异步提交任务的返回结果 64
    14 19920
    拿到了异步提交任务的返回结果 81
    15 19944
    拿到了异步提交任务的返回结果 100
    16 8800
    拿到了异步提交任务的返回结果 121
    17 21084
    拿到了异步提交任务的返回结果 144
    18 23852
    拿到了异步提交任务的返回结果 169
    19 19920
    拿到了异步提交任务的返回结果 196
    拿到了异步提交任务的返回结果 225
    拿到了异步提交任务的返回结果 256
    拿到了异步提交任务的返回结果 289
    拿到了异步提交任务的返回结果 324
    拿到了异步提交任务的返回结果 361
    

    协程

    #协程
    	单进程下实现并发,该效果称为协程
        
    #进程
    	资源单位
    #线程
    	执行单位
    #并发
    	切换+保存状态,看起来像是同时执行
        
    #程序员自己通过代码,检测程序中的IO,一旦遇到IO,程序自己通过代码切换,给操作系统的感觉就是锁运行的程序没有任何的IO,这样进程将一直在运行态和就绪态之间来回切换,提升代码运行的效率
        
    #程序的切换+保存状态一定会提高代码运行的效率吗?
    	1.当任务是IO密集型的情况下,会提高效率
        2.当任务是计算密集型的情况下,降低效率
        
    #1.当任务是计算密集型的情况下,降低效率
    import time
    def func1():
        for i in range(1000000):
            i+1
    def func2():
        for i in range(1000000):
            i+1
    start = time.time()
    func1()
    func2()
    stop = time.time()
    print(stop-start)		#0.09994626045227051
        
        #使用关键字yield,来实现代码的切换+保存状态
    import time
    def func1():
        while True:
            1000000+1
            yield		#yield无法识别IO,会导代码卡住
    def func2():
        g = func1()
        for i in range(1000000):
            i+1
            next(g)
    start = time.time()
    func1()
    func2()
    stop = time.time()
    print(stop-start)		#0.1855010986328125
    
    #2.gevent模块
        注意,gevent模块没办法识别time、sleep等IO代码,需要再导入monkey模块,配置一个参数monkey.path_all(),这样就可以识别所有的IO了
        #不使用spawn模块,计算代码执行上的时间
    from gevent import monkey;monkey.patch_all()    #该模块经常被使用,可以写成一行
    from gevent import spawn
    import time
    def heng():
        print('哼')
        time.sleep(2)
        print('哼')
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    start = time.time()
    heng()
    ha()
    stop = time.time()
    print(stop-start)		#5.025084972381592
        
        #使用spawn模块,计算代码执行的时间
    from gevent import monkey;monkey.patch_all() 
    from gevent import spawn
    import time
    def heng():
        print('哼')
        time.sleep(2)
        print('哼')
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    start = time.time()
    g1 = spawn(heng)       #把所有需要监测的代码传入spawn()内,切换+保存状态将会在这些代码进行
    g2 = spawn(ha)
    g1.join()
    g2.join()
    stop = time.time()
    print(stop-start)		#3.0223612785339355
    
    	#spawn模块的短板效应,spawn()内的代码运行的时间永远是,IO时间最长的那个代码的IO时间+代码切换时间
    from gevent import monkey;monkey.patch_all() 
    from gevent import spawn
    import time
    def heng():
        print('哼')
        time.sleep(2)
        print('哼')
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    def hei():
        print('嘿')
        time.sleep(5)
        print('嘿')
    start = time.time()
    g1 = spawn(heng)		#g1是一个对象      
    g2 = spawn(ha)
    g3 = spawn(hei)
    g1.join()
    g2.join()
    g3.join()
    stop = time.time()
    print(stop-start)		#5.023629426956177
    

    使用协程实现TCP服务端的并发

    #server
    import socket
    from gevent import monkey;monkey.patch_all()
    from gevent import spawn
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0:break
                print(data.decode('utf-8'))
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
        conn.close()
    def server1():
        while True:
            conn,addr = server.accept()
            spawn(talk,conn)
    if __name__ == '__main__':
        g1 = spawn(server1)       #server不用加扩号,swpawn函数自动加括号调用
        g1.join()
        
    #client
    import socket
    import time
    from threading import Thread,current_thread
    def client():
        client = socket.socket()
        client.connect(('127.0.0.1', 8080))
        n = 0
        while True:
            data = '%s,%s'%(current_thread().name,n)
            client.send(data.encode('utf-8'))
            res = client.recv(1024)
            print(res.decode('utf-8'))
            n +=1
    for i in range(400):
        t = Thread(target=client)
        t.start()
    
    #进程、线程、协程
    	多进程下开多线程,多线程下开协程(使用spawn模块)
    

    IO模型

    参考网站

    
    
  • 相关阅读:
    Win8 消费者预览版中文版下载地址 官方原版
    Easyui datagrid加载本地Json数据
    myeclipse 8.510.0 安装 svn 方法
    easyui tree使用方法
    安装Oracle 11g r2先决条件检查失败解决方法
    Win8/Win7或XP 双系统安装图文教程
    Oracle存储过程与函数
    MyEclipse 中 使用 TortoiseSVN(1)
    MyEclipse 中 使用 TortoiseSVN(2)
    easyui使用Ajax提交表单,返回Json数据
  • 原文地址:https://www.cnblogs.com/syy1757528181/p/14121294.html
Copyright © 2011-2022 走看看