zoukankan      html  css  js  c++  java
  • python 线程、进程

    1、队列、线程

    import MySQLdb
    import MySQLdb.cursors
    import queue
    import threading
    
    def update_line_thread():
        """Worker loop: apply queued (value, id) updates to MySQL, one row per item.

        Each worker opens its own connection and then blocks forever on the
        module-level queue ``q``. Every item taken from the queue is a
        ``(stroke, Id)`` tuple that is bound to the parameterized UPDATE
        statement. Failures are printed and the worker keeps going
        (best effort); the function never returns.
        """
        connection = MySQLdb.connect(host='xx.xx.xx.xx', port=3306, user='', passwd='', db='db_name',
                                     charset='utf8mb4', connect_timeout=100000, cursorclass=MySQLdb.cursors.SSCursor, autocommit=1)
        cursor = connection.cursor()
        # No comma before the WHERE clause: "col_name=%s,where" is a SQL syntax error.
        sql = 'UPDATE table SET col_name=%s where id=%s'
        while True:
            stroke, Id = q.get()
            try:
                cursor.execute(sql, (stroke, Id))
            except Exception as e:
                # Report the id of the row that failed; the original printed an
                # undefined name here, which raised NameError inside the handler.
                print(e, Id)
            finally:
                # Always acknowledge the item, otherwise q.join() in the
                # producer would wait forever after a failure.
                q.task_done()
            
            
            
    # Shared work queue consumed by the update_line_thread workers.
    q = queue.Queue()

    # Ten daemon workers: they are abandoned automatically when the main
    # thread exits after q.join() returns.
    workers = [threading.Thread(target=update_line_thread, daemon=True) for _ in range(10)]
    for worker in workers:
        worker.start()

    # Enqueue one (value, id) tuple per entry, then block until every
    # queued item has been acknowledged with task_done().
    for key in id2stroke:
        q.put((id2stroke[key], key))
    q.join()

    2、进程池 与 线程池

      IO 密集型任务 -> multiprocessing.pool.ThreadPool
      CPU 密集型任务 -> multiprocessing.Pool

    import MySQLdb
    import MySQLdb.cursors
    import pymongo
    from multiprocessing.pool import Pool, ThreadPool


    def gen_doc(paper_list):
        """Yield one document per row of paper info for the given papers.

        The three lookups (get_paper_info, get_paper_field, get_paper_author)
        are submitted to a thread pool so they run concurrently, and all three
        results are collected before the pool is released.
        """
        with ThreadPool(7) as tp:
            p1 = tp.apply_async(get_paper_info, (paper_list, ))
            p2 = tp.apply_async(get_paper_field, (paper_list, ))
            p3 = tp.apply_async(get_paper_author, (paper_list, ))
            paper_info = p1.get()
            paper_field = p2.get()
            paper_author = p3.get()
        for line in paper_info:
            # Placeholder document: real code would presumably combine `line`
            # with paper_field / paper_author here.
            yield {"key": "value"}


    def update_batch(paper_list):
        """Upsert every document generated for one batch of papers into MongoDB."""
        client = pymongo.MongoClient('xx,xx,xx,xx', 27017, username="", password="")
        try:
            conn = client.db_name.collection_name
            for doc in gen_doc(paper_list):
                # Alternative: conn.delete_one({'_id': doc['_id']})
                conn.replace_one({'_id': doc['_id']}, doc, upsert=True)
        finally:
            # A client is created per batch, so close it rather than leaking
            # its connections inside a long-lived pool worker.
            client.close()


    def update_by_list(paper_list):
        """Sort paper_list in place, split it into batches and process them in a process pool.

        Prints the number of batches up front and a progress counter for
        every 100th completed batch.
        """
        paper_list.sort()
        batch_size = 1000
        batch_list = [paper_list[begin:begin + batch_size] for begin in range(0, len(paper_list), batch_size)]
        print("Total batch num: ", len(batch_list))
        with Pool(25) as pool:
            # Alternative: pool.map(update_batch, batch_list, chunksize=200)
            for idx, _ in enumerate(pool.imap_unordered(update_batch, batch_list)):
                if idx % 100 == 0:
                    print(idx)

    3、concurrent.futures

     模块中有 2 个类:ThreadPoolExecutor 和 ProcessPoolExecutor,也就是对 threading 和 multiprocessing 进行了高级别的抽象, 暴露出统一的接口,帮助开发者非常方便地实现异步调用。

     concurrent.futures 底层还是用着 threading 和 multiprocessing,相当于在其上又封装了一层,并且重新设计了架构,所以会慢一点。架构复杂,但接口简单。

    # 在 Python 中实现多进程的一种简单方法是:
    from multiprocessing import Pool
    
    def calculate(number):
        """Return ``number`` unchanged; a stand-in for the real per-item work."""
        return number
    
    if __name__ == '__main__':
        # The context manager shuts the worker processes down once map() has
        # returned, instead of leaving the pool open until interpreter exit.
        with Pool() as pool:
            result = pool.map(calculate, range(4))
    
    # The equivalent implementation based on concurrent.futures:
    from concurrent.futures import ProcessPoolExecutor


    def calculate(number):
        """Return ``number`` unchanged; a stand-in for the real per-item work."""
        return number


    # Guarded so that worker processes can import this module without
    # re-running the pool set-up.
    if __name__ == '__main__':
        with ProcessPoolExecutor() as executor:
            # executor.map returns an iterator over the results.
            result = executor.map(calculate, range(4))
    Python 3.5 为 ProcessPoolExecutor.map 加入了 chunksize 参数,解决了 ProcessPoolExecutor 每次只把可迭代对象中的一项传给子进程的问题——这种做法会增加 IPC 开销,导致在大型可迭代对象上性能大幅下降。
    multiprocessing.Pool.map outperforms ProcessPoolExecutor.map. Note that the performance difference is very small per work item, 
    so you'll probably only notice a large performance difference if you're using map on a very large iterable. The reason for the
    performance difference is that multiprocessing.Pool will batch the iterable passed to map into chunks, and then pass the chunks
    to the worker processes, which reduces the overhead of IPC between the parent and children. ProcessPoolExecutor always passes
    one item from the iterable at a time to the children, which can lead to much slower performance with large iterables, due to the
    increased IPC overhead. The good news is this issue will be fixed in Python 3.5, as a chunksize keyword argument has been added
    to ProcessPoolExecutor.map, which can be used to specify a larger chunk size if you know you're dealing with large iterables.
    See this bug(http://bugs.python.org/issue11271) for more info.
    # coding=utf-8
    import time
    from multiprocessing.pool import Pool
    from concurrent.futures import as_completed, ProcessPoolExecutor
    
    NUMBERS = range(1, 50)
    
    def fib(n):
        """Return the n-th Fibonacci number using naive double recursion.

        Any ``n`` of 2 or less yields 1.
        """
        return 1 if n <= 2 else fib(n - 1) + fib(n - 2)
    
    WORKERS = 5


    def main():
        """Time ``fib`` over ``NUMBERS`` with three process-pool variants.

        For each variant this prints a label, the number of results and the
        elapsed wall-clock time: multiprocessing's Pool.map,
        ProcessPoolExecutor.map with its default chunksize, and
        ProcessPoolExecutor.map with an explicit chunksize.
        """
        print('multiprocessing.pool.Pool:\n')
        start = time.time()
        # `with` shuts the workers down so they do not linger during the
        # following measurements.
        with Pool(WORKERS) as pool:
            results = pool.map(fib, NUMBERS)
        print(len(results))
        print('COST: {}'.format(time.time() - start))

        print('ProcessPoolExecutor without chunksize:\n')
        start = time.time()
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            results = list(executor.map(fib, NUMBERS))
        print(len(results))
        print('COST: {}'.format(time.time() - start))

        print('ProcessPoolExecutor with chunksize:\n')
        start = time.time()
        # Same rule multiprocessing.pool uses for its default chunksize:
        # about four chunks per worker, rounded up. Rounding up also keeps
        # chunksize >= 1 for short inputs, which executor.map requires.
        chunksize, extra = divmod(len(NUMBERS), WORKERS * 4)
        if extra:
            chunksize += 1
        with ProcessPoolExecutor(max_workers=WORKERS) as executor:
            results = list(executor.map(fib, NUMBERS, chunksize=chunksize))
        print(len(results))
        print('COST: {}'.format(time.time() - start))


    # Guarded so that worker processes can import this module without
    # re-running the benchmark.
    if __name__ == '__main__':
        main()

    https://www.jianshu.com/p/36bf441034b9

    https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter4/02_Using_the_concurrent.futures_Python_modules.html

  • 相关阅读:
    面试总结
    java高级开发
    idea的快捷键
    微服务面试题目
    windows下jenkins的安装与配置
    微服务面试集合
    springlcoud中使用consul作为注册中心
    乐观锁和悲观锁
    volatile与synchronized的区别
    Java CAS 和ABA问题
  • 原文地址:https://www.cnblogs.com/Mint-diary/p/13343885.html
Copyright © 2011-2022 走看看