zoukankan      html  css  js  c++  java
  • 进程相关

    * 进程对象及其他方法

    multiprocessing模块介绍

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

    multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

    Process类的介绍

    创建进程的类:

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

    group参数未使用,值始终为None
    target表示调用对象,即子进程要执行的任务
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    name为子进程的名称

    方法介绍:

    p.start():启动进程,并调用该子进程中的p.run() 
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

    属性介绍:

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    p.name:进程的名称
    p.pid:进程的pid
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功

    案例介绍:

    from multiprocessing import Process, current_process
    import time
    import os


    def task():
    # print('%s is running'%current_process().pid) # 查看当前进程的进程号
    print('%s is running' % os.getpid()) # 查看当前进程的进程号
    # print('子进程的主进程号%s'%os.getppid()) # 查看当前子进程的主进程号
    time.sleep(30)


    if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.terminate() # 杀死当前进程
    # 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
    time.sleep(0.1)
    print(p.is_alive()) # 判断当前进程是否存活
    """
    一般情况下我们会默认将
    存储布尔值的变量名
    和返回的结果是布尔值的方法名
    都起成以is_开头
    """
    print('主')
    # print('主',current_process().pid)
    # print('主',os.getpid())
    # print('主主',os.getppid()) # 获取父进程的pid号

    * 僵尸进程与孤儿进程

    僵尸进程所有的进程都会步入僵尸进程):死了但是没有死透

    当你开设了子进程之后 该进程死后不会立刻释放占用的进程号
    因为我要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。。。

    孤儿进程:子进程存活,父进程意外死亡
    操作系统会开设一个“儿童福利院”专门管理孤儿进程回收相关资源

    * 守护进程:被守护进程死亡,守护进程会跟着死亡

    主进程创建守护进程

    其一:守护进程会在主进程代码执行结束后就终止

    其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    案例:

    from multiprocessing import Process
    import time


    def task(name):
    print('%s总管正在活着' % name)
    time.sleep(3)
    print('%s总管正在死亡' % name)


    if __name__ == '__main__':
    p = Process(target=task, args=('egon',))
    # p = Process(target=task,kwargs={'name':'egon'})
    p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
    p.start()
    print('皇帝jason寿终正寝')

    * 互斥锁

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

    而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

    多个进程共享同一打印终端并发运行,效率高,但竞争同一打印终端,带来了打印错乱(多个进程操作同一份数据的时候,会出现数据错乱的问题

    针对上述问题,解决方式就是加锁处理:**将并发变成串行,牺牲效率但是保证了数据的安全**

    案例:

    data文件中存放json格式数据,模拟余票为1{"ticket_num": 1}

    模拟10个用户同时抢票,但车票只有一张:

    from multiprocessing import Process, Lock
    import json
    import time
    import random


    # 查票
    def search(i):
    # 文件操作读取票数
    with open('data', 'r', encoding='utf8') as f:
    dic = json.load(f)
    print('用户%s查询余票:%s' % (i, dic.get('ticket_num')))
    # 字典取值不要用[]的形式 推荐使用get 你写的代码打死都不能报错!!!


    # 买票 1.先查 2.再买
    def buy(i):
    # 先查票
    with open('data', 'r', encoding='utf8') as f:
    dic = json.load(f)
    # 模拟网络延迟
    time.sleep(random.randint(1, 3))
    # 判断当前是否有票
    if dic.get('ticket_num') > 0:
    # 修改数据库 买票
    dic['ticket_num'] -= 1
    # 写入数据库
    with open('data', 'w', encoding='utf8') as f:
    json.dump(dic, f)
    print('用户%s买票成功' % i)
    else:
    print('用户%s买票失败' % i)


    # 整合上面两个函数
    def run(i, mutex):
    search(i)
    # 给买票环节加锁处理
    # 抢锁
    mutex.acquire()

    buy(i)
    # 释放锁
    mutex.release()


    if __name__ == '__main__':
    # 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
    mutex = Lock()
    for i in range(1, 11):
    p = Process(target=run, args=(i, mutex))
    p.start()

    * 队列介绍

    队列:先进先出
    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
    参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。
    主要方法:
    q = Queue(5) 创建队列,括号内可以传数字 标示生成的队列最大可以同时存放的数据量
    q.full()  判断当前队列是否满了
    q.empty() 判断当前队列是否空了
    q.put(111) 往队列中存数据,当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错
    v1 = q.get() 去队列中取数据,
    队列中如果已经没有数据的话 get方法会原地阻塞
    v2 = q.get_nowait() 没有数据直接报错queue.Empty
    v3 = q.get(timeout=3) 没有数据之后原地等待三秒之后再报错  queue.Empty

    ps:q.full()、q.empty()、q.get_nowait()在多进程的情况下是不精确,一个进程刚查看了队列,结果另一个进程又操作了队列,就出现了不精确的情况

    * 进程间通信IPC机制

    1.主进程跟子进程借助于队列通信
    2.子进程跟子进程借助于队列通信

    案例:
    from multiprocessing import Queue, Process


    def producer(q):
    q.put('我是23号技师 很高兴为您服务')


    def consumer(q):
    print(q.get())


    if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p1 = Process(target=consumer, args=(q,))
    p.start()
    p1.start()

    * 生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    案例:

    from multiprocessing import Process, Queue, JoinableQueue
    import time
    import random


    def producer(name,food,q):
    for i in range(5):
    data = '%s生产了%s%s'%(name,food,i)
    # 模拟延迟
    time.sleep(random.randint(1,3))
    print(data)
    # 将数据放入 队列中
    q.put(data)


    def consumer(name,q):
    # 消费者胃口很大 光盘行动
    while True:
    food = q.get() # 没有数据就会卡住
    # 判断当前是否有结束的标识
    # if food is None:break
    time.sleep(random.randint(1,3))
    print('%s吃了%s'%(name,food))
    q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了


    if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer,args=('大厨egon','包子',q))
    p2 = Process(target=producer,args=('马叉虫tank','泔水',q))
    c1 = Process(target=consumer,args=('春哥',q))
    c2 = Process(target=consumer,args=('新哥',q))
    p1.start()
    p2.start()
    # 将消费者设置成守护进程
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    # 等待生产者生产完毕之后 往队列中添加特定的结束符号
    # q.put(None) # 肯定在所有生产者生产的数据的末尾
    # q.put(None) # 肯定在所有生产者生产的数据的末尾
    q.join() # 等待队列中所有的数据被取完再执行往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
    没当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候 才往后运行
    """
    # 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
  • 相关阅读:
    观察者模式学习--使用jdk的工具类简单实现
    观察者模式的初始学习--自己实现
    反射 reflect 初始学习
    eclipse java 空心J文件的回复
    linux 命令 more
    spring 3 的 @value的使用
    linux rm 命令
    linux log find 查询
    Resource is out of sync with the file system
    JavaScript(七)数组
  • 原文地址:https://www.cnblogs.com/h1227/p/12763555.html
Copyright © 2011-2022 走看看