zoukankan      html  css  js  c++  java
  • python并发——进程间同步和通信(二)

    转自:https://github.com/chaseSpace/IPC-Inter-Process-Communication

    六种进程间通信方式的Python3.6代码实现

    1. msg_queue (消息队列)
    2. pipeline for single duplex (单工管道)
    3. pipeline for half duplex (半双工管道)
    4. name pipeline (命名管道)
    5. share memory (共享内存)
    6. semaphore (信号量)
    #消息队列
    from
    multiprocessing import Process from multiprocessing import Queue from time import sleep def write(q): for i in range(5): # sleep(1) print('put %s to queue..' % i) q.put(i) def read(q): while 1: sleep(0.5) v = q.get(True) print('get %s from queue..' % v) if __name__ == '__main__': q = Queue() p1 = Process(target=write, args=(q,)) p2 = Process(target=read, args=(q,)) p1.start() p2.start() p1.join() # 等待p1进程跑完后再往下执行 while not q.empty():# 队列不为空时阻塞在这 sleep(1) p2.terminate() # 结束p2进程
    '''
    #标准库提供的管道是普通管道,单工的,读写方向固定
    特点:只允许具有亲缘关系的两个进程通信
    '''
    import os, sys
    
    print("The child will write text to a pipe and ")
    print("the parent will read the text written by child...")
    
    # file descriptors r, w for reading and writing
    r, w = os.pipe()
    
    processid = os.fork() #fork 方法仅能在linux系统上运行,跨平台就用multiprocess
    
    print(processid)
    
    if processid:
        # This is the paunameent process
        # Closes file descriptor w
        os.close(w)
        r = os.fdopen(r)
        print("Parent reading")
        str = r.read()
        print("text =", str)
        sys.exit(0)
    else:
        # This is the child process
        os.close(r)
        w = os.fdopen(w, 'w')
        print("Child writing")
        w.write("Text written by child...")
        w.close()
        print("Child closing")
        sys.exit(0)
    '''
    使用多进程中的管道,它是半双工的,读写方向不固定
    特点:只允许具有亲缘关系的两个进程通信
    '''
    # 参考 http://www.cnblogs.com/konglinqingfeng/p/9696484.html
    from multiprocessing import Pipe, Process
    
    def func(conn1,conn2):
        conn2.close() #子进程只需使用connection1,故关闭connection2
        while True:
            try:
                msg = conn1.recv() #无数据则阻塞
                print(msg)
            except EOFError:  #对端关闭后,就无法继续接收了
                print(1111)
                conn1.close()
                break
    
    if __name__ == '__main__':
        conn1,conn2 = Pipe()#建立一个管道,管道返回两个connection,
        p = Process(target=func, args=(conn1,conn2))
        p.daemon = True  #子进程必须和主进程一同退出,防止僵尸进程
        p.start()
    
        conn1.close() #主进程只需要一个connection,故关闭一个
        for i in range(20):
            conn2.send('吃了吗') #主进程发送
        conn2.close()   #主进程关闭connection2
    '''
    使用FIFO(命名管道)来实现任意两个进程间的通信
    特点:
        1. 和无名管道一样,半双工
        2. 每个FIFO管道都与一个路径名相关联,类似一个文件
        3. 进程间通过读写FIFO来通信
    **注意:这种方式对子进程的生命周期管理并不方便,不建议在python中使用**
    '''
    import os, time
    pipe_name = 'pipe_test'
    
    def child():
        #子进程负责写
        pipeout = os.open(pipe_name, os.O_WRONLY)  # 必须以只写模式
        counter = 0
        while counter < 10: #不要写死循环,控制不好就是僵尸进程
            print('write Number %03d
    ' % counter)
            os.write(pipeout, b'Number %03d
    ' % counter)   #这里是非阻塞的
            counter +=1
            time.sleep(1)
    
    def parent():
        #父进程负责读
        # pipein = open(pipe_name, 'r') #也可以用内置函数open,和操作文件一样
        fd = os.open(pipe_name, os.O_RDONLY)  # 必须以只读模式,获取一个file descriptor
        #fd是一个非负整数,只在单个进程中有意义,单个进程中一个fd对应一个待操作文件。
        while True:
            # line = pipein.readlines()[:-1]
            line = os.read(fd, 20)  # 每次读20个字节,读出后FIFO管道中会马上清除数据
            #open和os.read都是非阻塞的
            if line:
                print('Parent %d got "%s" at %s' % (os.getpid(), line, time.time()))
            else:
                print('Parent %d no data'% os.getpid())
            time.sleep(1)
    
    if not os.path.exists(pipe_name):
        os.mkfifo(pipe_name)  #创建FIFO管道
    
    pid = os.fork()  #创建子进程
    if pid:
        parent()
    else:
        child()
    
    '''
    这种模式允许多个进程(>=2)间通信
    '''
    '''
    进程间通信之---信号量
    特点:不传输数据,用于多进程的同步
    参考: https://blog.csdn.net/sinat_27864123/article/details/78490164
    '''
    
    import multiprocessing
    import time
    
    def consumer(s):
        s.acquire() #信号量的使用与互斥量类似
        print(multiprocessing.current_process().name+' 正在执行')
        time.sleep(2) #执行时会发现同一时刻只有2个进程在执行
        s.release()
        print(multiprocessing.current_process().name+' release')
    
    
    if __name__ == '__main__':
        s = multiprocessing.Semaphore(5) #把信号量值设置为2,一次提供给两个消费者服务
        for i in range(5): #启5个进程
            p = multiprocessing.Process(target=consumer,args=(s,))
            p.daemon = True #跟随主进程死亡,如果生产环境使用,最好加上这个,否则主进程结束时,子进程还在运行的话,就会造成孤儿进程(内存泄露)
            p.start()
    
        time.sleep(3) #等一下子进程
        print('main end')
    '''
    进程间通信之---共享内存
    特点:
        1. 最快的IPC方式
        2. 因为多个进程可以同时操作,所以需要进行同步
        3. 可通过与信号量或锁结合使用实现多个进程间同步
    共享方案:
        a. 消息队列
        b. 文件映射(为了在进程间共享内存,内核专门留有一个内存区,主进程将需要共享的数据映射到这块内存中,其他进程访问这个共享内存)
    '''
    
    from multiprocessing import Process,Queue,Semaphore,current_process
    from time import sleep
    my_semaphore = Semaphore(2)
    
    '''
    错误的示例!
    '''
    global_list = list(range(5)) #进程间不共享内存,所以每个进程访问的是这个对象的副本,并不是同一个对象
    
    def change_global_list(global_list,s):
        s.acquire()
        print('process_id:%s before change'% current_process().name, global_list)
        global_list.pop()
        sleep(1)
        s.release()
        print('process_id:%s after change'% current_process().name,global_list)
    
    
    if __name__ == '__main__':
        from multiprocessing import Manager, freeze_support
        manager = Manager()
        print(dir(manager))#'address', 'connect', 'dict', 'get_server', 'join', 'list', 'register', 'shutdown', 'start'
        global_list = manager.list()
    
        global_list.extend(list(range(5)))
    
        for i in range(5):
            p = Process(target=change_global_list,args=(global_list,my_semaphore))
            p.daemon = True
            p.start()
    
        sleep(3)  # 等一下子进程
        print('over!')
  • 相关阅读:
    C#實現列舉DB中所有StoredProcedur
    Configure the browserJMeter
    DB 字段
    SQL Server 角色類型
    將N行數據合併成一行顯示
    性能计数器
    一千萬條以上記錄分頁數據庫優化
    SPFA静态链表优化+队列储存
    多源最短路pku1125
    图的连通性——folyd检验
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/12669124.html
Copyright © 2011-2022 走看看