  • Python threads and processes

    1. Queue + threads

    import MySQLdb
    import MySQLdb.cursors
    import queue
    import threading

    def update_line_thread():
        # One connection per worker thread: a MySQLdb connection must not be
        # shared between threads.
        connection = MySQLdb.connect(host='xx.xx.xx.xx', port=3306, user='', passwd='', db='db_name',
                                     charset='utf8mb4', connect_timeout=100000,
                                     cursorclass=MySQLdb.cursors.SSCursor, autocommit=1)
        cursor = connection.cursor()
        sql = 'UPDATE table SET col_name=%s WHERE id=%s'
        while True:
            stroke, Id = q.get()
            try:
                cursor.execute(sql, (stroke, Id))
            except Exception as e:
                print(e, Id)
            q.task_done()


    q = queue.Queue()
    for i in range(10):
        t = threading.Thread(target=update_line_thread, daemon=True)
        t.start()
    for ID in id2stroke:  # id2stroke: dict mapping row id -> new column value
        q.put((id2stroke[ID], ID))
    q.join()
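
    A note on the pattern above: the workers are daemon threads, so they simply die with the main thread once q.join() returns, and q.join() blocks until task_done() has been called once for every queued item. Each worker opens its own connection because MySQLdb connections are not safe to share across threads.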

    2. Process pools and thread pools

      IO-bound work  -> multiprocessing.pool.ThreadPool
      CPU-bound work -> multiprocessing.Pool

    import pymongo
    from multiprocessing.pool import Pool, ThreadPool

    def gen_doc(paper_list):
        # get_paper_info / get_paper_field / get_paper_author are user-defined,
        # IO-bound fetch functions; the thread pool overlaps their waits.
        with ThreadPool(7) as tp:
            p1 = tp.apply_async(get_paper_info, (paper_list,))
            p2 = tp.apply_async(get_paper_field, (paper_list,))
            p3 = tp.apply_async(get_paper_author, (paper_list,))
            paper_info = p1.get()
            paper_field = p2.get()
            paper_author = p3.get()
        for line in paper_info:
            # Build the real document from paper_info / paper_field /
            # paper_author here; the original post elides the body.
            yield {"key": "value"}

    def update_batch(paper_list):
        conn = pymongo.MongoClient('xx.xx.xx.xx', 27017, username="", password="").db_name.collection_name
        for doc in gen_doc(paper_list):
            # conn.delete_one({'_id': doc['_id']})
            conn.replace_one({'_id': doc['_id']}, doc, upsert=True)

    def update_by_list(paper_list):
        paper_list.sort()
        batch_size = 1000
        batch_list = [paper_list[begin:begin + batch_size]
                      for begin in range(0, len(paper_list), batch_size)]
        print("Total batch num: ", len(batch_list))
        with Pool(25) as pool:
            # pool.map(update_batch, batch_list, chunksize=200)
            for idx, _ in enumerate(pool.imap_unordered(update_batch, batch_list)):
                if idx % 100 == 0:
                    print(idx)
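
    This example follows the rule above: ThreadPool(7) overlaps the three IO-bound database reads inside each batch, while Pool(25) spreads the batches themselves across processes. imap_unordered yields results as batches finish, so progress can be printed without waiting for ordered output.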

    3. concurrent.futures

     The module provides two classes, ThreadPoolExecutor and ProcessPoolExecutor: high-level abstractions over threading and multiprocessing that expose one unified interface, making asynchronous calls very easy to write.

     Under the hood, concurrent.futures still runs on threading and multiprocessing; it adds another layer on top of them with a redesigned architecture, so it can be slightly slower. The architecture is more complex, but the interface is simpler.

    # A simple way to do multiprocessing in Python:
    from multiprocessing import Pool
    
    def calculate(number):
        return number
    
    if __name__ == '__main__':
        pool = Pool()
        result = pool.map(calculate, range(4))
    
    # An alternative, futures-based implementation:
    from concurrent.futures import ProcessPoolExecutor

    def calculate(number):
        return number

    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            result = executor.map(calculate, range(4))
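
    Beyond map, the two executor classes also share a submit/as_completed interface: submit() schedules a call and returns a Future, and as_completed() yields each Future as it finishes. A minimal sketch of that unified interface (not from the original post; fetch and the URLs are placeholder names):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def fetch(url):
        # Stand-in for any IO-bound call (HTTP request, DB query, ...).
        return len(url)

    if __name__ == '__main__':
        urls = ['https://example.com/a', 'https://example.com/b']
        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = {executor.submit(fetch, u): u for u in urls}
            for fut in as_completed(futures):
                print(futures[fut], fut.result())

    Swapping ThreadPoolExecutor for ProcessPoolExecutor leaves the rest of the code unchanged, which is exactly the unified interface described above.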
    Python 3.5 added a chunksize parameter to ProcessPoolExecutor.map. It addresses the problem that ProcessPoolExecutor used to pass items from the iterable to the children one at a time, which drives up IPC overhead and badly degrades performance on large iterables.
    multiprocessing.Pool.map outperforms ProcessPoolExecutor.map. Note that the performance difference is very small per work item, 
    so you'll probably only notice a large performance difference if you're using map on a very large iterable. The reason for the
    performance difference is that multiprocessing.Pool will batch the iterable passed to map into chunks, and then pass the chunks
    to the worker processes, which reduces the overhead of IPC between the parent and children. ProcessPoolExecutor always passes
    one item from the iterable at a time to the children, which can lead to much slower performance with large iterables, due to the
    increased IPC overhead. The good news is this issue will be fixed in Python 3.5, as a chunksize keyword argument has been added
    to ProcessPoolExecutor.map, which can be used to specify a larger chunk size if you know you're dealing with large iterables.
    See this bug (http://bugs.python.org/issue11271) for more info.
    # coding=utf-8
    import time
    from multiprocessing.pool import Pool
    from concurrent.futures import ProcessPoolExecutor
    
    NUMBERS = range(1, 50)
    
    def fib(n):
        # Naive recursion on purpose: the benchmark needs CPU-bound work.
        if n <= 2:
            return 1
        return fib(n-1) + fib(n-2)
    
    print('multiprocessing.pool.Pool:\n')
    start = time.time()
    
    l = []
    pool = Pool(5)
    for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)):
        l.append(result)
    print(len(l))
    print('COST: {}'.format(time.time() - start))
    
    
    print('ProcessPoolExecutor without chunksize:\n')
    start = time.time()
    l = []
    with ProcessPoolExecutor(max_workers=5) as executor:
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)):
            l.append(result)
    
    print(len(l))
    
    print('COST: {}'.format(time.time() - start))
    
    
    print('ProcessPoolExecutor with chunksize:\n')
    start = time.time()
    l = []
    with ProcessPoolExecutor(max_workers=5) as executor:
        # Match multiprocessing.Pool's default chunksize calculation.
        # (_max_workers is a private attribute; fine for a demo.)
        chunksize, extra = divmod(len(NUMBERS), executor._max_workers * 4)
        if extra:
            chunksize += 1
    
        for num, result in zip(NUMBERS, executor.map(fib, NUMBERS, chunksize=chunksize)):
            l.append(result)
    
    print(len(l))
    
    print('COST: {}'.format(time.time() - start))
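
    With the chunk size matched, the third timing should land close to the multiprocessing.Pool run, since both pools then ship work to the children in equally sized batches; the middle run instead pays one IPC round-trip per item.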

    https://www.jianshu.com/p/36bf441034b9

    https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

  • Original post: https://www.cnblogs.com/Mint-diary/p/13343885.html