zoukankan      html  css  js  c++  java
  • 并发编程之多进程

    进程:
    进程是正在进行的一个过程或者一个任务。负责执行这个任务的是cpu

    进程与程序的区别:
    程序:仅仅只是一堆代码
    进程:是程序运行的过程

    并发与并行
    并发只能达到伪并行,看上去是同时进行的,其实并不是,单个cpu+多道技术就可以实现并发
    并行:是同时运行,必须具备多个cpu,才能实现并行的功能

    进程的状态
    其实在两种情况下会导致一个进程在逻辑上不能运行,
     1、进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作
     2、与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

     开启进程的两种方式
     

    # 1、第一种方式
     from multiprocessing import Process,current_process
    
    
    def task(name):
        print("%s is running, 进程名:%s" % (name, current_process().name))
    
    if __name__ == '__main__':
        p = Process(target=task, args=(1,) ,name="子进程")
        p.start() # 向操作系统发送一个开启一个进程的请求
    
        print("主")
    '''
    打印结果:
    主
    1 is running, 进程名:子进程
    '''
    
    # 2、第二种方式
    class MyProcess(Process):
        def __init__(self, process, name):
            super().__init__()
            self.process = process
            self.name = name
    
        def run(self): # 将需要调用进程的方法写入run中, 函数名run不能修改
            print("%s is running, 进程名:%s" % (self.process, current_process().name))
    
    
    if __name__ == '__main__':
        p = MyProcess("xu","子进程")
        p.start()
        print("主")
    
    '''
    打印结果:
    主
    1 is running, 进程名:子进程
    '''
    

     
    进程的join方法:

    # join方法
    from multiprocessing import Process, current_process
    import time
    
    
    def task(name):
        print("%s is running, 进程名:%s" % (name, current_process().name))
        time.sleep(1)
    '''
    执行结果:
    1 is running, 进程名:子进程1
    1 is end, 进程名:子进程1
    主
    '''
    

    进程的p.join()方法是当p进程结束之后,才会执行join方法后面的程序

    # Process的其他属性和方法

    1、terminate和is_alive
    from multiprocessing import Process
    import time
    import random
    
    def task(name):
        print('%s is piaoing' %name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %name)
    
    if __name__ == '__main__':
        p1=Process(target=task,args=('egon',))
        p1.start()
    
        p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
        print(p1.is_alive()) #结果为True
    
        # is_alive()为判断进程是否还存活
    
        print('主')
        print(p1.is_alive()) # 此时p1进程已经关闭,所以结果为False
    
    '''
    执行结果:
    True
    主
    False
    '''
    
    2、name与pid
    from multiprocessing import Process
    import time
    import random
    import os
    
    def task(name):
        print('%s is running' %name)
        time.sleep(random.randrange(1,5))
        print("父进程pid:%s, 子进程pid:%s" %(os.getppid(), os.getpid()))
        print('%s is end' % name)


    if __name__ == '__main__':
        p1=Process(target=task,args=('xu',), name="子进程1") # name属性设置进程的名称
        p1.start()
        print(p1.name) # 打印p1进程的名称
        print('主', os.getpid())

    '''
    打印结果:
    子进程1
    主 15752
    xu is running
    父进程pid:15752, 子进程pid:15753
    xu is end
    '''

    僵尸进程:
    当子进程结束掉后,并不是完完整整清除掉,其还会保留它的pid,供给主进程查看,只有当主进程结束掉以后,才会被回收掉。
    1、子进程结束掉以后,会保留一些子进程的状态信息,如pid,当主进程结束掉以后,才会被回收掉
    2、僵尸进程有害,当子进程结束后,会保留pid,其他进程无法使用,会使得服务器负载量过大。
    孤儿进程
    1、当子进程未结束,主进程结束,则会使得子进程变成孤儿进程
    2、在linux系统中,有一个init进程,此进程是所有进程的父亲,当一个进程变成孤儿进程后,会被init接管,由此进程将那些孤儿进程结束掉。


    守护进程
    1、守护进程会在主进程执行结束后结束。
    2、守护进程内无法再开启子进程,否则会抛出异常。

    from multiprocessing import Process,current_process
    import time
    
    
    def task():
        print("%s is running" % current_process().name)
        time.sleep(2)
        print("%s is end" % current_process().name)
    
    if __name__ == '__main__':
        p1 = Process(target=task, name="进程1")
        p1.daemon = True # 开启守护进程,开启的动作必须要在start之前
        p1.start()
    
        print("主")  # 当此程序执行后,主进程便执行结束了,那么守护线程也需要结束
    
    '''
    打印结果:
    主
    
    '''
    

     
    互斥锁
    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱

    from multiprocessing import Process, current_process
    import time
    
    
    def task():
        print("%s is running" % current_process().name)
        time.sleep(2)
        print("%s is end" % current_process().name)
    
    if __name__ == '__main__':
        for i in range(3):
            p = Process(target=task, name="进程%s" %i)
            p.start()
    '''
    打印结果:
    进程0 is running
    进程1 is running
    进程2 is running
    进程0 is end
    进程1 is end
    进程2 is end
    '''
    


    如果没有发生错乱,那么应该打印结果为:
    进程0 is running
    进程0 is end
    进程1 is running
    进程1 is end
    进程2 is running
    进程2 is end
    如何控制其行为呢,就需要加锁,而这把锁就为互斥锁

    from multiprocessing import Process, current_process, Lock
    import time
    
    
    def task(mutex):
        mutex.acquire()
        print("%s is running" % current_process().name)
        time.sleep(2)
        print("%s is end" % current_process().name)
        mutex.release()
    
    if __name__ == '__main__':
        mutex = Lock() # 创建互斥锁对象,需要保证每个进程里面的锁是同一把,所以需要将此对象传给task
        for i in range(3):
            p = Process(target=task, args=(mutex, ), name="进程%s" %i)
            p.start()
    '''
    打印结果:
    进程0 is running
    进程0 is end
    进程1 is running
    进程1 is end
    进程2 is running
    进程2 is end
    '''
    


    互斥锁模拟抢票功能
    # 模拟抢票功能

    from multiprocessing import Process, Lock
    import time
    import json
    
    def show_ticket():
        time.sleep(1)
        ticket_dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
        print("票数为:%s" %(ticket_dict["count"]))
    
    
    def pay_ticket(mutex):
        show_ticket()
        mutex.acquire()
        time.sleep(1)
        dict = json.load(open("ticket_num.json", "r", encoding="utf-8"))
        if dict["count"] > 0:
            print("购票成功")
            dict["count"] -= 1
            json.dump(dict, open("ticket_num.json", "w", encoding="utf-8"))
        else:
            print("没有票了, 购票失败")
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(10):
            p = Process(target=pay_ticket, args=(mutex, ))
            p.start()
    

     

    队列
    from multiprocessing import Process,Queue

    q=Queue(3) # 3为队列中允许最大项数,省略则无大小限制

    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(1) # 在队列中放置数据
    q.put(2)
    q.put(3)
    print(q.full()) #满了
    # q.put(4) #再放就阻塞住了

    print(q.get())
    print(q.get())
    print(q.get())
    print(q.empty()) #空了
    # print(q.get()) #再取就阻塞住了

    ps:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

    # 两个队列形成一个栈
    '''
    队列特性:先进先出
    堆栈特性:先进后出
    
    实现思路:
    # 存数据阶段
    1、先创建两个队列
    2、将数据存入第一个队列中
    # 取数据阶段
    1、判断第一个队列是否为空, 若为空,则返回None
    2、将第一个队列中的数据与挨个取出存入第二个队列,当第一个队列中的数量为最后一个的时候,这最后一个数据便是最后存储进队列中的,将其取出,然后返回,然后将第一个队列和第二个队列对调,一直循环,这样就能形成先进后出
    
    '''
    import queue
    
    class Stack(object,):
        def __init__(self):
            self.one_queue = queue.Queue()
            self.two_queue = queue.Queue()
    
    
        def push(self, data):
            """
            将数据放入第一个队列中
            :return:
            """
            self.one_queue.put(data)
    
        def getdata(self):
            """
            将数据取出
            :return:
            """
            if self.one_queue.qsize() == 0:
                return None
    
            while 1:
                if self.one_queue.qsize() == 1:
                    value = self.one_queue.get()
                    break
                self.two_queue.put(self.one_queue.get())
         # 将队列对换 temp
    = self.one_queue self.one_queue = self.two_queue self.two_queue = temp return value if __name__ == '__main__': stack = Stack() stack.push("xu") stack.push("sy") stack.push("songy") print(stack.getdata()) print(stack.getdata()) print(stack.getdata())


    生产者消费者模型
    用多进程和队列实现生产者消费者模型

    from multiprocessing import Process, Queue
    import time
    
    
    def producer(name, q):
        for i in range(3):
            print("生产者%s生产了%s包子" %(name, i))
            q.put({name: i})
            time.sleep(0.5)
    
    
    def consumer(name, q):
        while True:
            package = q.get()
            print("消费者%s吃了包子%s" %(name, package))
            time.sleep(1)
    
    if __name__ == '__main__':
        q = Queue()
        producers = ["xu", "sy", "songyuan"]
        consumers = ["guang", "chong",]
        # 生产者们
        for one_producer in producers:
            p = Process(target=producer, args=(one_producer, q, ))
            p.start()
    
        # 消费者们
    
        for one_consumer in consumers:
            p = Process(target=consumer, args=(one_consumer, q, ))
            p.start()
    


    这就产生了一个问题, 如果消费者将队列中的包子取完了, 那么消费者就会一直阻塞在q.get()阶段,无发退出。
    所以就有了一个模块JoinableQueue

    from multiprocessing import Process, JoinableQueue
    import time
    
    
    def producer(name, q):
        for i in range(3):
            print("生产者%s生产了%s包子" %(name, i))
            q.put({name: i})
            time.sleep(0.5)
        q.join() # q没取完, 则进程便会在此阻塞住
    
    def consumer(name, q):
        while True:
            package = q.get()
            print("消费者%s吃了包子%s" %(name, package))
            time.sleep(1)
            q.task_done() # 给q.join()发送一个已经取走一个包子的消息
    
    if __name__ == '__main__':
        q = JoinableQueue()
        producers = ["xu", "sy", "songyuan"]
        consumers = ["guang", "chong",]
        # 生产者们
    
        p1 = Process(target=producer, args=("xu", q, ))
        p2 = Process(target=producer, args=("sy", q, ))
        p3 = Process(target=producer, args=("songy", q, ))
    
        # 消费者们
    
    
        c1 = Process(target=consumer, args=("guang", q, ))
        c2 = Process(target=consumer, args=("chong", q, ))
        c1.daemon = True
        c2.daemon = True
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()
    



  • 相关阅读:
    UVALive 3664:Guess(贪心 Grade E)
    uva 1611:Crane(构造 Grade D)
    uva 177:Paper Folding(模拟 Grade D)
    UVALive 6514:Crusher’s Code(概率dp)
    uva 11491:Erasing and Winning(贪心)
    uva 1149:Bin Packing(贪心)
    uva 1442:Cave(贪心)
    学习 linux第一天
    字符编码问题
    orm 正向查询 反向查询
  • 原文地址:https://www.cnblogs.com/zrxu/p/11755286.html
Copyright © 2011-2022 走看看