zoukankan      html  css  js  c++  java
  • 进程之间的通信与数据共享

    进程间通信

    IPC(Inter-Process Communication)

    队列 

    创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

    Queue([maxsize]) 
    创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) 
    同q.get(False)方法。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    
    方法介绍
    队列方法
    '''
    multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    都是基于消息传递实现的,但是队列接口
    '''
    
    from multiprocessing import Queue
    q=Queue(3)
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)
    q.put(3)
    q.put(3)
    # q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
               # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    try:
        q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
        print('队列已经满了')
    
    # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    print(q.full()) #满了
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    try:
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    
    print(q.empty()) #空了
    
    单看队列用法
    单队列用法
    from multiprocessing import Process, Queue
    
    def consume(q):
        print('son-->', q.get())
        q.put('abc')
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=consume, args=(q,))
        p.start()
        q.put({'123': 123})
        # p.join()
        print('Foo--->', q.get())
    
    '''
    son--> {'123': 123}
    Foo---> abc
    
    
    不加join:
    Foo---> {'123': 123}
    '''
    

      

    生产者消费者模型介绍

    为什么要使用生产者消费者模型

    生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。

    同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者和消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,

    消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    这个阻塞队列就是用来给生产者和消费者解耦的

    import time
    import random
    from multiprocessing import Process, Queue
    
    
    def consumer(q, name):  # 处理数据
        while True:
            food = q.get()
            if food is None: break
            time.sleep(random.uniform(0.5, 1))
            print('%s吃了一个%s' % (name, food))
    
    
    def producer(q, name, food):  # 获取数据
        for i in range(10):
            time.sleep(random.uniform(0.3, 0.8))
            print('%s生产了%s %s' % (name, food, i))
            q.put(food+str(i))
    
    if __name__ == '__main__':
        q = Queue()
        c1 = Process(target=consumer, args=(q, 'luffy'))
        c2 = Process(target=consumer, args=(q, 'zoro'))
        c1.start()
        c2.start()
        p1 = Process(target=producer, args=(q, 'sanji', 'meat'))
        p2 = Process(target=producer, args=(q, 'sanji', 'wine'))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        q.put(None)  # 有几个consumer就需要放几个None
        q.put(None)
    View Code
    sanji生产了meat 0
    sanji生产了wine 0
    sanji生产了meat 1
    luffy吃了一个meat0
    sanji生产了wine 1
    zoro吃了一个wine0
    sanji生产了meat 2
    luffy吃了一个meat1
    sanji生产了wine 2
    sanji生产了meat 3
    zoro吃了一个wine1
    luffy吃了一个meat2
    sanji生产了wine 3
    sanji生产了meat 4
    sanji生产了wine 4
    luffy吃了一个meat3
    sanji生产了meat 5
    zoro吃了一个wine2
    sanji生产了wine 5
    luffy吃了一个wine3
    sanji生产了wine 6
    zoro吃了一个meat4
    sanji生产了meat 6
    luffy吃了一个wine4
    sanji生产了wine 7
    zoro吃了一个meat5
    sanji生产了meat 7
    luffy吃了一个wine5
    sanji生产了wine 8
    zoro吃了一个wine6
    sanji生产了meat 8
    luffy吃了一个meat6
    sanji生产了wine 9
    sanji生产了meat 9
    zoro吃了一个wine7
    luffy吃了一个meat7
    zoro吃了一个wine8
    luffy吃了一个meat8
    zoro吃了一个wine9
    luffy吃了一个meat9
    输出

    JoinableQueue使用

    就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    参数介绍

    maxsize是队列中允许最大项数,省略则无大小限制。
    

    方法介绍

    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
    from multiprocessing import JoinableQueue
    # JoinableQueue 是一个类
    # put
    # get
    # task_done 通知队列已经有一个数据被处理了
    # q.join()  阻塞直到放入队列中的所有数据都被处理掉(有多少个数据就接收多少个数据)
    
    import time
    import random
    from multiprocessing import Process,JoinableQueue
    def consumer(q,name):
        # 处理数据
        while True:
            food = q.get()
            time.sleep(random.uniform(0.5,1))
            print('%s吃了一个%s' % (name, food))
            q.task_done()
    
    def producer(q,name,food):
        # 获取数据
        for i in range(10):
            time.sleep(random.uniform(0.3,0.8))
            print('%s生产了%s%s'%(name,food,i))
            q.put(food+str(i))
    
    if __name__ == '__main__':
        jq = JoinableQueue()
        c1 = Process(target=consumer, args=(jq, 'luffy'))
        c2 = Process(target=consumer, args=(jq, 'zoro'))
        c1.daemon = True
        c2.daemon = True
        c1.start()
        c2.start()
        p1 = Process(target=producer, args=(jq, 'sanji', 'pizza'))
        p2 = Process(target=producer, args=(jq, 'sanji', 'tea'))
        p1.start()
        p2.start()
        p1.join()  # p1生产者要先把所有的数据都放到队列中
        p2.join()
        jq.join()
    View Code
    sanji生产了tea0
    sanji生产了pizza0
    sanji生产了tea1
    luffy吃了一个tea0
    sanji生产了tea2
    zoro吃了一个pizza0
    sanji生产了pizza1
    luffy吃了一个tea1
    sanji生产了tea3
    sanji生产了pizza2
    zoro吃了一个tea2
    luffy吃了一个pizza1
    sanji生产了tea4
    zoro吃了一个tea3
    sanji生产了pizza3
    sanji生产了tea5
    luffy吃了一个pizza2
    zoro吃了一个tea4
    sanji生产了pizza4
    sanji生产了tea6
    sanji生产了tea7
    luffy吃了一个pizza3
    sanji生产了pizza5
    sanji生产了tea8
    zoro吃了一个tea5
    sanji生产了pizza6
    sanji生产了tea9
    luffy吃了一个pizza4
    sanji生产了pizza7
    zoro吃了一个tea6
    sanji生产了pizza8
    luffy吃了一个tea7
    zoro吃了一个pizza5
    sanji生产了pizza9
    luffy吃了一个tea8
    zoro吃了一个pizza6
    zoro吃了一个pizza7
    luffy吃了一个tea9
    luffy吃了一个pizza9
    zoro吃了一个pizza8
    
    Process finished with exit code 0
    输出

    管道

    如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

    队列是基于管道实现的
    管道是基于socket实现的
    队列 --> 管道--> socket
    队列 在进程之间数据安全  管道+锁实现的
    管道 进程之间数据不安全 且存取数据复杂
    

      

    from multiprocessing import Pipe, Process
    
    
    def consumer(left, right):
        left.close()
        while True:
            try:
                print(right.recv())
            except EOFError:
                break
    
    if __name__ == '__main__':
        left, right = Pipe()
        Process(target=consumer, args=(left, right)).start()
        right.close()
        for i in range(10):
            left.send('tea%s'%i)
        left.close()
    

     进程池

    为什么要有进程池?进程池的概念。

    在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

    在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

    主进程: 相当于生产者,只管向进程池提交任务。

    并不关心进程池是如何执行任务的。

    因此,并不关心是哪一个进程执行的这个任务

    进程池: 相当于消费者,负责接收任务,

    并将任务分配到一个空闲的进程中去执行

     

    为什么要有进程池
        开启过多的进程并不能提高你的效率,反而会降低效率
    
    计算型  充分占用cpu  多进程可以充分利用多核。
            适合开启多进程,但是不适合开启很多多进程
    IO型  大部分时间都在阻塞队列,而不是在运行状态
            根本不太适合开启多进程
    

      

    进程池方法一览

    multiprocess.Pool模块

    Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    
    
      numprocess:要创建的进程数,如果省略,将默认为cpu_count()的值,可os.cpu_count()查看
      initializer:是每个工作进程启动时要执行的可调用对象,默认为None
      initargs:是要传给initializer的参数组
    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
       
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    
    import os,time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(3)
        return n**2
    
    if __name__ == '__main__':
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
                                        # 但不管该任务是否存在阻塞,同步调用都会在原地等着
        print(res_l)
    进程池同步调用
    import os
    import time
    import random
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(random.random())
        return n**2
    
    if __name__ == '__main__':
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
                                              # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                              # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                              # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
            res_l.append(res)
    
        # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()
        p.join()
        for res in res_l:
            print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    进程池异步调用
    def task(num):
        time.sleep(1)
        print('%s : %s'%(num,os.getpid()))
        return num**2
    
    if __name__ == '__main__':
        p = Pool()
        p.map(task,range(10))
    
    
    >>>
    0 : 9824
    1 : 12232
    2 : 18732
    3 : 7208
    4 : 9824
    5 : 12232
    6 : 18732
    7 : 7208
    8 : 9824
    9 : 12232
    

      

    利用进程池,实现并发服务器

    #Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
    #开启6个客户端,会发现2个客户端处于等待状态
    #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
    from socket import *
    from multiprocessing import Pool
    import os
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    
    def talk(conn):
        print('进程pid: %s' %os.getpid())
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p=Pool(4)
        while True:
            conn,*_=server.accept()
            p.apply_async(talk,args=(conn,))
            # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
    
    server:进程池版socket并发聊天
    server
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    client

    使用进程池,爬取多个网站信息

    from multiprocessing import Pool
    import requests
    import os
    
    
    def get_page(url):
        """
        得到网页内容
        :param url: 
        :return: 
        """
        print('<%os> get [%s]' %(os.getpid(), url))
        response = requests.get(url)  # 得到地址  response响应
        return {'url': url, 'text': response.text}
    if __name__ == '__main__':
        p = Pool(4)
        urls = [
            'https://www.baidu.com',
            'https://www.python.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        obj_l= []
        for url in urls:
            obj = p.apply_async(get_page,args=(url,))
            obj_l.append(obj)
        p.close()
        p.join()
        print([obj.get() for obj in obj_l])
    View Code

    自定义线程池

    import threading
    import queue
    import time
    
    
    class MyThread(threading.Thread):
    
        def __init__(self, queue):
            super().__init__() # 对父类进行初始化
            self.queue = queue
            self.daemon = True  # 子线程跟着主线程一起退出
            self.start()
    
        def run(self):
            """
            1、让他始终去运行, 
            2、去获取queue里面的任务,
             3、然后给任务分配函数去执行(获取任务在执行)
            :return: 
            """
            while True:
                func, args, kwargs = self.queue.get() # 从队列中获取任务
                func(*args, **kwargs)
                self.queue.task_done()  # 计数器  执行完这个任务后  (队列-1操作)
        #
        # def join(self):
        #     self.queue.join()
    
    
    class MyPool(object):
        """
        在任务来到之前,提前创建好线程,等待任务
        """
        def __init__(self, num): # 线程数量
            self.num = num
            self.queue = queue.Queue()
            for i in range(self.num):
                MyThread(self.queue)
    
        def apply_async(self,func,args=(),kwargs={}):
            self.queue.put((func, args, kwargs))
    
        def join(self):
            self.queue.join() # 等待队列里面的任务处理完毕
    
    def func1():
        print(threading.current_thread().getName())
        time.sleep(2)
    
    if __name__ == '__main__':
        start = time.time()
        pool = MyPool(3)  # 实例化一个线程池
        for i in range(4):
            pool.apply_async(func1)
        pool.join()
        print('运行的时间{}秒'.format(time.time()- start))
    
    
    >>>结果
    Thread-1
    Thread-2
    Thread-3
    Thread-1
    运行的时间4.011605739593506秒
    View Code

      

      

  • 相关阅读:
    JAVA 设计模式 状态模式
    JAVA 设计模式 访问者模式
    JAVA 设计模式 策略模式
    python获取本机IP地址
    如何在python的字符串中输入纯粹的{}
    在终端打印有颜色的文本
    vim文本替换命令
    selenium WebDriverException: Message: unknown error: DevToolsActivePort file doesnt exist
    history显示历史操作记录,并显示操作时间
    linux下chrome和chromedriver的安装
  • 原文地址:https://www.cnblogs.com/eaoo/p/9703639.html
Copyright © 2011-2022 走看看