zoukankan      html  css  js  c++  java
  • Python并行实例

    任务

    def single():
        """Sum sqrt(i) for every integer 1 <= i < N in one thread of one process.

        Reads the module-level constant ``N``.
        """
        total = 0
        # Accumulate left-to-right so the floating-point result is unchanged.
        for root in map(math.sqrt, range(1, N)):
            total += root
        return total
    

    结论

    • Python多线程受GIL限制,无法利用多核(针对CPU密集型任务)
    • Python多进程可以利用多核
    • Numpy(向量化计算)的速度远超并行的纯Python代码
    • twisted的线程池同样是Python线程,无法利用多核

    实现

    import math
    import multiprocessing
    import threading
    import timeit
    
    import numpy as np
    from twisted.internet import reactor
    import time
    
    # Problem size shared by the benchmarks below (single() sums sqrt(i) for 1 <= i < N).
    N = 10000000
    
    
    def single():
        """Single-process, single-thread implementation.

        Returns the sum of sqrt(n) over the integers 1 <= n < N, where ``N`` is
        the module-level problem size.
        """
        numbers = range(1, N)
        acc = 0
        for n in numbers:
            acc = acc + math.sqrt(n)
        return acc
    
    
    def useThread():
        """Multi-threaded implementation: split [0, N) across 4 threads.

        Each thread sums sqrt(i) over its own chunk and stores the partial result
        in a dedicated slot; the slots are added up after all threads have joined.

        Returns:
            The sum of sqrt(i) for 0 <= i < N (sqrt(0) contributes nothing).
        """
        thread_count = 4
        per = math.ceil(N / thread_count)
        # One slot per thread. Writing to a private slot avoids the non-atomic
        # read-modify-write that `total_sum += s` on a shared variable would be
        # (the GIL does not make `+=` atomic), and summing the slots in index
        # order keeps the floating-point result independent of thread timing.
        partial_sums = [0] * thread_count

        def go(slot, beg, end):
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            partial_sums[slot] = s

        thread_list = []
        for i in range(thread_count):
            # Clamp the chunk end so the last chunk never runs past N when N is
            # not a multiple of thread_count.
            th = threading.Thread(target=go, args=(i, i * per, min((i + 1) * per, N)))
            thread_list.append(th)
            th.start()
        for th in thread_list:
            th.join()

        total_sum = 0
        for s in partial_sums:
            total_sum += s
        return total_sum
    
    
    def _sqrt_sum_to_queue(q, beg, end):
        """Worker: sum sqrt(i) for beg <= i < end and put the result on ``q``.

        Defined at module level (not nested) so it can be pickled when the
        multiprocessing start method is "spawn".
        """
        s = 0
        for i in range(beg, end):
            s += math.sqrt(i)
        q.put(s)


    def useMultiprocess():
        """Multi-process implementation: split [0, N) across 4 worker processes.

        Returns:
            The sum of sqrt(i) for 0 <= i < N, accumulated in the order the
            workers deliver their partial sums.
        """
        process_count = 4
        per = math.ceil(N / process_count)
        process_list = []

        q = multiprocessing.Queue()
        for i in range(process_count):
            # Clamp the chunk end so the last chunk never runs past N.
            th = multiprocessing.Process(
                target=_sqrt_sum_to_queue,
                args=(q, i * per, min((i + 1) * per, N)),
            )
            process_list.append(th)
            th.start()

        # Collect exactly one result per worker BEFORE joining: the
        # multiprocessing docs require queued items to be consumed before the
        # producing process is joined, and a blocking get() replaces the old
        # get_nowait() loop whose bare `except` hid every kind of error.
        total_sum = 0
        for _ in process_list:
            total_sum += q.get()

        for th in process_list:
            th.join()
        return total_sum
    
    
    def _twisted_sum_in_child(q, thread_count):
        """Run a reactor in the current process and put the total on ``q``.

        Splits [0, N) into ``thread_count`` chunks, sums sqrt(i) over each chunk
        on the reactor's thread pool, stops the reactor once every chunk has
        reported, and finally puts the total on ``q``.

        Defined at module level so it can be used as a ``Process`` target even
        when the multiprocessing start method is "spawn".
        """
        total_sum = 0
        ok_count = 0

        def go(beg, end):
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            # Hand the partial sum back to the reactor thread.
            reactor.callFromThread(accumulate, s)

        def accumulate(s):
            nonlocal total_sum
            nonlocal ok_count
            total_sum += s
            ok_count += 1
            if ok_count == thread_count:
                reactor.stop()

        reactor.suggestThreadPoolSize(thread_count)
        per = math.ceil(N / thread_count)
        for i in range(thread_count):
            # Clamp the chunk end so the last chunk never runs past N.
            reactor.callInThread(go, i * per, min(i * per + per, N))
        reactor.run()
        q.put(total_sum)


    def useTwisted():
        """Sum sqrt(i) for 0 <= i < N using Twisted's thread pool in a child process.

        The reactor is a singleton (one per process), so each call runs its own
        reactor inside a fresh child process and receives the result through a
        queue.
        """
        thread_count = 4
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=_twisted_sum_in_child, args=(q, thread_count))
        p.start()
        # Read the result before joining: items on a multiprocessing queue
        # should be consumed before the producing process is joined.
        result = q.get()
        p.join()
        return result
    
    
    def useTwisted2():
        """Queue the sqrt-sum work on the global reactor's thread pool.

        The reactor is a singleton (one per process). This function only
        schedules the work and returns ``None``; it does not start the reactor.
        When all chunks have reported back, the elapsed time since this call and
        the total are printed from the reactor thread.
        """
        total_sum = 0
        thread_count = 4
        ok_count = 0
        beg_time = time.time()

        def go(beg, end):
            s = 0
            for i in range(beg, end):
                s += math.sqrt(i)
            # Hand the partial sum back to the reactor thread.
            reactor.callFromThread(accumulate, s)

        def accumulate(s):
            nonlocal total_sum
            nonlocal ok_count
            total_sum += s
            ok_count += 1
            if ok_count == thread_count:
                print(time.time() - beg_time, "value", total_sum)

        reactor.suggestThreadPoolSize(thread_count)
        per = math.ceil(N / thread_count)
        for i in range(thread_count):
            # Clamp the chunk end so the last chunk never runs past N when N is
            # not a multiple of thread_count.
            reactor.callInThread(go, i * per, min(i * per + per, N))
    
    
    def useNumpy():
        """Vectorized implementation: sum sqrt(i) for 1 <= i < N with NumPy.

        Uses the same range as ``single()`` (N itself is excluded) so the
        benchmark results are directly comparable.
        """
        a = np.arange(1, N, dtype=np.float64)
        return np.sum(np.sqrt(a))
    
    
    def main():
        """Run every implementation once for its result, then time 10 more runs.

        After the loop the reactor is started; nothing in this function stops it.
        """
        benchmarks = (single, useThread, useMultiprocess, useNumpy, useTwisted, useTwisted2)
        for method in benchmarks:
            result = method()
            elapsed = timeit.timeit(method, number=10)
            print(method.__name__, "result", result, "time", elapsed)
        reactor.run()
    
    
    # Run the benchmarks only when this file is executed as a script, not when
    # it is imported (multiprocessing children may import this module).
    if __name__ == '__main__':
        main()
    
    

    twisted无法利用多核

    from twisted.internet import threads, reactor
    import time
    import math
    
    # Script start time; go() reports its finish time relative to this.
    beg_time = time.time()
    
    
    def go():
        """CPU-bound task: add up sqrt(n) for n = 1 .. 10000000.

        Prints a start marker, then the seconds elapsed since the module-level
        ``beg_time`` when the loop finishes. The sum itself is discarded.
        """
        print("go start")
        acc = 0
        for n in range(1, 10000001):
            acc += math.sqrt(n)
        print("go over", time.time() - beg_time)
    
    
    import timeit
    
    # Ask the reactor for a thread pool of 8 threads.
    reactor.suggestThreadPoolSize(8)
    # Baseline: time a single direct call of go() in the main thread.
    baseline_seconds = timeit.timeit(go, number=1)
    print(baseline_seconds)
    # Queue 10 runs of go() on the reactor's thread pool, then start the reactor.
    for _ in range(10):
        reactor.callInThread(go)
    reactor.run()
    
    
  • 相关阅读:
    托付和事件的使用
    在使用supervisord 管理tomcat时遇到的小问题
    无法安装vmware tools的解决方PLEASE WAIT! VMware Tools is currently being installed on your system. Dependin
    (转)Openlayers 2.X加载高德地图
    (转)openlayers实现在线编辑
    (转) Arcgis for js加载百度地图
    (转)Arcgis for js加载天地图
    (转) 基于Arcgis for Js的web GIS数据在线采集简介
    (转) Arcgis for js之WKT和GEOMETRY的相互转换
    (转)Arcgis for Js之Graphiclayer扩展详解
  • 原文地址:https://www.cnblogs.com/weiyinfu/p/10514432.html
Copyright © 2011-2022 走看看