zoukankan      html  css  js  c++  java
  • 32 管道 共享数据 进程池

    一.管道  conn1,conn2=Pipe()
    EOFError  conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。
    如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    from multiprocessing import Process, Pipe
    
    def f(parent_conn,child_conn):
        #parent_conn.close() #不写close将不会引发EOFError
        while True:
            try:
                print(child_conn.recv())
            except EOFError:
                child_conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(parent_conn,child_conn,))
        p.start()
        child_conn.close()
        parent_conn.send('hello')
        parent_conn.close()
        p.join()
    
    引发EOFError
    EOFError
    from multiprocessing import Process,Pipe
    
    def consumer(p,name):
        produce, consume=p
        produce.close()
        while True:
            try:
                baozi=consume.recv()
                print('%s 收到包子:%s' %(name,baozi))
            except EOFError:
                break
    
    def producer(seq,p):
        produce, consume=p
        consume.close()
        for i in seq:
            produce.send(i)
    
    if __name__ == '__main__':
        produce,consume=Pipe()
    
        c1=Process(target=consumer,args=((produce,consume),'c1'))
        c1.start()
    
    
        seq=(i for i in range(10))
        producer(seq,(produce,consume))
    
        produce.close()
        consume.close()
    
        c1.join()
        print('主进程')
    pipe实现生产者消费者模型

    二.数据共享 Manager 并不提供数据安全机制,数据不安全,需要加锁解决这个问题
    m=Manager() m.dict({"name":"alex"})
    def func(dic):
        dic["count"]-=1
    
    if __name__ == '__main__':
        # m=Manager()
        # dic=m.dict({"num":1})
        # print(dic)
        # dic["num"]=0
        # print(dic)
    
        m=Manager()
        d=m.dict({"num":100})#共享数据
        p=Process(target=func,args=(d,))
    
        d["num"] = 0
        p.start()
        p.join()   #必须要在p.start 之后加p.join(), 在进程里面才能正常的打印
    数据共享1
    from multiprocessing import Manager,Process,Lock
    
    def func(dic,lock):
        with lock:#加锁,保证数据安全,因为数据共享的数据是不安全的
            dic["num"]-=1
    
    
    if __name__ == '__main__':
        m=Manager()#数据共享对象创建
        lock=Lock()
        dic=m.dict({"num":1000})
        lst=[]
        for i in range(100):
            p=Process(target=func,args=(dic,lock))
            p.start()
            lst.append(p)
        [pp.join() for pp in lst]#主进程等待所有子进程结束
    
        print(dic["num"])
    数据共享2

    三.进程池:Pool
    p=Pool(4)
    p.map(func,可迭代对象) 异步提交任务 自带close+join
    res=p.apply() 同步提交任务 直接可以得到返回值 res
    res=p.apply_async() 异步提交任务 得到结果对象 res 需要用res.get()拿到结果,
    res.get()有阻塞效果,需要把res循环加入列表中,在用get取值 需要先关p.close() p.join()
    p.close()不允许在向进程池中加任务

    回调函数 在进程池 apply_async()方法中,参数 callback=
    p=Pool(4)
    p.apply_async(func,args=(参数,),callback=回调函数)
    异步执行返回结果直接给回调函数,call_back_func 在主进程中执行

    from multiprocessing import Pool,Process
    import time
    
    def func(i):
        sum=0
        for j in range(5):
            sum+=i
    
    if __name__ == '__main__':
        start_time=time.time()
        lst=[]
        for i in range(100):
            p=Process(target=func,args=(i,))
            p.start()
            lst.append(p)
        [pp.join() for pp in lst]
        end_time=time.time()
        print(end_time-start_time)#非进程池方式所用的时间,大部分耗时在创建进程上  1.7742531299591064
    
        start=time.time()
        pool = Pool(4)
        pool.map(func,range(100)) #异步
        end=time.time()
        print(end-start)  #用进程池 方式所用时间  #0.11170
    进程池map方法-异步
    from multiprocessing import Pool
    import time,os
    def func(i):
        time.sleep(1)
        num=i
        print(num,os.getpid())
    
    
    if __name__ == '__main__':
        p=Pool(4)
    
        p.map(func,range(20))#异步进行  args=next(itrable)  无序的,  map自带close和join,所以不用写下面的close和join
        # p.close() #不允许再向进程池中加任务
        # p.join() #主进程 等待子进程结束
        print("主进程")
    map
    from multiprocessing import Pool
    
    def func(i):
        i+=1
        print(i)
        return i
    
    if __name__ == '__main__':
        p=Pool(4)
        res_lst=[]
        for i in range(10):
            # p.apply(func,args=(i,))#同步提交任务的 ,此方法用的比较少
            res=p.apply_async(func,args=(i,)) #异步提交机制 ,此方法用的比较多,res为结果对象
            # print(res.get())  不能在这res.get()否则会变成同步,所以需要先用一个空列表先装res后用get获取
            res_lst.append(res)
    
        p.close()#close必须加在join之前,不允许提交任务
        p.join()#等待子进程结束,主进程在往下执行
    
        # for res in res_lst:
        #     print(res.get())
    进程池apply-同步,apply_async异步
    from multiprocessing import Pool
    
    def func(x):
    
        return x*2
    
    def call_back_func(x):
        print(x)
    
    if __name__ == '__main__':
    
        p=Pool(4)
        p.apply_async(func,args=(3,),callback=call_back_func)
        p.close()
        p.join()
    回调函数,apply_async参数callback=
    import socket
    from multiprocessing import Process
    server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    ip_port=('127.0.0.1',8080)
    server.bind(ip_port)
    server.listen(5)
    
    def talk(conn,addr):
        while 1:
            try:
                client_msg=conn.recv(1024)
                print(addr[1],client_msg.decode("utf-8"))
                msg=input("服务端说:")
                if not msg:
                    break
                else:
                    conn.send(msg.encode("utf-8"))
            except Exception:
                break
    
    if __name__ == '__main__':
        while 1:
            conn,addr=server.accept()
            p=Process(target=talk,args=(conn,addr))
            p.start()
    socket服务端聊天
    import socket
    client=socket.socket()
    ip_port=('127.0.0.1',8080)
    client.connect(ip_port)
    while 1:
        msg=input("客户:")
        if not msg:
            break
        else:
            client.send(msg.encode("utf-8"))
            server_msg=client.recv(1024)
            print(server_msg.decode("utf-8"))
    socket客户端并发聊天
    import socket
    client=socket.socket()
    ip_port=('127.0.0.1',8080)
    client.connect(ip_port)
    while 1:
        msg=input("客户:")
        if not msg:
            break
        else:
            client.send(msg.encode("utf-8"))
            server_msg=client.recv(1024)
            print(server_msg.decode("utf-8"))
    进程池socket客户端
    import socket
    client=socket.socket()
    ip_port=("127.0.0.1",8080)
    
    client.connect(ip_port)
    while 1:
        client_msg=input(">>>>>").encode("utf-8")
        if not client_msg:
            continue
        if client_msg=="Q":
            break
        client.send(client_msg)
        msg=client.recv(1024)
        print("服务端:",msg.decode("utf-8"))
    
    client.close()
    进程池客户端并发聊天
  • 相关阅读:
    Redux React-redux 理解
    React ---- 浅谈ReactJs
    javascript --- 鼠标事件
    javaScript --- localStorage 本地存储
    javascript -- 对象(深拷贝、浅拷贝)
    javascript --- 数组输出数组中最大值最小值差值
    Css中未知 div 宽高度,水平垂直居中方法
    Windows下使用NCL(Cygwin模拟Linux环境)
    Qt加载网页(加载浏览器插件)和制作托盘后台运行(南信大财务报账看号)
    OpenGL超级宝典第5版&&GLSL法线变换
  • 原文地址:https://www.cnblogs.com/knighterrant/p/10038722.html
Copyright © 2011-2022 走看看