zoukankan      html  css  js  c++  java
  • IPython并行计算工具

    IPython并行计算工具

    解决并行计算和分布式计算的问题

    • 运行解释说明

      • 一直以来Python的并发问题都会被大家所诟病,正是因为全局解释锁的存在,导致其不能够真正的做到并发的执行。所以,我们就需要ipyparallel的存在来帮助我们处理并发计算的问题了。
      • ipyparallel中,可以利用多个engine同时运行一个任务来加快处理的速度。集群被抽象为view,包括direct_viewbalanced_view。其中,direct_view是所有的engine的抽象,当然也可以自行指定由哪些engine构成,而balanced_view是多个engine经过负载均衡之后,抽象出来的由“单一”engine构成的view。利用ipyparallel并行化的基本思路是将要处理的数据首先进行切分,然后分布到每一个engine上,然后将最终的处理结果合并,得到最终的结果,其思路和mapreduce类似。
    • 并行计算分类

      • ipcluster - 单机并行计算
      • ipyparallel - 分布式计算
    • 相关连接地址

    • 安装方式

     
    bash
    # 使用pip安装
    $ pip install ipyparallel
    • 配置并行环境
     
    bash
    # 命令可以简单的创建一个通用的并行环境profile配置文件
    $ ipython profile create --parallel --profile=myprofile

    1. 并行计算示例

    做一次wordcount的计算测试。

    • 数据来源地址
     
    bash
    # 使用wget下载
    $ wget http://www.gutenberg.org/files/27287/27287-0.txt
    • 不并行的版本
     
    python
    In [1]: import re
    
    In [2]: import io
    
    In [3]: from collections import defaultdict
    
    In [4]: non_word = re.compile(r'[Wd]+', re.UNICODE)
    
    In [5]: common_words = {
       ...: 'the','of','and','in','to','a','is','it','that','which','as','on','by',
       ...: 'be','this','with','are','from','will','at','you','not','for','no','have',
       ...: 'i','or','if','his','its','they','but','their','one','all','he','when',
       ...: 'than','so','these','them','may','see','other','was','has','an','there',
       ...: 'more','we','footnote', 'who', 'had', 'been',  'she', 'do', 'what',
       ...: 'her', 'him', 'my', 'me', 'would', 'could', 'said', 'am', 'were', 'very',
       ...: 'your', 'did', 'not',
       ...: }
    
    In [6]: def yield_words(filename):
       ...:     import io
       ...:     with io.open(filename, encoding='latin-1') as f:
       ...:         for line in f:
       ...:             for word in line.split():
       ...:                 word = non_word.sub('', word.lower())
       ...:                 if word and word not in common_words:
       ...:                     yield word
       ...:
    
    In [7]: def word_count(filename):
       ...:     word_iterator = yield_words(filename)
       ...:     counts = {}
       ...:     counts = defaultdict(int)
       ...:     while True:
       ...:         try:
       ...:             word = next(word_iterator)
       ...:         except StopIteration:
       ...:             break
       ...:         else:
       ...:             counts[word] += 1
       ...:     return counts
       ...:
    
    In [8]: %time counts = word_count(filename)
    CPU times: user 3.32 ms, sys: 1.4 ms, total: 4.72 ms
    Wall time: 10.9 ms
    • 用 IPython 来跑一下
     
    python
    # 在terminal输入如下命令,然后在ipython中就都可使用并行计算
    # 指定两个核心来执行
    [escape@loaclhost ~]$ ipcluster start -n 2
    • 先讲下 IPython 并行计算的用法
     
    python
    # import之后才能用%px*的magic
    In [1]: from IPython.parallel import Client
    
    In [2]: rc = Client()
    
    # 因为我启动了2个进程
    In [3]: rc.ids
    Out[3]: [0, 1]
    
    # 如果不自动每句都需要: `%px xxx`
    In [4]: %autopx
    %autopx enabled
    
    # 这里没autopx的话需要: `%px import os`
    In [5]: import os
    
    # 2个进程的pid
    In [6]: print os.getpid()
    [stdout:0] 62638
    [stdout:1] 62636
    
    # 在autopx下这个magic不可用
    In [7]: %pxconfig --targets 1
    [stderr:0] ERROR: Line magic function `%pxconfig` not found.
    [stderr:1] ERROR: Line magic function `%pxconfig` not found.
    
    # 再执行一次就会关闭autopx
    In [8]: %autopx
    %autopx disabled
    
    # 指定目标对象, 这样下面执行的代码就会只在第2个进程下运行
    In [10]: %pxconfig --targets 1
    
    # 其实就是执行一段非阻塞的代码
    In [11]: %%px --noblock
       ....: import time
       ....: time.sleep(1)
       ....: os.getpid()
       ....:
    Out[11]: <AsyncResult: execute>
    
    # 看只返回了第二个进程的pid
    In [12]: %pxresult
    Out[1:21]: 62636
    
    # 使用全部的进程, ipython可以细粒度的控制那个engine执行的内容
    In [13]: v = rc[:]
    
    # 每个进程都导入time模块
    In [14]: with v.sync_imports():
       ....:     import time
       ....:
    importing time on engine(s)
    
    In [15]: def f(x):
       ....:     time.sleep(1)
       ....:     return x * x
       ....:
    
    # 同步的执行
    In [16]: v.map_sync(f, range(10))
    
    Out[16]: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    
    # 异步的执行
    In [17]: r = v.map(f, range(10))
    
    # celery的用法
    In [18]: r.ready(), r.elapsed
    Out[18]: (True, 5.87735)
    
    # 获得执行的结果
    In [19]: r.get()
    Out[19]: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    • 并行版本
     
    python
    In [20]: def split_text(filename):
    ....:    text = open(filename).read()
    ....:    lines = text.splitlines()
    ....:    nlines = len(lines)
    ....:    n = 10
    ....:    block = nlines//n
    ....:    for i in range(n):
    ....:        chunk = lines[i*block:(i+1)*(block)]
    ....:        with open('count_file%i.txt' % i, 'w') as f:
    ....:            f.write('
    '.join(chunk))
    ....:    cwd = os.path.abspath(os.getcwd())
    ....:    # 不用glob是为了精准
    ....:    fnames = [ os.path.join(cwd, 'count_file%i.txt' % i) for i in range(n)]
    ....:    return fnames
    
    In [21]: from IPython import parallel
    
    In [22]: rc = parallel.Client()
    
    In [23]: view = rc.load_balanced_view()
    
    In [24]: v = rc[:]
    
    In [25]: v.push(dict(
       ....:     non_word=non_word,
       ....:     yield_words=yield_words,
       ....:     common_words=common_words
       ....: ))
    Out[25]: <AsyncResult: _push>
    
    In [26]: fnames = split_text(filename)
    
    In [27]: def count_parallel():
       .....:     pcounts = view.map(word_count, fnames)
       .....:     counts = defaultdict(int)
       .....:     for pcount in pcounts.get():
       .....:         for k, v in pcount.iteritems():
       .....:             counts[k] += v
       .....:     return counts, pcounts
       .....:
    
    # 这个时间包含了我再聚合的时间
    In [28]: %time counts, pcounts = count_parallel()
    # 是不是比直接运行少了很多时间
    CPU times: user 50.6 ms, sys: 8.82 ms, total: 59.4 ms
    # 这个时间是
    Wall time: 99.6 ms
    
    In [29]: pcounts.elapsed, pcounts.serial_time, pcounts.wall_time
    Out[29]: (0.104384, 0.13980499999999998, 0.104384)

    可以看出cpu时间上确实减少了,几乎一半,但真实时间上却反而增加到了164ms,用%timeit 查看,发现实际使用时间反而多出了20ms这是因为cpu计算完后还要聚合结果。这个过程也得耗时,也就是说,并行是有额外开销的。


    2. 最简单的应用

    并行就是多个核心同时执行任务了,最简单的就是执行重复任务,将函数提交到引擎中。

     
    python
    c = Client()
    a = lambda :"hi~"
     
    python
    # 并行计算
    %time c[:].apply_sync(a)
    CPU times: user 22.6 ms, sys: 5.05 ms, total: 27.7 ms
    Wall time: 35.4 ms
    
    ['hi~', 'hi~', 'hi~', 'hi~']
     
    python
    # 使用列表生成器
    %time [a() for i in range(2)]
    CPU times: user 10 µs, sys: 6 µs, total: 16 µs
    Wall time: 17.9 µs
    
    ['hi~', 'hi~']

    看得出,cpython还是相当给力的,在这种小规模计算上并行反而比用列表生成器慢很多。


    3. 直接调用 ipyparallel

    我们可以通过DirectView直接在ipython中通过Client对象直接的操作多个engine

     
    python
    from ipyparallel import Client
    rc = Client()
    
    # 查看有多少个engine
    rc.ids
    [0, 1, 2, 3]
    
    # 使用全部engine
    dview = rc[:]
     
    python
    %time map(lambda x:x**2,range(32))
    CPU times: user 21 µs, sys: 5 µs, total: 26 µs
    Wall time: 26.9 µs
    
    [0,
     1,
     4,
     9,
     ...,
     900,
     961]
     
    python
    # 并行的map工具
    %time dview.map_sync(lambda x:x**2,range(32))
    CPU times: user 31.3 ms, sys: 5.12 ms, total: 36.4 ms
    Wall time: 41.4 ms
    
    [0,
     1,
     4,
     9,
     ...,
     900,
     961]

    看来还是单进程给力哇!


    4. 负载均衡 view

    并行的一大难题便是负载均衡,直接使用DirectView并没有这方面优化,可以使用LoadBalancedView来使用负载均衡的view

     
    python
    lview = rc.load_balanced_view()
     
    python
    %time lview.map_sync(lambda x:x**2,range(32))
    CPU times: user 230 ms, sys: 47.3 ms, total: 277 ms
    Wall time: 305 ms
    
    [0,
     1,
     4,
     9,
     ...,
     900,
     961]

     文章作者: Escape
     版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Escape !
  • 相关阅读:
    网站收录(2)-财经网站
    网络爬虫(13)-Scrapy持久化存储
    网络爬虫(12)-Scrapy框架Post请求发送
    Excel常用函数
    VBA基础
    网站收录(1)-行业研究
    网络爬虫(11)-Scrapy分布式
    网络爬虫(10)-进程、线程
    log
    关于camera 智障的问题
  • 原文地址:https://www.cnblogs.com/think90/p/13178216.html
Copyright © 2011-2022 走看看