zoukankan      html  css  js  c++  java
  • python之管道, 事件, 信号量, 进程池

    管道:双向通信 2个进程之间相互通信

    from  multiprocessing import Process, Pipe
    
    def f1(conn):
        from_zjc_msg = conn.recv()
        print('我是子进程')
        print('来自主进程的消息:', from_zjc_msg)
    
    if __name__ == '__main__':
        conn1, conn2 = Pipe() # 创建一个管道对象, 全双工, 返回管道的两端, 但是一端发送的消息,只能另一端接受;自己这一端无法接收
        # 可以将一端或两端发送给其他的进程, 那么多个进程之间就这一通过这一管道进行通信
        p1 = Process(target=f1, args=(conn2, ))
        p1.start()
        conn1.send('有点困了')
    
        print('我是主进程')

    Event用法:

    from mutiprocessing import Event  # 导入Event模块

    event=Event() #设置一个事件对象, 初始标志位是False

    event.set() # 将标志位改为True

    event.clear() # 将标志位改为False

    event.wait()  # 等待设置标志位, 直到为True,再 继续向下执行

    from multiprocessing import Process, Event
    
    e = Event() # 创建事件对象, 这个对象的初始状态为False
    print('e的状态是', e.is_set())
    
    print('程序运行到了这里')
    
    e.set() # 将e的状态改为True
    print('e的状态是', e.is_set())
    # e.clear() # 将e的状态改为False
    
    e.wait() # e这个事件对象如果值为False, 就在此处等待.
    print('程序过了wait')

    基于事件的进程通信

    import time
    from multiprocessing import Process, Event
    
    def f1(e):
        time.sleep(2)
        n = 100
        print('子进程计算结果为:', n)
        e.set()
    
    if __name__ == '__main__':
        e = Event()
        p = Process(target=f1, args=(e, ))
        p.start()
    
        print('主进程等待.....')
        e.wait()
        print('结果已经出来, 可以拿到该值')

    信号量(Semaphore),用于控制线程并发数

    需要导入模块: 

    from multiprocessing import Process, Semaphore
    重要方法有2个 对象.acquire() 和 对象.release()
    import time
    import random
    from multiprocessing import Process, Semaphore
    
    def f1(i, s):
        s.acquire()
    
        print('%s号嘉宾进去了' % i)
        time.sleep(random.randint(1, 3))
        print('%s号嘉宾出来了' % i)
        s.release()
    
    if __name__ == '__main__':
        s = Semaphore(2) # 计数器2, acquire 一次减一, 为0, 其他人等待, release加1
        for i in range(5):
            p = Process(target=f1, args=(i, s))
            p.start()

    进程池:定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
    如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,
    那么同一时间最多有固定数量的进程在运行。
    # # 进程池和多进程执行时间的对比
    import time
    from multiprocessing import Process, Pool
    
    def f(n):
        for i in range(5):
            n += i
    if __name__ == '__main__':
        # 统计进程池执行100个任务的时间
        s_time = time.time()
        pool = Pool(4) # 里面这个参数是指定进程池中有多少个进程, 4表示4个进程, 如果不传参, 默认开启的进程数一般是cpu的个数
        pool.map(f, range(100)) # 参数数据必须是可迭代的, 异步提交任务, 自带join功能
        e_time = time.time()
        dif_time = e_time - s_time
    
        # 统计100个进程, 来执行100个任务的执行时间
        p_s_t = time.time() # 多进程起始时间
        p_list = [ ]
        for i in range(100):
            p = Process(target=f, args=(i,))
            p.start()
            p_list.append(p)
        [pp.join() for pp in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
        print('进程池的执行时间:', dif_time)
        print('多进程的执行时间:', p_dif_t)

    进程池同步方法:

    import time
    from multiprocessing import Process, Pool
    
    def f1(n):
        # print(n)
        time.sleep(2)
        return n * 2
    
    if __name__ == '__main__':
        pool = Pool(2)
    
        for i in range(5):
            print('xxxxx')
            res = pool.apply(f1, args=(i, ))
            print(res)

    结果: 先执行xxxxx  过2s执行0, xxxxx  过2s执行2 xxxxx  
    进程池异步:
    import time
    from multiprocessing import Process, Pool
    
    def f1(n):
        time.sleep(2)
        return n * 2
    
    if __name__ == '__main__':
        pool = Pool(2)
        res_list = [ ]
        for i in range(5):
            print('xxxxx')
            res = pool.apply_async(f1, args=(i,))
            res_list.append(res)
        for i  in res_list:
            print(i.get())
    
    

    进程池的回调函数:

    import os
    from multiprocessing import Pool, Process
    
    def f1(n):
        print('进程池里面的进程pid', os.getpid())
        print(n)
        return 2 * n
    
    def f2(n):
        print('回调函数里面的进程pid', os.getpid())
        print(n)
    
    if __name__ == '__main__':
        pool = Pool(4)
        res = pool.apply_async(f1, args=(5,), callback=f2)
        pool.close()
        pool.join()
        print('主程序里面的进程pid', os.getpid())

     
  • 相关阅读:
    【原创】Spring连接、事务代码分析
    【原创】某银行综合报表系统优化片段
    【参考】将表建在指定的数据文件上
    【转载】犀利的 oracle 注入技术
    【参考】JDBC执行存储过程的四种情况
    【原创】不重启was server重新加载应用class文件
    【学习】java下实现调用oracle的存储过程和函数
    【参考】.class文件的JDK编译版本查看
    【转载】java编程中'为了性能'一些尽量做到的地方
    【转载】计算汉字相似程度
  • 原文地址:https://www.cnblogs.com/q455674496/p/10250899.html
Copyright © 2011-2022 走看看