zoukankan      html  css  js  c++  java
  • python多进程: multiprocessing Pool 和tqdm

    https://blog.csdn.net/qq_39694935/article/details/84552076

    【Python】multiprocessing Pool 进程间通信共享

    1. tqdm模块的简洁使用

    直接上代码:

    from tqdm import tqdm
    from multiprocessing import Pool
    import functools
    from pymongo import MongoClient
    # Module-level MongoDB client used by the main_deal variants; the host and
    # credentials shown here are redacted placeholders.
    mdb = MongoClient('120.xx.26.xx:20002', username='xx', password='xxxxx')
    
    # Three alternative implementations of main_deal follow; keep only one of them.
    
    def create_data(image):
        """Process one item in a pool worker and return it as text.

        Placeholder worker: it echoes ``image`` to stdout and returns
        ``str(image)``.
        """
        # TODO: replace the echo below with the real per-item processing logic.
        print(image)
        as_text = str(image)
        return as_text
    
    
    def main_deal():
        """Variant 1: ordered ``imap`` with a tqdm bar that knows the total.

        Every document of ``mdb['db_name']['image']`` is passed to
        ``create_data`` in a 20-process pool, and each returned string is
        written as one line of ``result.txt``.
        """
        num_processor = 20
        # The pool is created before any query is issued, as in the original.
        p = Pool(num_processor)
        collection = mdb['db_name']['image']
        images = collection.find(no_cursor_timeout=True).batch_size(200)
        try:
            # Cursor.count() was removed in PyMongo 4; count_documents({}) is
            # the supported way to count every document in the collection.
            total = collection.count_documents({})
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap(create_data, images), total=total):
                    fw.write(result + '\n')

            # NOTE(review): the cursor was already consumed by the loop above,
            # so this imap_unordered pass presumably iterates nothing unless
            # the cursor is rewound first -- confirm the intent.
            for _ in tqdm(p.imap_unordered(create_data, images)):
                pass
        finally:
            # Always release the workers, then the no-timeout cursor, which
            # the server will not expire on its own.
            p.close()
            p.join()
            images.close()
    
    
    def main_deal():
        """Variant 2: ``imap_unordered`` with a tqdm bar of unknown length.

        Every document of ``mdb['goodlook']['image_generated_data']`` is passed
        to ``create_data`` in a 20-process pool; results are written to
        ``result.txt`` one per line, in whatever order the workers finish.
        """
        num_processor = 20
        p = Pool(num_processor)
        collection = mdb['goodlook']['image_generated_data']
        images = collection.find(no_cursor_timeout=True).batch_size(200)
        try:
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap_unordered(create_data, images)):
                    fw.write(result + '\n')
        finally:
            # Always release the workers, then the no-timeout cursor, which
            # the server will not expire on its own.
            p.close()
            p.join()
            images.close()
    
    
    def main_deal():
        """Variant 3: ``imap_unordered`` driven through ``functools.partial``.

        Same pipeline as the plain ``imap_unordered`` variant, but the worker
        is wrapped in a partial before being handed to the pool. Results are
        written to ``result.txt`` one per line, in completion order.
        """
        num_processor = 20
        p = Pool(num_processor)
        collection = mdb['goodlook']['image_generated_data']
        images = collection.find(no_cursor_timeout=True).batch_size(200)
        # partial is where extra fixed arguments for the worker would be
        # bound; none are bound here.
        pt = functools.partial(create_data)
        try:
            with open('result.txt', 'w+') as fw:
                for result in tqdm(p.imap_unordered(pt, images)):
                    fw.write(result + '\n')
        finally:
            # Always release the workers, then the no-timeout cursor, which
            # the server will not expire on its own.
            p.close()
            p.join()
            images.close()
    
    
    if __name__ == "__main__":
        # Script entry point: run the main_deal variant defined in this module.
        main_deal()

    2. 进程池多进程

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    # __author__ = "Victor"
    # Date: 2020/6/18
    
    import traceback
    import multiprocessing
    from multiprocessing import Pool
    
    # Size of the worker pool, and also the number of tasks the __main__ block submits.
    concurrent_num = 10
    
    
    def task_run(data, msg):
        """Print a greeting tagged with the current process name.

        ``msg`` is prefixed with the name of the process running the task and
        printed together with ``data``. Any exception is reported to stdout
        (traceback plus message) rather than propagated. Always returns None.
        """
        try:
            # A time.sleep(random.randrange(1, 4)) can be inserted here to simulate work.
            worker_name = multiprocessing.current_process().name
            tagged_msg = worker_name + '-' + msg
            print(f"hello world : {data}, {tagged_msg}")
        except Exception as e:
            traceback.print_exc()
            print("error: ", e)
        return None
    
    
    if __name__ == "__main__":
        # Submit concurrent_num tasks to a pool of concurrent_num workers; each
        # task receives the shared (empty) dict and its own "index-N" label.
        data = {}
        p = Pool(concurrent_num)
        for task_no in range(concurrent_num):
            p.apply_async(task_run, args=(data, f"index-{task_no}"))

        # No further tasks are submitted; wait for the workers to finish.
        p.close()
        p.join()
    

    3. 进程池和调度器模块的冲突

    #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    ''' 
    @Author: Victor
    @Contact: 
    @Date: 2020/10/15
    @function: ''
    '''
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.schedulers.background import BackgroundScheduler
    
    import time
    import random
    import multiprocessing
    
    
    class TodayCollection(object):
        """Demo collector that fans groups of work out to a process pool.

        NOTE(review): the instance holds a BlockingScheduler while ``start``
        hands the bound method ``self.execute_tasks`` to the pool. The author
        reports that this combination makes the worker processes exit
        abnormally, and that dropping the scheduler attribute avoids it.
        """

        def __init__(self):
            self.name = "今日采集类"
            self.scheduler = BlockingScheduler()

        def execute_tasks(self, index, d_arr, p_lock):
            """Print ``index``, ``d_arr`` and a random number in an endless loop.

            ``p_lock`` is accepted but unused by the active code. It is meant
            for serialising shared writes (typically database writes):
            acquire it before the write and release it afterwards, skipping
            both steps when no lock was supplied.

            Any exception raised inside the loop is printed, not propagated.
            """
            try:
                while True:
                    print(index, d_arr, random.random())
            except Exception as err:
                print(err)

        def start(self):
            """Submit one ``execute_tasks`` job per non-empty group, then wait."""
            groups = [[1, 3, 22], [3, 4, 6, 8], [3, 3, 4, 4], [3, 5, 6, 7]]

            # A manager-backed lock is used so it can be passed to pool workers.
            lock_manager = multiprocessing.Manager()
            p_lock = lock_manager.Lock()
            pool = multiprocessing.Pool(processes=4)

            for index, d_arr in enumerate(groups):
                if not d_arr:
                    continue
                pool.apply_async(self.execute_tasks, args=(index, d_arr, p_lock))

            pool.close()
            pool.join()
            pool.terminate()
    
    
    if __name__ == "__main__":
        # Author's observation: apscheduler's BlockingScheduler and
        # BackgroundScheduler make the multiprocessing workers exit abnormally.
        # For normal behaviour, remove `self.scheduler = BlockingScheduler()`
        # from TodayCollection.__init__.
        collector = TodayCollection()
        collector.start()
    

      

  • 相关阅读:
    PGL:Paddle带你走进图学习
    Improving the way neural networks learn
    文本语义匹配Simnet模型
    图卷积神经网络(GCN)
    git revert
    vi/vim多行注释和取消注释、复制粘贴
    谷歌之多任务学习模型MMoE
    阿里CVR预估模型之ESMM
    知识图谱简介
    CTR预估--Deep Interest Network
  • 原文地址:https://www.cnblogs.com/adamans/p/10523560.html
Copyright © 2011-2022 走看看