zoukankan      html  css  js  c++  java
  • 04_进程池

    1.为什么用进程池

        1.在需要频繁的创建删除较多进程的情况下,导致计算机资源消耗过多
        2.进程池则是创建指定进程数量等待执行事件,避免了不必要的创建和销毁过程

    2.进程池的使用步骤

        1.创建进程池,在池内放入适量的进程,将事件加入进程池的等待队列
        2.使用进程池中的进程不断处理事件,所有事件处理后回收关闭进程池

    3.语法概述

    from multiprocessing import Pool
    
    pool = Pool(processes=4)  # 创建指定进程数量进程池并返回进程池对象
    
    # 异步方式将事件放入进程池执行,返回一个对象该对象
    # callback: 回调函数,每当进程池中的进程处理完任务了,返回的结果交给回调函数,由回调函数进一步处理,回调函数只有异步时才有
    ret = pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
        参数:
            func要执行的事件函数,可以通过返回值对象的 ret.get() 方法得到func函数的返回值
            args: 位置参数元组,要给函数传递的参数
            kwargs: 键值对参数字典,要给函数传递的参数
            callback: 进程的任务函数返回值被当做回调函数的形参接收到,以此进行进一步的处理操作,回调函数是主进程调用的
            error_callback: 错误时主进程执行的回调函数
    pool.close()  # 关闭进程池,使其无法加入新的事件
    pool.join()  # 阻塞等待进程池退出(当所有事情处理完毕后)
    pool.apply()  # 用法和 pool.apply_async() 一样,但是没有返回值,异步方式将事件放入进程池顺序执行,一个事件结束再执行另一个事件
    func_list = pool.map(func, iter)  # 类似于内建函数map,将第二个参数的迭代数传递给第一个参数的函数执行,同时兼容了使用进程池执行

            map函数等同于for + apply_async示例

    from multiprocessing import Pool
    import time
    
    
    def fun(num):
        time.sleep(1)
        return num * num
    
    
    test = [1, 2, 3, 4, 5, 6]
    pool = Pool(3)
    
    r = pool.map(fun, test)  # 返回fun函数的返回值列表
    # 上面折行代码等同于以下注释部分的代码
    # r = []
    # for i in test:
    #     res = pool.apply_async(fun, (i,))
    #     r.append(res.get())
    print(r)
    pool.close()
    pool.join()

            map返回值示例

    from multiprocessing import Pool
    
    
    def func(num):
        num += 1
        print(num)
        return num
    
    
    if __name__ == '__main__':
        p = Pool(5)
        res = p.map(func, [i for i in range(100)])
        p.close()
        p.join()
        print('主进程中map的返回值', res)

    4.查看进程池中进程的进程号

    from multiprocessing import Pool
    import os
    import time
    import random
    import sys
    
    
    def worker(msg):
        t_start = time.time()
        print("%s开始执行,进程号为%d" % (msg, os.getpid()))
        # random.random()随机生成0~1之间的浮点数
        time.sleep(random.random() * 2)
        t_stop = time.time()
        print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))
        return "函数{}-{}-{}".format(sys._getframe().f_code.co_name, str(msg), "over")
    
    
    ret = list()
    po = Pool(3)  # 定义一个进程池,最大进程数3
    for i in range(0, 10):
        # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        r = po.apply_async(worker, (i,))
        ret.append(r)
    
    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    for r in ret:
        print(r.get())  # 可以通过返回值对象的 r.get() 方法得到worker函数的返回值
    print("-----end-----")
    """执行结果
        ----start----
        0开始执行,进程号为23515
        1开始执行,进程号为23516
        2开始执行,进程号为23517
        1 执行完毕,耗时0.07
        3开始执行,进程号为23516
        3 执行完毕,耗时0.08
        4开始执行,进程号为23516
        4 执行完毕,耗时0.66
        5开始执行,进程号为23516
        2 执行完毕,耗时1.25
        6开始执行,进程号为23517
        0 执行完毕,耗时1.37
        7开始执行,进程号为23515
        5 执行完毕,耗时0.83
        8开始执行,进程号为23516
        8 执行完毕,耗时0.33
        9开始执行,进程号为23516
        7 执行完毕,耗时0.72
        9 执行完毕,耗时0.34
        6 执行完毕,耗时1.71
        函数worker-0-over
        函数worker-1-over
        函数worker-2-over
        函数worker-3-over
        函数worker-4-over
        函数worker-5-over
        函数worker-6-over
        函数worker-7-over
        函数worker-8-over
        函数worker-9-over
        -----end-----
    """

    5.进程池实现文件拷贝

    import multiprocessing
    import os
    import time
    import random
    
    
    def copy_file(queue, file_name, source_folder_name, dest_folder_name):
        """copy文件到指定的路径"""
        f_read = open(source_folder_name + "/" + file_name, "rb")
        f_write = open(dest_folder_name + "/" + file_name, "wb")
        while True:
            time.sleep(random.random())
            content = f_read.read(1024)
            if content:
                f_write.write(content)
            else:
                break
        f_read.close()
        f_write.close()
    
        # 发送已经拷贝完毕的文件名字
        queue.put(file_name)
    
    
    def main():
        # 获取要复制的文件夹
        source_folder_name = input("请输入要复制文件夹名字:")
    
        # 整理目标文件夹
        dest_folder_name = source_folder_name + "[副本]"
    
        # 创建目标文件夹
        try:
            os.mkdir(dest_folder_name)
        except:
            pass  # 如果文件夹已经存在,那么创建会失败
    
        # 获取这个文件夹中所有的普通文件名
        file_names = os.listdir(source_folder_name)
    
        # 创建Queue
        queue = multiprocessing.Manager().Queue()
    
        # 创建进程池
        pool = multiprocessing.Pool(3)
    
        for file_name in file_names:
            # 向进程池中添加任务
            pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))
    
        # 主进程显示进度
        pool.close()
    
        all_file_num = len(file_names)
        while True:
            file_name = queue.get()
            if file_name in file_names:
                file_names.remove(file_name)
    
            copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num
            print("
    %.2f...(%s)" % (copy_rate, file_name) + " " * 50, end="")
            if copy_rate >= 100:
                break
        print()
    
    
    if __name__ == "__main__":
        main()

    6.进程池实现图片之家古装美女图片爬虫

    import os
    from multiprocessing import Pool
    
    import requests
    from bs4 import BeautifulSoup
    
    
    def get_url(url):
        res = requests.get(url)
        li = list()
        if res.status_code == 200:
            # 返回网页源代码
            soup = BeautifulSoup(res.text, "html.parser")
            # print(soup)
    
            # 返回含有图片url的div标签
            re = soup.find("div", class_="list_con_box").find_all("li")
            # print(re)
    
            # 数据清洗
            for i in re:
                img_s = i.find("img")  # 返回含有图片url的img标签
                if img_s:
                    src_s = img_s.get("src")  # 返回图片url
                    # print(src_s)  # https://img.tupianzj.com/uploads/allimg/200828/30-200RQ110300-L.jpg
                    li.append(src_s)
        return li
    
    
    def get_img(url):
        s = url.split("/")[-1]
        # print(s)  # 30-200RQ110300-L.jpg
        r = requests.get(url)
        if r.status_code == 200:
            with open("./古装美女/" + s, "wb") as f:
                f.write(r.content)
    
    
    def main():
        headers = {
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:75.0) Gecko/20100101 Firefox/75.0"
        }
        # 图片之家古装美女url
        url = "https://www.tupianzj.com/meinv/guzhuang/"
    
        # 获取图片url列表
        img_url = get_url(url)
    
        # 提取图片名称并保存到本地
        if not os.path.isdir("./古装美女"):
            os.mkdir("./古装美女")
        # 用进程池实现多任务下载
        pool = Pool(7)
        pool.map(get_img, img_url)  # 效果等同于以下注释的两行代码
        # for i in img_url:
        #     pool.apply_async(get_img, (i,))
    
        pool.close()
        pool.join()
    
    
    if __name__ == "__main__":
        main()

    7.进程池中父进程调用回调函数

    from multiprocessing import Pool
    import requests
    import os
    
    
    def func(url):
        res = requests.get(url)
        print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
        # print(res.text)
        if res.status_code == 200:
            return url,res.text
    
    
    def cal_back(sta):
        url, text = sta
        print('回调函数的pid', os.getpid())
        with open('a.txt', 'a', encoding='utf-8') as f:
            f.write(url + text)
        # print('回调函数中!', url)
    
    
    if __name__ == '__main__':
        p = Pool(5)
        l = ['https://www.baidu.com',
             'http://www.jd.com',
             'http://www.taobao.com',
             'http://www.mi.com',
             'http://www.cnblogs.com',
             'https://www.bilibili.com',
             ]
        print('主进程的pid', os.getpid())
        for i in l:
            p.apply_async(func, args=(i,), callback=cal_back)
            # 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
        p.close()
        p.join()
  • 相关阅读:
    nyoj 228士兵杀敌(五)
    hdu2072 单词数
    nyoj123士兵杀敌(四)(树状数组)
    nyoj1092数字分隔(二)
    HDU1166:敌兵布阵(线段树单点更新,区间查询)
    nyoj269VF(dp)
    nyoj 860又见01背包(01背包)
    poj2184(01背包变形)
    HDU2159FATE(完全背包变形)
    HDU4508 完全背包
  • 原文地址:https://www.cnblogs.com/tangxuecheng/p/13608229.html
Copyright © 2011-2022 走看看