zoukankan      html  css  js  c++  java
  • python 进程 进程间的数据隔离 守护进程 锁 信号量 事件

    Process

    进程之间的数据隔离问题

    守护进程    报活

    几个进程模型 ----- 进程同步工具

      有先后顺序的    就是同步

      进程之间     就是异步

      希望原本异步的多进程操作,维持一个顺序 --- 同步工具

    1 锁  Lock  重要   买票

    2信号量  Semaphore   ktv

    3事件  Event    信号灯

    一  进程之间的数隔离

    进程与进程之间的数据是隔离的

    内存空间是不能共享的

    所以要想进行通信,必须借助其他手段

    且这两个进程都是自愿的

    子进程的执行结果父进程获取不到

    父进程依赖子进程的执行结果

    父进程如何获取子进程的执行结果:父进程之间通过socket通信

    守护进程:

    会随着主进程的结束而结束

    主进程创建守护进程:

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,负责抛出异常:AssertionError: daemonic processes are not allowed to have children
    注意:进程之间是相互独立的,主进程代码运行结束,守护进程随即终止

    import os
    import time
    from multiprocessing import Process
    
    class Myprocess(Process):
        def __init__(self,person):
            super().__init__()
            self.person = person
        def run(self):
            print(os.getpid(),self.name)
            print('%s正在和女主播聊天' %self.person)
    
    
    p=Myprocess('哪吒')
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id
    print('主')
    
    守护进程的启动
    
    from multiprocessing import Process
    
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    p1=Process(target=foo)
    p2=Process(target=bar)
    
    p1.daemon=True
    p1.start()
    p2.start()
    time.sleep(0.1)
    print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.
    
    主进程代码执行结束守护进程立即结束
    
    import time
    from multiprocessing import Process
    
    # 例一
    # def func1():
    #     print('begin')
    #     time.sleep(3)
    #     print('wahaha')
    
    # if __name__ == '__main__':
    #     p = Process(target=func1)
    #     p.daemon = True
    #     # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程
    #     # 设置守护进程的操作应该在开启子进程之前
    #     p.start()
    #
    #     time.sleep(1)
    #     print('主进程')
    
    # 例二
    def func1():
        print('begin')
        time.sleep(3)
        print('wahaha')
    
    def func2():
        while True:
            print('in func2')
            time.sleep(0.5)
    
    if __name__ == '__main__':
        Process(target=func1).start()
        p = Process(target=func2)
        p.daemon = True
        # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程
        # 设置守护进程的操作应该在开启子进程之前
        p.start()
        time.sleep(1)
        print('主进程')
    

    设置守护进程之后,会有什么效果呢?

    守护进程会在主进程的代码执行完毕后直接结束,无论守护进程是否执行完毕

    应用:

      报活 主进程还活着

      100台机器 100个进程  10000进程

      应用是否在正常工作 ----- 任务管理器来查看

      守护进程如何向监测机制报活呢?

      为什么要用守护进程来报活呢?为什么不用主进程来工作呢??

        守护进程报活几乎不占用cpu ,也不需要操作系统去调度

        主进程能不能严格的每60s就发送一条信息

      

      

    使用并发的实例:

    socet聊天并发实例: 

     所有的进程的基本使用
    # 进程 : 同一时刻可以做多件事情 互相之间不影响
    # socket tcp server
    # 采用多进程的知识点 来解决原生socket同一时刻只能和一个conn通信的弊端

    from socket import *
    from multiprocessing import Process
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    
    def talk(conn,client_addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__': #windows下start进程一定要写到这下面
        while True:
            conn,client_addr=server.accept()
            p=Process(target=talk,args=(conn,client_addr))
            p.start()
    
    使用多进程实现socket聊天并发-server
    
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    
    client端
    
    import socket
    from multiprocessing import Process
    def talk(conn):
        try:
            while True:
                conn.send(b'hello')
                print(conn.recv(1024))
        finally:
            conn.close()
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1',9091))
        sk.listen()
        try:
            while True:
                conn,addr = sk.accept()
                Process(target=talk,args=(conn,)).start()
        finally:
            sk.close()

    server
    import socket
    import os
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',9091))
    
    while True:
        print(sk.recv(1024))
        sk.send(str(os.getpid()).encode('utf-8'))
    client

    进程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

    锁 —— multiprocess.Lock

    对它的认识是数据的保护,在多人同时使用一个数据库时查看用不到,但是稍微更改一点点就需要一个一个来改,不能一起更改,会出现数据的错误,使用时的大方向是多人,但是只能一个一个的确认后才能执行接下来的人:

    多进程共享一段数据的时候,数据会出现不安全的现象:

    需要锁来维护数据的安全性:

    lock= Lock()
    lock.acquire()  # 拿钥匙
    print(111)
    lock.release()  # 还钥匙
    lock.acquire()  # 阻塞
    print(222)

    from multiprocessing import Lock
    from multiprocessing import Process
    # 锁
    # lock = Lock() # 创造了一把锁
    # lock.acquire() # 获取了这把锁的钥匙
    # lock.release() # 归还这把锁的钥匙
    
    # 抢票的例子
    # 每个人都能
    # 查看余票
    # 买票
    import json
    import time
    from multiprocessing import Lock
    from multiprocessing import Process
    def search(i):
        with open('db','r') as f:count_dic = json.load(f)
        time.sleep(0.2)
        print('person %s 余票 : %s张'%(i,count_dic['count']))
    
    def buy(i):
        with open('db','r') as f:count_dic = json.load(f)
        time.sleep(0.2)
        if count_dic['count'] > 0:
            count_dic['count'] -= 1
            print('person %s 购票成功'%i)
        time.sleep(0.2)
        with open('db','w') as f:json.dump(count_dic,f)
    
    def task(i,lock):
        search(i)
        lock.acquire()   # 如果之前已经被acquire了 且 没有被release 那么进程会在这里阻塞
        buy(i)
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=task,args=(i,lock))
            p.start()
    

      

          通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。

      当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

    import os
    import time
    import random
    from multiprocessing import Process
    
    def work(n):
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
    
    if __name__ == '__main__':
        for i in range(3):
            p=Process(target=work,args=(i,))
            p.start()
    
    多进程抢占输出资源
    
    # 由并发变成了串行,牺牲了运行效率,但避免了竞争
    import os
    import time
    import random
    from multiprocessing import Process,Lock
    
    def work(lock,n):
        lock.acquire()
        print('%s: %s is running' % (n, os.getpid()))
        time.sleep(random.random())
        print('%s: %s is done' % (n, os.getpid()))
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        for i in range(3):
            p=Process(target=work,args=(lock,i))
            p.start()
    
    使用锁维护执行顺序
    

    上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。

      接下来,我们以模拟抢票为例,来看看数据安全的重要性。

    #文件db的内容为:{"count":1}
    #注意一定要用双引号,不然json无法识别
    #并发运行,效率高,但竞争写同一文件,数据写入错乱
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db'))
        time.sleep(0.1) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2) #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            print('33[43m购票成功33[0m')
    
    def task():
        search()
        get()
    
    if __name__ == '__main__':
        for i in range(100): #模拟并发100个客户端抢票
            p=Process(target=task)
            p.start()
    
    多进程同时抢购余票
    
    #文件db的内容为:{"count":5}
    #注意一定要用双引号,不然json无法识别
    #并发运行,效率高,但竞争写同一文件,数据写入错乱
    from multiprocessing import Process,Lock
    import time,json,random
    def search():
        dic=json.load(open('db'))
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
    def get():
        dic=json.load(open('db'))
        time.sleep(random.random()) #模拟读数据的网络延迟
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(random.random()) #模拟写数据的网络延迟
            json.dump(dic,open('db','w'))
            print('33[32m购票成功33[0m')
        else:
            print('33[31m购票失败33[0m')
    
    def task(lock):
        search()
        lock.acquire()
        get()
        lock.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(100): #模拟并发100个客户端抢票
            p=Process(target=task,args=(lock,))
            p.start()
    
    使用锁来保证数据安全
    
    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    2.需要自己加锁处理
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    队列和管道都是将数据存放于内存中
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

    信号量:

    # 信号量的本质
        # 多把钥匙对应一把锁
        # lock+count计数

    它是锁的派生,原理是锁+计数器:

    可以控制多人的修改,但是修改的分别是不同的修改物'

    # from multiprocessing import Process
    # from multiprocessing import Semaphore  # 信号量
    
    # ktv
    # 4个小房子
        # 10个人站在房子外面要进去玩儿
    
    
    
    # sem = Semaphore(3)
    # sem.acquire()
    # print(1)
    # sem.acquire()
    # print(2)
    # sem.acquire()
    # print(3)
    # sem.release()
    # sem.acquire()  # 阻塞
    # print(4)
    import time
    import random
    from multiprocessing import Process,Semaphore
    def ktv(num,sem):
        sem.acquire()
        print('person%s进入了ktv' % num)
        time.sleep(random.randint(1,4))
        print('person%s走出了ktv' % num)
        sem.release()
    if __name__ == '__main__':
        sem = Semaphore(4)
        for i in range(10):
            p = Process(target=ktv,args=(i,sem))
            p.start()
    

    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念  

    from multiprocessing import Process,Semaphore
    import time,random
    
    def go_ktv(sem,user):
        sem.acquire()
        print('%s 占到一间ktv小屋' %user)
        time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
        sem.release()
    
    if __name__ == '__main__':
        sem=Semaphore(4)
        p_l=[]
        for i in range(13):
            p=Process(target=go_ktv,args=(sem,'user%s' %i,))
            p.start()
            p_l.append(p)
    
        for i in p_l:
            i.join()
        print('============》')
    
    例子
    

    事件,

    事件的会将事物分的更为细致:

    is_set 查看此时的状态 状态有两种True 和Flase

    set  设置为True  

    clear  设置为Flase

    wait(参数)    在不设置参数时,是True为pass,而Flase阻塞,可以通过改变来改变

          默认是阻塞,可以设置阻塞的时间按秒来进行,设置后时间过后不论是True或Flase都会pass  

    # 并发的时候
    # 很多模型
        # 事件
    from multiprocessing import Event,Process
    # wait() 方法 等待
        # 阻塞  如果这个标志是False 那么就阻塞
        # 非阻塞  如果这个标志是True 那么就非阻塞
    # 查看标志 is_set()
    # 修改标志 set()将标志设置为True
              #clear() 将标志设置为False
    # e = Event()
    # print(e.is_set())  # 在事件的创建之初 默认是False
    # e.set()            # 将标志设置为True
    # print(e.is_set())
    # e.wait()           # 相当于什么都没做pass
    # e.clear()          # 将标志设置为False
    # # e.wait()           # 永远阻塞
    # e.wait(timeout=10) # 如果信号在阻塞10s之内变为True,那么不继续阻塞直接pass,
    #                     # 如果就阻塞10s之后状态还是没变,那么继续,
    # print(e.is_set())  # 无论前面的wait的timeout是否通过,我的状态都不会因此改变
    
    # 红绿灯模型
    # 控制交通灯的进程
    import time
    import random
    def traffic_light(e):
        print('33[1;31m 红灯亮33[0m')
        while True:
            time.sleep(2)
            if e.is_set():
                print('33[1;31m 红灯亮33[0m')
                e.clear()
            else:
                print('33[1;32m 绿灯亮33[0m')
                e.set()
    
    
    # 车 等或者通过
    def car(id,e):
        if not e.is_set():
            print('car %s 等待' % id)
            e.wait()
        print('car %s 通过'%id)
    
    def police_car(id,e):
        if not e.is_set():
            e.wait(timeout = 0.5)
        print('police car %s 通过' % id)
    
    # 主进程 启动交通控制灯 启动车的进程
    if __name__ == '__main__':
        e = Event()
        p = Process(target=traffic_light,args=(e,))
        p.start()
        car_lst = [car,police_car]
        for i in range(20):
            p = Process(target=random.choice(car_lst), args=(i,e))
            p.start()
            time.sleep(random.randrange(0,3,2))
    
    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    
        事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    
    clear:将“Flag”设置为False
    set:将“Flag”设置为True
    
    事件介绍
    

    总结:

      进程之间是数据隔离的

      进程与进程之间是不能自由的交换内存数据的

      全局变量在子进程中修改,其他进程是感知不到的

    守护进程

      特点 生命周期值和主进程的代码有关系,和其他子进程没关系

      用处 报活

     多进程启动tcp协议的socket来完成并发

    进程的同步控制  -进程之间有一些简单的信号传递,但是用户并不能感知,且用户不能传递自己想传递的内容

      锁   

      信号量  10个进程 4把钥匙

          锁加 计数器实现的

      事件 wait

      

      

      

      

      

      

      

      

      

      

  • 相关阅读:
    flex 圣杯布局
    .net mvc里AutoMapper更为便捷的使用方法
    使用ClaimsIdentity来实现登录授权
    判断js中数据类型 的最短代码
    输入分钟、秒倒计时
    .net 导出Excel插件Npoi的使用
    seajs简单使用
    图片懒加载
    dropload上拉加载更多
    基于移动端的左滑删除
  • 原文地址:https://www.cnblogs.com/lnrick/p/9366978.html
Copyright © 2011-2022 走看看