zoukankan      html  css  js  c++  java
  • 进程池

    进程池:

    1.进程池初识,2.效率比较,3.同步和异步,4.进程池的返回值和回调函数,5.socket并发的服务端


    为什么要有进程池?进程池的概念。

    在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

    在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

    multiprocess.Pool模块:

    Pool([numprocess  [,initializer [, initargs]]]):创建进程池
    1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 initargs:是要传给initializer的参数组
    参数介绍
    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
       
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    
    主要方法
    主要方法
    方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    obj.ready():如果调用完成,返回True
    obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    obj.wait([timeout]):等待结果变为可用。
    obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
    其他方法(了解)

    进程池初识:

    #进程池
    #开辟一个空间,固定放着固定数目的进程,比如5个。现在有50个任务,每次任务一执行,就去池子里找这5个进程,一次执行五个任务,
    #执行完的任务,把进程放回进程池,供其他任务继续使用。从而大大减少了进程的开启和销毁的内存占用,提高了效率
    
    #更高级的进程池  #···python里没有
    #假设 进程池最少的时候有n个进程,最多是m个
    #当我们访问网站,用户量多的时候,进程池的进程,就会根据算法,慢慢增加,增加到m个
    #当访问量,或者任务下降,进程池就会 减减减  减到最少 n个
    
    
    #信号量:是同一段代码,只能同一时间由几个进程执行,但是这些进程数都是要悉数开启和销毁的,如果有200个进程,这些进程都会占用内存
    #进程池:进程都是固定的,不用每次都开启和销毁

    进程池和普通多进程的效率比较:

    from multiprocessing import Pool,Process
    import time
    
    def func(n):
        for i in range(10):
            print(n+1)
    
    if __name__ == '__main__':
        pool = Pool(5) #一般进程数是 cpu核数+1
        start = time.time()
        pool.map(func,range(100)) #开启了一百个任务,map是自带join,close的
        #map的功能最多就只能这样了,如果想传更多参数,可以iterable里可以传tuple,list..
        t1 = time.time() - start
    
        start2 = time.time()
        p_lst = []
        for i in range(100):
            p = Process(target=func,args=(i,))
            p.start()
            p_lst.append(p)
        [p.join() for p in p_lst]
        t2 = time.time() - start2
        print(t1,t2) # 0.19 VS  2.61 进程池完胜
    进程池和多进程-效率比较

    同步: apply   异步: apply_async(func, args=(args1,args2,...),callback=func2):

    import os
    import time
    from multiprocessing import Pool,Process
    
    def func(n):
        print('%s 开始了~~~'%n,os.getpid())
        time.sleep(1)
        print('%s 结束了'%n,os.getpid())
    
    if __name__ == '__main__':
        pool = Pool(5)
        for i in range(10):
            # pool.apply(func,args=(i,))  #apply 进程池的同步调用(自然不需要close,join),要等待进程中的任务完结,才会执行下一个任务.基本上不会用
            pool.apply_async(func,args=(i,))#apply_async异步,  一但有任务结束了,它所使用的进程(pid)马上会被其他任务使用
                                            #需要close和join,不然会导致主代码的结束,从而结束进程中的任务
        pool.close() #结束进程池接收任务
        pool.join()  #感知所有任务的结束,然后在执行主程序接下来的代码
    同步和异步-进程池

    #对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC 进程间通信:队列(子进程put,主进程get),管道
    #但是进程池 是可以直接接收子进程的返回值的
    #apply的结果就是func的返回值

    #通过get方法,获取 apply_async的返回值

    #对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC  进程间通信:队列(子进程put,主进程get),管道
    #但是进程池 是可以直接接收子进程的返回值的
    #apply的结果就是func的返回值
    
    from multiprocessing import Pool
    import time
    
    def func(n):
        time.sleep(0.2)
        return n*n
    
    if __name__ == '__main__':
        pool = Pool(5)
        ret_lst=[]
        for i in range(10):
            # ret = pool.apply(func, args=(i,))
            ret = pool.apply_async(func,args=(i,))
            #print(ret.get()) #get会阻塞着,直到拿到ret的结果。每个ret马上跟着一个get,所以会按顺序一个个print
                            #一旦异步提交了一个任务,返回一个对象,这个对象获得一个get方法,获取这个任务的返回值,所以get可以替代close和join
            ret_lst.append(ret)
        for ret in ret_lst:print(ret.get())
        # pool.close()
        # pool.join()
    
        # ret = pool.map(func,range(10))
        # print(ret)  #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        #因为map自带close和join,它会把所有的值都计算好了,再打印出来。
        #所以虽然map简单,但是apply_async可以实时提交,还是要用后者
    进程池的返回值

    回调函数不能传参数,唯一的参数就是注册函数的返回值

    回调函数
    import os
    from multiprocessing import Pool
    def func1(n):
        print('in func1',os.getpid())
        return n*n
    
    def func2(nn):
        print('in func2',os.getpid())
        print(nn)
    
    if __name__ == '__main__':
        print('主进程 :',os.getpid())
        p = Pool(5)
        for i in range(10):
            p.apply_async(func1,args=(10,),callback=func2) #回调函数不能传参数,唯一的参数就是注册函数的返回值
        p.close()                                          #回调函数是在主进程里执行的
        p.join()
    进程池的回调函数
    import requests
    from multiprocessing import Pool
    
    def get(url):  #让并发进程去获取url,把网络延时交给并发,把数据处理返回给主进程
        res = requests.get(url)
        if res.status_code == 200:
            return url,res.content.decode('utf8')
    
    def call_back(args):
        url,content = args
        print(url,len(content))
    
    if __name__ == '__main__':
        url_lst = [
            'https://www.cnblogs.com/',
            'http://www.baidu.com',
            'https://www.sogou.com/',
            'http://www.sohu.com/',
        ]
        p = Pool(5)
        for url in url_lst:
            p.apply_async(get,args=(url,),callback=call_back)
        p.close()
        p.join()
    小爬虫-回调函数
    url = r'https://movie.douban.com/subject/26281899/?from=subject-page'
    import requests
    res = requests.get(url)
    print(res) #返回为一个 <Response [200]>
    print(res.status_code) #状态码:200
    print(res.content)      #二进制的网页源码 (无格式版的)
    print(res.text)         #网页源码
    
    # from urllib.request import urlopen
    # ret = urlopen(url)
    # print(ret.read().decode('utf8')) #获得一个跟原网页源码一样格式的内容
    requests模块
        # apply
            # 同步的:只有当func执行完之后,才会继续向下执行其他代码
            # ret = apply(func,args=())
            # 返回值就是func的return
        # apply_async
            # 异步的:当func被注册进入一个进程之后,程序就继续向下执行
            # apply_async(func,args=())
            # 返回值 : apply_async返回的对象obj
            #          为了用户能从中获取func的返回值obj.get()
            # get会阻塞直到对应的func执行完毕拿到结果
            # 使用apply_async给进程池分配任务,
            # 需要先close后join来保持多进程和主进程代码的同步性
    复习

    使用进程池来实现socket服务端的并发:

    import socket
    from multiprocessing import Pool
    
    def server(conn):
        conn.send(b'hi')
        ret = conn.recv(1024).decode('utf8')
        print(ret)
        conn.send(b'hello')
        conn.close()
    
    if __name__ == '__main__':
        pool = Pool(5)
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            pool.apply_async(server,args=(conn,))
        sk.close()
        # pool.close()
        # pool.join()
    server端
    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    
    print(sk.recv(1024))
    msg = input('>>>> ')
    sk.send(msg.encode('utf8'))
    msg = sk.recv(1024).decode('utf8')
    print(msg)
    # sk.close()
    client端
  • 相关阅读:
    Sun开发的JINI技术在网络中的应用
    让memcached和mysql更好的工作
    Nginx+Tomcat+memcached负载均衡实现session共享
    Nginx 简单的负载均衡配置示例
    数据库sharding(scale up to scale out)
    docker专题(2):docker常用管理命令(上)
    UMDF
    编程精粹:编写高质量的C语言代码———笔记一
    子矩阵中共享1的最长对角线
    Print the numbers between 30 to 3000.
  • 原文地址:https://www.cnblogs.com/gkx0731/p/9744461.html
Copyright © 2011-2022 走看看