zoukankan      html  css  js  c++  java
  • Python9-进程池-day38

    复习

    # 信号量
    from multiprocessing import  Semaphore
    # 用锁的原理实现的,内置了一个计数器
    # 在同一个事件,只能有指定数量的进程执行某一段被控制住的代码
    # 事件
    # wait阻塞受到事件控制的同步组件
    # 状态  True  Flase  is_set
    #         true--》false  用clear()
    #         false --->true  用set()
    # wait方法 状态为true不阻塞  状态为false的时候阻塞
    
    # 队列
    # Queue
    #     put     当队列满的时候阻塞等待队列有空位置
    #     get     当队列空的时候阻塞等待队列有数据
    #     full empty  不完全准确
    
    # JoinableQuere
    # task_done   与get连用
    # join        与put连用

    管道

    from multiprocessing import Pipe,Process
    def func(conn1,conn2):
        conn2.close()
        while True:
            try:
                msg = conn1.recv()
                print(msg)
            except EOFError:
                conn1.close()
                break
    
    if __name__ == '__main__':
        conn1,conn2 = Pipe()
        Process(target=func,args=(conn1,conn2)).start()
        conn1.close()
        for i in range(20):
            conn2.send('吃了吗')
        conn2.close()
    from multiprocessing import  Pipe,Process
    import time,random
    def producer(con,pro,name,food):
        con.close()
        for i in range(4):
           time.sleep(random.randint(1,3))
           f = '%s生产%s%s'%(name,food,i)
           print(f)
           pro.send(f)
        pro.close()
    def consumer(con,pro,name):
        pro.close()
        while True:
            try:
               food =  con.recv()
               print('%s吃了%s'%(name,food))
               time.sleep(random.randint(1,3))
            except EOFError:
                con.close()
                break
    if __name__ == '__main__':
        con,pro = Pipe()
        p = Process(target=producer,args = (con,pro,'egon','泔水'))
        p.start()
        c = Process(target=consumer,args = (con,pro,'alex'))
        c.start()
        con.close()
        pro.close()
    进程之间的数据共享
    from multiprocessing import  Manager,Process,Lock
    
    def main(dic,lock):
        lock.acquire()
        dic['count'] -= 1
        lock.release()
    
    
    if __name__ == '__main__':
        m = Manager()
        l = Lock()
        dic = m.dict({'count':100})
        p_list = []
        for i in range(50):
            p = Process(target=main,args=(dic,l))
            p.start()
            p_list.append(p)
        for i in p_list: i.join()
        print('主进程:',dic)

     进程池

    # 为什么有进程池
    # 效率
    # 每开启进程,开启属于这个进程的内存空间
    # 寄存器 堆栈 文件
    # 进程过多,操作系统调度进程
    # 进程池
    # python中的先创建一个属于进程的池子
    # 这个池子指定能存放多少个进程
    # 先将这些进程创建好
    from multiprocessing import Pool
    import  os,time
    def func(n):
        print('start func%s'%n,os.getpid())
        time.sleep(1)
        print('end func%s'%n,os.getpid())
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            p.apply_async(func,args=(i,))
        p.close()  #结束进程池接受任务
        p.join()  #感知进程池中的任务执行结束

     socket_server-进程池

    #server
    import socket
    from multiprocessing import Pool
    
    def func(conn):
        conn.send(b'hello')
        print(conn.recv(1024).decode('utf-8'))
        conn.close()
    if __name__ == '__main__':
        p = Pool(5)
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            p.apply_async(func,args=(conn,))
        sk.close()
    #client
    import socket
    
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    ret = sk.recv(1024).decode('utf-8')
    print(ret)
    msg = input('>>>').encode('utf-8')
    sk.send(msg)
    sk.close()

     进程池返回值

    # p.map(funcname,iterable)  默认异步的执行任务,自带close和join
    # p.apply 同步调用
    # p.apply_async 异步调用 和主进程完全异步 需要手动close和join
    from multiprocessing import Pool
    def func(i):
        return i*i
    
    if __name__ == '__main__':
        p = Pool(5)
        for i in range(10):
            res = p.apply(func,args=(i,))  #apply的结果就是func的返回值
            print(res)
    import time
    from multiprocessing import Pool
    def func(i):
        time.sleep(0.5)
        return i*i
    
    if __name__ == '__main__':
        p = Pool(5)
        res_list = []
        for i in range(10):
            res = p.apply_async(func,args=(i,))  #
            res_list.append(res)
        for res in res_list:print(res.get())
    #map
    import time
    from multiprocessing import Pool
    def func(i):
        time.sleep(0.5)
        return i*i
    
    if __name__ == '__main__':
        p = Pool(5)
        ret = p.map(func,range(10))
        print(ret)
    
    
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

     进程池的回调函数

    from multiprocessing import Pool
    
    def func1(n):
        print('in func1')
        return n*n
    def func2(nn):
        print('in func2')
        print(nn)
    
    if __name__ == '__main__':
        p = Pool(5)
    
        p.apply_async(func1,args=(10,),callback=func2)
        p.close()
        p.join()

    in func1
    in func2
    100

     
    from multiprocessing import Pool
    import os
    def func1(n):
        print('in func1',os.getpid())
        return n*n
    def func2(nn):   #参数只能是func1的返回值
        print('in func2',os.getpid())
        print(nn)
    
    if __name__ == '__main__':
        print('主进程: ',os.getpid())
        p = Pool(5)
    
        p.apply_async(func1,args=(10,),callback=func2)
        p.close()
        p.join()
    
    主进程:  11172
    in func1 11760
    in func2 11172
    100
  • 相关阅读:
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    数据库作业15:关系数据理论
    IPv6 — 实践
    Provisional headers are shown 说明走了缓存没有发送请求
  • 原文地址:https://www.cnblogs.com/zhangtengccie/p/10392478.html
Copyright © 2011-2022 走看看