zoukankan      html  css  js  c++  java
  • Python并行实例

    任务

    def single():
        # 单进程单线程实现
        s = 0
        for i in range(1, N):
            s += math.sqrt(i)
        return s
    

    结论

    • Python多线程无法利用多核
    • Python多进程可以利用多核
    • Numpy速度远超并行的Python代码
    • twisted无法利用多核

    实现

    import math
    import multiprocessing
    import threading
    import timeit
    
    import numpy as np
    from twisted.internet import reactor
    import time
    
    N = 10000000
    
    
    def single():
        # 单进程单线程实现
        s = 0
        for i in range(1, N):
            s += math.sqrt(i)
        return s
    
    
    def useThread():
        # 多线程实现
        total_sum = 0
    
        def go(beg, end):
            nonlocal total_sum
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            total_sum += s  # python无法利用多核,所以这句话每个时刻只有一个线程在执行
    
        thread_count = 4
        per = math.ceil(N / thread_count)
        thread_list = []
        for i in range(thread_count):
            th = threading.Thread(target=go, args=(i * per, (i + 1) * per))
            thread_list.append(th)
            th.start()
        for th in thread_list:
            th.join()
        return total_sum
    
    
    def useMultiprocess():
        # 使用多进程
        def go(q: multiprocessing.Queue, beg, end):
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            q.put(s)
    
        process_count = 4
        per = math.ceil(N / process_count)
        process_list = []
    
        q = multiprocessing.Queue()
        for i in range(process_count):
            th = multiprocessing.Process(target=go, args=(q, i * per, (i + 1) * per))
            process_list.append(th)
            th.start()
        for th in process_list:
            th.join()
        total_sum = 0
        try:
            while 1:
                x = q.get_nowait()
                total_sum += x
        except:
            pass
        return total_sum
    
    
    def useTwisted():
        # reactor是单例模式,一个进程只有一个reactor,一个reactor包括多个线程
        total_sum = 0
        ok_count = 0
        thread_count = 4
    
        def go(beg, end):
            nonlocal total_sum
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            reactor.callFromThread(accumulate, s)
    
        def accumulate(s):
            nonlocal total_sum
            nonlocal ok_count
            ok_count += 1
            if ok_count == thread_count:
                reactor.stop()
            total_sum += s
    
        def process_work(q):
            reactor.suggestThreadPoolSize(thread_count)
            per = math.ceil(N / thread_count)
            for i in range(thread_count):
                reactor.callInThread(go, i * per, i * per + per)
            reactor.run()
            q.put(total_sum)
    
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=process_work, args=(q,))
        p.start()
        p.join()
        return q.get()
    
    
    def useTwisted2():
        # reactor是单例模式,一个进程只有一个reactor,一个reactor包括一个线程
        total_sum = 0
        thread_count = 4
        ok_count = 0
        beg_time = time.time()
    
        def go(beg, end):
            nonlocal total_sum
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            reactor.callFromThread(accumulate, s)
    
        def accumulate(s):
            nonlocal total_sum
            nonlocal ok_count
            total_sum += s
            ok_count += 1
            if ok_count == thread_count:
                print(time.time() - beg_time, "value", total_sum)
    
        reactor.suggestThreadPoolSize(thread_count)
        per = math.ceil(N / thread_count)
        for i in range(thread_count):
            reactor.callInThread(go, i * per, i * per + per)
    
    
    def useNumpy():
        a = np.linspace(1, N, N)
        return np.sum(np.sqrt(a))
    
    
    def main():
        for method in (single, useThread, useMultiprocess, useNumpy, useTwisted, useTwisted2):
            print(method.__name__, "result", method(), "time", timeit.timeit(method, number=10))
        reactor.run()
    
    
    if __name__ == '__main__':
        main()
    
    

    twisted无法利用多核

    from twisted.internet import threads, reactor
    import time
    import math
    
    beg_time = time.time()
    
    
    def go():
        print("go start")
        s = 0
        for i in range(10000000):
            s += math.sqrt(i + 1)
        print("go over", time.time() - beg_time)
    
    
    import timeit
    
    reactor.suggestThreadPoolSize(8)
    print(timeit.timeit(go, number=1))
    for i in range(10):
        reactor.callInThread(go)
    reactor.run()
    
    
  • 相关阅读:
    oop klass

    广义表
    Huffman树
    二叉搜索树
    二叉树的前序、中序、后序、层序遍历
    循环链表解决约瑟夫环问题
    搭建局域网SVN代码服务器
    【CheckList】精简用例,提升执行效率,减少漏测(总结篇)
    测试资源不同时,如何有针对性的设计测试用例?
  • 原文地址:https://www.cnblogs.com/weiyinfu/p/10514432.html
Copyright © 2011-2022 走看看