zoukankan      html  css  js  c++  java
  • IPC 机制的 Python 实现

    IPC机制

    from multiprocessing import Queue, Process
    
    """
    研究思路
        1.主进程跟子进程借助于队列通信
        2.子进程跟子进程借助于队列通信
    """
    def producer(q):
        q.put('很高兴为您服务')
    
    
    def consumer(q):
        print(q.get())
    
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=producer,args=(q,))
        p1 = Process(target=consumer,args=(q,))
        p.start()
        p1.start()
    

     


    多进程IPC与Python支持

    linux下进程间通信的几种主要手段简介:

    1. 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信;

    2. 信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数);

    3. 报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。

    4. 共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。

    5. 信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。

    6. 套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

    python 原生支持的有:1, 2, 6. 信号这个比较简单, 一种注册监听机制.本文不涉及

    管道是可以通过 (mutiprocessing.Pipe) 获得, 由c写的

    套接字这个通过 AF_UNIX协议 就可以完成啦, 和网络编程类似的~

    其实仔细想想还有第三种即, 利用文件, 生产者写到文件中, 消费者从文件中读.(简单化成一个生产者, 一个消费者, 否者竞争关系有点复杂.), 当然我们知道文件写入肯定很慢, 但是有多慢还是要测试一下的.

    工具函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #
    #   Author  :   zhangxiaolin
    #   E-mail  :   petelin1120@gmail.com
    #   Date    :   17/8/17 12:08
    #   Desc    :   ...
    # through pipe 269667.7903995848 KB/s
     
    data_size = 8 * 1024  # KB
     
     
    def gen_data(size):
        onekb = "a" * 1024
        return (onekb * size).encode('ascii')

      

    管道:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #
    #   Author  :   zhangxiaolin
    #   E-mail  :   petelin1120@gmail.com
    #   Date    :   17/8/17 12:08
    #   Desc    :   ...
    import multiprocessing
     
    from mutiprocesscomunicate import gen_data, data_size
     
     
    def send_data_task(pipe_out):
        for in range(data_size):
            pipe_out.send(gen_data(1))
        # end EOF
        pipe_out.send("")
        print('send done.')
     
     
    def get_data_task(pipe_in):
        while True:
            data = pipe_in.recv()
            if not data:
                break
        print("recv done.")
     
     
    if __name__ == '__main__':
        pipe_in, pipe_out = multiprocessing.Pipe(False)
        = multiprocessing.Process(target=send_data_task, args=(pipe_out,), kwargs=())
        p1 = multiprocessing.Process(target=get_data_task, args=(pipe_in,), kwargs=())
     
        p.daemon = True
        p1.daemon = True
        import time
     
        start_time = time.time()
        p1.start()
        p.start()
        p.join()
        p1.join()
        print('through pipe', data_size / (time.time() - start_time), 'KB/s')

      

    注意这个地方 Pipe(True)默认为双工的, 然而标准的是单工的, 单工缓冲区大小在OSX上有64KB, 设置缓存区是为了协调流入流出速率, 否者写的太快, 没人取走也是浪费. 结果: through pipe 99354.71358973449 KB/s

    file

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #
    import os
    from mutiprocesscomunicate import gen_data, data_size
     
     
    def send_data_task(file_name):
        # 是否同步写入磁盘, 如果同步写进去, 慢的一 b, 牛逼的是, 不同步写进去, 也可以读.操作系统厉害了.
        # os.sync()
        with open(file_name, 'wb+') as fd:
            for in range(data_size):
                fd.write(gen_data(1))
                fd.write(' '.encode('ascii'))
                # end EOF
            fd.write('EOF'.encode('ascii'))
        print('send done.')
     
     
    def get_data_task(file_name):
        offset = 0
        fd = open(file_name, 'r+')
        = 0
        while True:
            data = fd.read(1024)
            offset += len(data)
            if 'EOF' in data:
                fd.truncate()
                break
            if not data:
                fd.close()
                fd = None
                fd = open(file_name, 'r+')
                fd.seek(offset)
                continue
        print("recv done.")
     
     
    if __name__ == '__main__':
        import multiprocessing
     
        pipe_out = pipe_in = 'throught_file'
        = multiprocessing.Process(target=send_data_task, args=(pipe_out,), kwargs=())
        p1 = multiprocessing.Process(target=get_data_task, args=(pipe_in,), kwargs=())
     
        p.daemon = True
        p1.daemon = True
        import time
     
        start_time = time.time()
        p1.start()
        import time
     
        time.sleep(0.5)
        p.start()
        p.join()
        p1.join()
        import os
        print('through file', data_size / (time.time() - start_time), 'KB/s')
        open(pipe_in, 'w+').truncate()

      

    有两个点, 一个是, 打开文件之后, 如果有人在写入, 需要重新打开才能发现新内容, 另外需要设置offset,只读取新内容.

    !!!重点, 测试的时候这个速度有 through file 110403.02025891568 KB/s这么多, 甚至比管道还要高一点, 这是怎么回事呢?

    quite often file data is first written into the page cache (which is in RAM) by the OS kernel.

    并没有被写入文件, 而是被写到内存中了, 随后(不会通知你)被操作系统调度写入文件.操作系统比较厉害的是, 即使没有写到文件中, 读写仍然像写到文件中一样.

    如果设置了 os.sync(), 所有写操作立即执行, 会发现慢的…类似于卡死.

    本地socket

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #
    #   Author  :   zhangxiaolin
    #   E-mail  :   petelin1120@gmail.com
    #   Date    :   17/8/17 12:08
    #   Desc    :   ...
    import multiprocessing
    import os
    import socket
     
    from mutiprocesscomunicate import gen_data, data_size
     
     
    minissdpdSocket = '/tmp/m.sock'  # The socket for talking to minissdpd
     
     
    def send_data_task():
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     
            try:
                os.remove(minissdpdSocket)
            except OSError:
                pass
     
            server.bind(minissdpdSocket)
     
            server.listen(1)
     
            conn, _ = server.accept()
            with conn:
                for in range(data_size):
                    conn.send(gen_data(1))
                conn.shutdown(socket.SHUT_WR)
                print('send done.')
     
     
    def get_data_task():
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(minissdpdSocket)
            client.shutdown(socket.SHUT_WR)
            while True:
                data = client.recv(1024)
                if not data:
                    break
            print("recv done.")
     
     
    if __name__ == '__main__':
        = multiprocessing.Process(target=send_data_task, args=(), kwargs=())
        p1 = multiprocessing.Process(target=get_data_task, args=(), kwargs=())
     
        p.daemon = True
        p1.daemon = True
        import time
     
        start_time = time.time()
        p.start()
     
        p1.start()
        p.join()
        p1.join()
        print('through pipe', data_size / (time.time() - start_time), 'KB/s')

      

    本地socket, 会走传输层也就是被tcp或者udp封装一下,到网络层,网络层自己有路由表, 发现是本机, 则走本地回环接口, 不经过物理网卡, 发到接受队列中去.

    这个速度不稳定, 最快有through socket 261834.36615940317 KB/s

    参考

    1. 深刻理解Linux进程间通信(IPC)
    2. 文件没有直接写入磁盘
    3. pipe缓存区大小
    本文作者:陌雨翎
    本文链接:https://www.cnblogs.com/rianley/p/9014017.html
    关于博主:评论和私信会在第一时间回复。或者直接私信我。
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
    声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐】一下。您的鼓励是博主的最大动力!

    在linux下的多个进程间的通信机制叫做IPC(Inter-Process Communication),它是多个进程之间相互沟通的一种方法。在linux下有多种进程间通信的方法:半双工管道、命名管道、消息队列、信号、信号量、共享内存、内存映射文件,套接字等等。使用这些机制可以为linux下的网络服务器开发提供灵活而又坚固的框架。

    1. 管道 (PIPE)
        管道实际是用于进程间通信的一段共享内存,创建管道的进程称为管道服务器,连接到一个管道的进程为管道客户机。一个进程在向管道写入数据后,另一进程就可以从管道的另一端将其读取出来。
    管道的特点:
    1、管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道;
    2、 只能用于父子进程或者兄弟进程之间( 具有亲缘关系的进程)。 比如fork或exec创建的新进程, 在使用exec创建新进程时,需要将管道的文件描述符作为参数传递给exec创建的新进程。 当父进程与使用fork创建的子进程直接通信时,发送数据的进程关闭读端,接受数据的进程关闭写端。
    3、单独构成一种独立的文件系统:管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在与内存中。
    4、数据的读出和写入:一个进程向管道中写的内容被管道另一端的进程读出。写入的内容每次都添加在管道缓冲区的末尾,并且每次都是从缓冲区的头部读出数据。
    管道的实现机制:

        管道是由内核管理的一个缓冲区,相当于我们放入内存中的一个纸条。管道的一端连接一个进程的输出。这个进程会向管道中放入信息。管道的另一端连接一个进程的输入,这个进程取出被放入管道的信息。一个缓冲区不需要很大,它被设计成为环形的数据结构,以便管道可以被循环利用。当管道中没有信息的话,从管道中读取的进程会等待,直到另一端的进程放入信息。当管道被放满信息的时候,尝试放入信息的进程会等待,直到另一端的进程取出信息。当两个进程都终结的时候,管道也自动消失。

    管道只能在本地计算机中使用,而不可用于网络间的通信。 

    2. 命名管道(FIFO)
    命名管道是一种特殊类型的文件,它在系统中以文件形式存在。这样克服了管道的弊端,他可以 允许没有亲缘关系的进程间通信。 

    管道和命名管道的区别:
    对于命名管道FIFO来说,IO操作和普通管道IO操作基本一样,但是两者有一个主要的区别,在命名管道中,管道可以是事先已经创建好的,比如我们在命令行下执行
    mkfifo myfifo
    就是创建一个命名通道,我们必须用open函数来显示地建立连接到管道的通道,而在管道中,管道已经在主进程里创建好了,然后在fork时直接复制相关数据或者是用exec创建的新进程时把管道的文件描述符当参数传递进去。
    一般来说FIFO和PIPE一样总是处于阻塞状态。也就是说如果命名管道FIFO打开时设置了读权限,则读进程将一直阻塞,一直到其他进程打开该FIFO并向管道写入数据。这个阻塞动作反过来也是成立的。如果不希望命名管道操作的时候发生阻塞,可以在open的时候使用O_NONBLOCK标志,以关闭默认的阻塞操作。

    3. 信号 (signal)
    信号机制是unix系统中最为古老的进程之间的通信机制,用于一个或几个进程之间传递异步信号。信号可以有各种异步事件产生,比如键盘中断等。shell也可以使用信号将作业控制命令传递给它的子进程。

    4. 消息队列(Message queues)
    消息队列是内核地址空间中的内部链表,通过linux内核在各个进程直接传递内容,消息顺序地发送到消息队列中,并以几种不同的方式从队列中获得,每个消息队列可以用 IPC标识符 唯一地进行识别。内核中的消息队列是通过IPC的标识符来区别,不同的消息队列直接是相互独立的。每个消息队列中的消息,又构成一个 独立的链表。
    消息队列克服了信号承载信息量少,管道只能承载无格式字符流。 

    5. 信号量(Semaphore)
    信号量是一种计数器,用于控制对多个进程共享的资源进行的访问。它们常常被用作一个锁机制,在某个进程正在对特定的资源进行操作时,信号量可以防止另一个进程去访问它。 
    信号量是特殊的变量,它只取正整数值并且只允许对这个值进行两种操作:等待(wait)和信号(signal)。(P、V操作,P用于等待,V用于信号) 
    p(sv):如果sv的值大于0,就给它减1;如果它的值等于0,就挂起该进程的执行 
    V(sv):如果有其他进程因等待sv而被挂起,就让它恢复运行;如果没有其他进程因等待sv而挂起,则给它加1 
    简单理解就是P相当于申请资源,V相当于释放资源 

    6. 共享内存(Share Memory)
    共享内存是在多个进程之间共享内存区域的一种进程间的通信方式,由IPC为进程创建的一个特殊地址范围,它将出现在该进程的地址空间(这里的地址空间具体是哪个地方?)中。其他进程可以将 同一段共享内存连接到自己的地址空间中。所有进程都可以访问共享内存中的地址,就好像它们是malloc分配的一样。如果一个进程向共享内存中写入了数据,所做的改动将立刻被其他进程看到。 
    共享内存是 IPC最快捷的方式,因为共享内存方式的通信没有中间过程,而管道、消息队列等方式则是需要将数据通过中间机制进行转换。共享内存方式直接将某段内存段进行映射,多个进程间的共享内存是同一块的物理空间,仅仅映射到各进程的地址不同而已,因此不需要进行复制,可以直接使用此段空间。
    注意:共享内存本身并没有同步机制,需要程序员自己控制。 

    消息队列、信号量以及共享内存的相似之处:
    它们被统称为XSI IPC,它们在内核中有相似的IPC结构(消息队列的msgid_ds,信号量的semid_ds,共享内存的shmid_ds),而且都用一个非负整数的标识符加以引用(消息队列的msg_id,信号量的sem_id,共享内存的shm_id,分别通过msgget、semget以及shmget获得),标志符是IPC对象的内部名,每个IPC对象都有一个键(key_t key)相关联,将这个键作为该对象的外部名。

    XSI IPC和PIPE、FIFO的区别:
    1、XSI IPC的IPC结构是在系统范围内起作用,没用使用引用计数。如果一个进程创建一个消息队列,并在消息队列中放入几个消息,进程终止后,即使现在已经没有程序使用该消息队列,消息队列及其内容依然保留。而PIPE在最后一个引用管道的进程终止时,管道就被完全删除了。对于FIFO最后一个引用FIFO的进程终止时,虽然FIFO还在系统,但是其中的内容会被删除。
    2、和PIPE、FIFO不一样,XSI IPC不使用文件描述符,所以不能用ls查看IPC对象,不能用rm命令删除,不能用chmod命令删除它们的访问权限。只能使用ipcs和ipcrm来查看可以删除它们。

    7. 内存映射(Memory Map)
    内存映射文件,是由一个文件到一块内存的映射。内存映射文件与 虚拟内存有些类似,通过内存映射文件可以保留一个地址的区域,
    同时将物理存储器提交给此区域,内存文件映射的物理存储器来自一个已经存在于磁盘上的文件,而且在对该文件进行操作之前必须首先对文件进行映射。使用内存映射文件处理存储于磁盘上的文件时,将不必再对文件执行I/O操作。 每一个使用该机制的进程通过把同一个共享的文件映射到自己的进程地址空间来实现多个进程间的通信(这里类似于共享内存,只要有一个进程对这块映射文件的内存进行操作,其他进程也能够马上看到)。
    使用内存映射文件不仅可以实现多个进程间的通信,还可以用于 处理大文件提高效率。因为我们普通的做法是 把磁盘上的文件先拷贝到内核空间的一个缓冲区再拷贝到用户空间(内存),用户修改后再将这些数据拷贝到缓冲区再拷贝到磁盘文件,一共四次拷贝。如果文件数据量很大,拷贝的开销是非常大的。那么问题来了,系统在在进行内存映射文件就不需要数据拷贝?mmap()确实没有进行数据拷贝,真正的拷贝是在在缺页中断处理时进行的,由于mmap()将文件直接映射到用户空间,所以中断处理函数根据这个映射关系,直接将文件从硬盘拷贝到用户空间,所以只进行一次数据拷贝。效率高于read/write。

    共享内存和内存映射文件的区别:
    内存映射文件是利用虚拟内存把文件映射到进程的地址空间中去,在此之后进程操作文件,就像操作进程空间里的地址一样了,比如使用c语言的memcpy等内存操作的函数。这种方法能够很好的应用在需要频繁处理一个文件或者是一个大文件的场合,这种方式处理IO效率比普通IO效率要高
      共享内存是内存映射文件的一种特殊情况,内存映射的是一块内存,而非磁盘上的文件。共享内存的主语是进程(Process),操作系统默认会给每一个进程分配一个内存空间,每一个进程只允许访问操作系统分配给它的哪一段内存,而不能访问其他进程的。而有时候需要在不同进程之间访问同一段内存,怎么办呢?操作系统给出了 创建访问共享内存的API,需要共享内存的进程可以通过这一组定义好的API来访问多个进程之间共有的内存,各个进程访问这一段内存就像访问一个硬盘上的文件一样。

    内存映射文件与虚拟内存的区别和联系:
    内存映射文件和虚拟内存都是操作系统内存管理的重要部分,两者有相似点也有不同点。
    联系:虚拟内存和内存映射都是将一部分内容加载到内存,另一部放在磁盘上的一种机制。对于用户而言都是透明的。
    区别:虚拟内存是硬盘的一部分,是内存和硬盘的数据交换区,许多程序运行过程中把暂时不用的程序数据放入这块虚拟内存,节约内存资源。内存映射是一个文件到一块内存的映射,这样程序通过内存指针就可以对文件进行访问。
    虚拟内存的硬件基础是分页机制。另外一个基础就是局部性原理(时间局部性和空间局部性),这样就可以将程序的一部分装入内存,其余部分留在外存,当访问信息不存在,再将所需数据调入内存。而内存映射文件并不是局部性,而是使虚拟地址空间的某个区域银蛇磁盘的全部或部分内容,通过该区域对被映射的磁盘文件进行访问,不必进行文件I/O也不需要对文件内容进行缓冲处理。

    8. 套接字 
    套接字机制不但可以单机的不同进程通信,而且使得跨网机器间进程可以通信。 
    套接字的创建和使用与管道是有区别的,套接字 明确地将客户端与服务器 区分开来,可以实现多个客户端连到同一服务器。 
    服务器套接字连接过程描述: 
    首先,服务器应用程序用socket创建一个套接字,它是系统分配服务器进程的类似文件描述符的资源。 接着,服务器调用bind给套接字命名。这个名字是一个标示符,它允许linux将进入的针对特定端口的连接转到正确的服务器进程。 然后,系统调用listen函数开始接听,等待客户端连接。listen创建一个队列并将其用于存放来自客户端的进入连接。 当客户端调用connect请求连接时,服务器调用accept接受客户端连接,accept此时会创建一个新套接字,用于与这个客户端进行通信。 
    客户端套接字连接过程描述: 
    客户端首先调用socket创建一个未命名套接字,让后将服务器的命名套接字作为地址来调用connect与服务器建立连接。 
    只要双方连接建立成功,我们就可以像操作底层文件一样来操作socket套接字实现通信。 

    原文:https://blog.csdn.net/a987073381/article/details/52006729





    如果这篇文章帮助到了你,你可以请作者喝一杯咖啡

  • 相关阅读:
    架构的上层是系统,是系统要素的组织形式
    计数与方法论、哲学
    网络编程--会话层、表示层、应用层
    面向中间件编程--草稿
    泛型:基于类型组合的算法和结构构建---数据结构与算法
    面向对象:消息机制更适合描述;
    类型的连接:实连接、虚连接
    数据库 = filesystem + transcation + dsl + dslengine
    一文看透浏览器架构
    代码的结合性:继承 扩展 组合 变换--swift暗含的四根主线
  • 原文地址:https://www.cnblogs.com/sddai/p/14487013.html
Copyright © 2011-2022 走看看