zoukankan      html  css  js  c++  java
  • 进程数据共享-进程池

    数据共享

    Manager  内部管理了很多数据类型,并不是所有的数据类型都是用来做数据分享,只是顺便包含了能够处理数据共享问题的数据类型 list dict

    列表/字典  自带的方法基本都是数据安全的,但是对其中的元素进行+= -= *=  /=   都是数据不安全的

    from multiprocessing import Manager,Process,Lock
    
    def func(dic,lock):
        with lock:
            dic['count'] -= 1
    
    if __name__ == '__main__':
        m = Manager()
        lock = Lock()
        dic = m.dict({'count':100})
        p_l = []
        for i in range(100):
            p = Process(target=func,args=(dic,lock))
            p.start()
            p_l.append(p)
        for p in p_l:p.join()
        print(dic)

    数据共享:速度很慢,牵扯到锁的问题,一般使用数据库进行数据共享,利用第三方工具--消息中间件(消息队列)进行通信

    进程池

    在一台四核计算机上,如果有上百个没有IO阻塞的计算任务,开启上百个进程会使效率降低,可以利用进程池,开启几个进程,用过异步提交执行任务

    异步提交

    import time
    from multiprocessing import Pool,Process
    
    def func(i):
        i * i
    
    if __name__ == '__main__':
        start = time.time()
        p = Pool()    #括号里的参数默认是cpu的个数,开启cpu个数的进程
        for i in range(20):
            p.apply_async(func,(i,))   #apply提交任务,async异步
        p.close()           #关闭进程池,不允许再继续向这个池子中添加任务了
        p.join()            #阻塞  直到已经被提交到进程池中的任务全部结束
        print(time.time() - start)

    获取返回值

    import time
    from multiprocessing import Pool
    
    def func(i):
        i * i
        time.sleep(1)
        return 'i'* i
    
    if __name__ == '__main__':
        p = Pool()
        ret_l = []
        for i in range(50):
            ret = p.apply_async(func,(i,))     
            ret_l.append(ret)    # ret接收返回值,并存放在列表里
        for ret in ret_l:
            print(ret.get())

    map方法

    import time
    from multiprocessing import Pool
    
    def func(i):
        i * i
        time.sleep(1)
        return 'i'* i
    
    if __name__ == '__main__':
        p = Pool()
        ret_l = p.map(func,range(5))   #map就是一种简便的apply_async的方式,并且内置了close和join的功能,参数必须规定个数
        for ret in ret_l:
            print(ret)

    同步提交

    按照顺序一个一个执行,而且还有关于进程的开销,反而降低效率,不推荐使用

    import os
    import time
    from multiprocessing import Pool
    
    def func(i):
        time.sleep(1)
        print(i,os.getpid())
    
    if __name__ == '__main__':
        p = Pool()
        for i in range(20):
            p.apply(func,(i,))     # 同步提交

    回调函数

    # 利用网址得到网页信息
    import os
    from urllib import request
    from multiprocessing import Pool
    def parser_page(content):
        print(os.getpid())
        print('len : ',len(content))
    
    def get_url(url):
        ret = request.urlopen(url)
        content = ret.read().decode('utf-8')
        return content
    
    if __name__ == '__main__':
        print(os.getpid())
        url_lst = [
            'http://www.cnblogs.com/Eva-J/articles/8253549.html',   
            'http://www.cnblogs.com/Eva-J/articles/8306047.html',   
            'http://www.baidu.com',
            'http://www.sogou.com',
            'https://www.cnblogs.com/Eva-J/p/7277026.html'
        ]
        p = Pool()  
        # ret_l = []
        for url in url_lst:
            ret = p.apply_async(get_url,(url,),callback=parser_page)  # 异步的方式提交任务
            # callback 将get_url的返回值给parser_page,并且立即执行parser_page
            # 如果异步的任务执行完毕之后需要立即做另外的操作,推荐使用collback和生产者消费者模型
            # ret_l.append(ret)
        p.close()              # 不获取结果就这么写
        p.join()
        # for ret in ret_l:    # 要获取结果就这么写
        #     res = ret.get()
        #     parser_page(res)

    利用多进程实现并发的socketserver

    # server端:
    from socket import *
    from multiprocessing import Pool
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen()
    
    def talk(conn):
        while True:
            try:
                msg = conn.recv(1024)
                if not msg: break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p = Pool()
        while True:
            conn,addr = server.accept()
            p.apply_async(talk,args=(conn,))
    
    
    # client端: import socket client = socket.socket() client.connect(('127.0.0.1',8080)) while True: msg = input('>>>').strip() if not msg: continue client.send(msg.encode('utf-8')) msg = client.recv(1024) print(msg.decode('utf-8'))
  • 相关阅读:
    51.try块和catch块中return语句的执行
    17. 处理日期
    16.查找特定字符出现的次数
    15.字符串长度
    14.字符串拆分
    13.字符串比较
    12.幸运抽奖
    11.使用枚举
    10.获取系统时间
    MSSQL 判断临时表是否存在
  • 原文地址:https://www.cnblogs.com/sandy-123/p/10458402.html
Copyright © 2011-2022 走看看