zoukankan      html  css  js  c++  java
  • R多线程并行计算

    先上代码案例:

    主要的操作:

    library(parallel);#加载并行计算包

    cl <- makeCluster(8);# 初始化cpu集群

    clusterEvalQ(cl,library(RODBC));#添加并行计算中用到的包

    clusterExport(cl,'variablename');#添加并行计算中用到的环境变量(如当前上下文中定义的方法)

    dt <- parApply(cl,stasList, 1, stasPowerPre_Time);# apply的并行版本

    all_predata_time <- do.call('rbind',dt);# 整合结果


    R并行计算相关

    parallel

    library(parallel)
    #使用 parallel包,首先要初始化一个集群,这个集群的数量最好是你CPU核数-1。如果一台8核的电脑建立了数量为8的集群,那你的CPU就干不了其他事情了。所以可以这样启动一个集群:
    # Calculate the number of cores
    no_cores <- detectCores() - 1
    # Initiate cluster
    cl <- makeCluster(no_cores)
    #现在只需要使用并行化版本的lapply,parLapply就可以了
    parLapply(cl, 2:4,function(exponent) 2^exponent)
    #结束后,要记得关闭集群,否则你电脑的内存会始终被R占用
    stopCluster(cl)

    变量作用域

    在Mac/Linux中你可以使用 makeCluster(no_core, type="FORK")这一选项,从而使你并行运行的时候可以包含所有环境变量。
    在Windows中由于使用的是Parallel Socket Cluster (PSOCK),所以每个集群只会加载base包,所以你运行时要指定加载特定的包或变量:

    cl<-makeCluster(no_cores)
    base <- 2
    clusterExport(cl, "base")
    parLapply(cl,2:4,function(exponent)  base^exponent)
    stopCluster(cl)

    注意:你需要用clusterExport(cl, "base")把base这一个变量加载到集群当中。

    如果你在函数中使用了一些其他的包就要使用clusterEvalQ加载进去,比如说,使用rms包,那么就用clusterEvalQ(cl, library(rms))

    要注意的是,在clusterExport 加载某些变量后,这些变量的任何变化都会被忽略:

    cl<-makeCluster(no_cores)
    clusterExport(cl, "base")
    base <- 4
    # Run
    parLapply(cl, 2:4,  function(exponent) 
                base^exponent) 
    # Finish
    stopCluster(cl)
    
    运行结果:
    [[1]]
    [1] 4
     
    [[2]]
    [1] 8
     
    [[3]]
    

    使用parSapply

    如果你想程序返回一个向量或者矩阵。而不是一个列表,那么就应该使用sapply,他同样也有并行版本parSapply:

     

    parSapply(cl, 2:4, 
              function(exponent) 
                base^exponent)
    
    [1]  4  8 16
    

    输出矩阵并显示行名和列名(因此才需要使用as.character):  

    parSapply(cl, as.character(2:4), 
              function(exponent){
                x <- as.numeric(exponent)
                c(base = base^x, self = x^x)
              })
    
    2  3   4
    base 4  8  16
    self 4 27 256
    

    foreach

    设计foreach包的思想可能想要创建一个lapply和for循环的标准,初始化的过程有些不同,你需要register注册集群:

    library(foreach)
    library(doParallel)
    cl<-makeCluster(no_cores)
    registerDoParallel(cl)
    
    #要记得最后要结束集群(不是用stopCluster()):
    stopImplicitCluster()
    

    foreach函数可以使用参数.combine控制你汇总结果的方法:

    foreach(exponent = 2:4, .combine = c)  
        %dopar%  
              base^exponent
     
     [1]  4  8 16</code>    
    
    #-------------------------------------------------
     foreach(exponent = 2:4, .combine = rbind)  
        %dopar%  
              base^exponent
        [,1]
    result.1    4
    result.2    8
    result.3   16    
    
    
    #-------------------------------------------------
     foreach(exponent = 2:4,  .combine = list,  .multicombine = TRUE) 
         %dopar%  
              base^exponent
    [[1]]
    [1] 4
     
    [[2]]
    [1] 8
     
    [[3]]
    [1] 16
                    
    

     注意到最后list的combine方法是默认的。在这个例子中用到一个.multicombine参数,他可以帮助你避免嵌套列表。比如说list(list(result.1, result.2), result.3) : 

    foreach(exponent = 2:4,  .combine = list)  
        %dopar%  
          base^exponent
    [[1]]
    [[1]][[1]]
    [1] 4
     
    [[1]][[2]]
    [1] 8
     
     
    [[2]]
    [1] 16
    

    变量作用域

    在foreach中,变量作用域有些不同,它会自动加载本地的环境到函数中:  

    base <- 2
    cl<-makeCluster(2)
    registerDoParallel(cl)
    foreach(exponent = 2:4, .combine = c)  
        %dopar%  
              base^exponent
    stopCluster(cl)
    
     [1]  4  8 16</code>    
    

    但是,对于父环境的变量则不会加载,以下这个例子就会抛出错误:

    test <- function (exponent) {
      foreach(exponent = 2:4, 
              .combine = c)  %dopar%  
        base^exponent
    }
    test()
     
     Error in base^exponent : task 1 failed - "object 'base' not found"
    

    为解决这个问题你可以使用.export这个参数而不需要使用clusterExport。注意的是,他可以加载最终版本的变量,在函数运行前,变量都是可以改变的:

    base <- 2
    cl<-makeCluster(2)
    registerDoParallel(cl)
     
    base <- 4
    test <- function (exponent) {
      foreach(exponent = 2:4, 
              .combine = c,
              .export = "base")  %dopar%  
        base^exponent
    }
    test()
     
    stopCluster(cl)
     
     [1]  4  8 16
    

    相似的你可以使用.packages参数来加载包,比如说:.packages = c("rms", "mice")

    使用Fork还是Psock?

    这两个的区别:

    FORK:”to pide in branches and go separate ways”
    系统:Unix/Mac (not Windows)
    环境: 所有 

    PSOCK:并行socket集群
    系统: All (including Windows)
    环境: 空

    内存控制

    如果你不打算使用windows的话,建议你尝试FORK模式,它可以实现内存共享,节省你的内存。

    PSOCK:

    library(pryr) # Used for memory analyses
    cl<-makeCluster(no_cores)
    clusterExport(cl, "a")
    clusterEvalQ(cl, library(pryr))
    parSapply(cl, X = 1:10, function(x) {address(a)}) == address(a)
    
    [1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE
    

    FORK :

    cl<-makeCluster(no_cores, type="FORK")
    parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
    
     [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
    

    你不需要花费太多时间去配置你的环境,有趣的是,你不需要担心变量冲突:

    b <- 0
    parSapply(cl, X = 1:10, function(x) {b <- b + 1; b})
    # [1] 1 1 1 1 1 1 1 1 1 1
    parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b})
    # [1] 1 2 3 4 5 1 2 3 4 5
    b
    # [1] 0
    

    调试

    当你在并行环境中工作是,debug是很困难的,你不能使用browser/cat/print等参数来发现你的问题。

    tryCatch-list方法

    使用stop()函数这不是一个好方法,因为当你收到一个错误信息时,很可能这个错误信息你在很久之前写的,都快忘掉了,但是当你的程序跑了1,2天后,突然弹出这个错误,就只因为这一个错误,你的程序终止了,并把你之前的做的计算全部扔掉了,这是很讨厌的。为此,你可以尝试使用tryCatch去捕捉那些错误,从而使得出现错误后程序还能继续执行:

    foreach(x=list(1, 2, "a"))  %dopar%  
    {
      tryCatch({
        c(1/x, x, 2^x)
      }, error = function(e) return(paste0("The variable '", x, "'", 
                                          " caused the error: '", e, "'")))
    }
    [[1]]
    [1] 1 1 2
     
    [[2]]
    [1] 0.5 2.0 4.0
     
    [[3]]
    [1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator
    '"
    

     这也正是我喜欢list的原因,它可以方便的将所有相关的数据输出,而不是只输出一个错误信息。这里有一个使用rbindlapply进行conbine的例子:

     

    out <- lapply(1:3, function(x) c(x, 2^x, x^x))
    do.call(rbind, out)
     [,1] [,2] [,3]
    [1,]    1    2    1
    [2,]    2    4    4
    [3,]    3    8   27
    

    创建一个文件输出

    当我们无法在控制台观测每个工作时,我们可以设置一个共享文件,让结果输出到文件当中,这是一个想当舒服的解决方案:

    cl<-makeCluster(no_cores, outfile = "debug.txt")
    registerDoParallel(cl)
    foreach(x=list(1, 2, "a"))  %dopar%  
    {
      print(x)
    }
    stopCluster(cl)
    
    starting worker pid=7392 on localhost:11411 at 00:11:21.077
    starting worker pid=7276 on localhost:11411 at 00:11:21.319
    starting worker pid=7576 on localhost:11411 at 00:11:21.762
    [1] 2]
     
    [1] "a"
    

    创建一个结点专用文件

    一个或许更为有用的选择是创建一个结点专用的文件,如果你的数据集存在一些问题的时候,可以方便观测:

    cl<-makeCluster(no_cores, outfile = "debug.txt")
    registerDoParallel(cl)
    foreach(x=list(1, 2, "a"))  %dopar%  
    {
      cat(dput(x), file = paste0("debug_file_", x, ".txt"))
    } 
    stopCluster(cl)
    

    partools

    partools这个包有一个dbs()函数或许值得一看(使用非windows系统值得一看),他允许你联合多个终端给每个进程进行debug。

    Caching

    当做一个大型计算时,我强烈推荐使用一些缓存。这或许有多个原因你想要结束计算,但是要遗憾地浪费了计算的宝贵的时间。这里有一个包可以做缓存,R.cache,但是我发现自己写个函数来实现更加简单。你只需要嵌入digest包就可以。digest()函数是一个散列函数,把一个R对象输入进去可以输出一个md5值或sha1等从而得到一个唯一的key值,当你key匹配到你保存的cache中的key时,你就可以继续你的计算了,而不需要将算法重新运行,以下是一个使用例子:

    cacheParallel <- function(){
      vars <- 1:2
      tmp <- clusterEvalQ(cl, 
                          library(digest))
     
      parSapply(cl, vars, function(var){
        fn <- function(a) a^2
        dg <- digest(list(fn, var))
        cache_fn <- 
          sprintf("Cache_%s.Rdata", 
                  dg)
        if (file.exists(cache_fn)){
          load(cache_fn)
        }else{
          var <- fn(var); 
          Sys.sleep(5)
          save(var, file = cache_fn)
        }
        return(var)
      })
    }
    

    这个例子很显然在第二次运行的时候并没有启动Sys.sleep,而是检测到了你的cache文件,加载了上一次计算后的cache,你就不必再计算Sys.sleep了,因为在上一次已经计算过了。

    system.time(out <- cacheParallel())
    # user system elapsed
    # 0.003 0.001 5.079
    out
    # [1] 1 4
    system.time(out <- cacheParallel())
    # user system elapsed
    # 0.001 0.004 0.046
    out
    # [1] 1 4
     
    # To clean up the files just do:
    file.remove(list.files(pattern = "Cache.+.Rdata"))
    

    载入平衡

    任务载入

    需要注意的是,无论parLapply还是foreach都是一个包装(wrapper)的函数。这意味着他们不是直接执行并行计算的代码,而是依赖于其他函数实现的。在parLapply中的定义如下:

    parLapply <- function (cl = NULL, X, fun, ...) 
    {
        cl <- defaultCluster(cl)
        do.call(c, clusterApply(cl, x = splitList(X, length(cl)), 
            fun = lapply, fun, ...), quote = TRUE)
    }
    

    注意到splitList(X, length(cl)) ,他会将任务分割成多个部分,然后将他们发送到不同的集群中。如果你有很多cache或者存在一个任务比其他worker中的任务都大,那么在这个任务结束之前,其他提前结束的worker都会处于空闲状态。为了避免这一情况,你需要将你的任务尽量平均分配给每个worker。举个例子,你要计算优化神经网络的参数,这一过程你可以并行地以不同参数来训练神经网络,你应该将如下代码:

    # From the nnet example
    parLapply(cl, c(10, 20, 30, 40, 50), function(neurons) 
      nnet(ir[samp,], targets[samp,],
           size = neurons))
    

    改为:

    # From the nnet example
    parLapply(cl, c(10, 50, 30, 40, 20), function(neurons) 
      nnet(ir[samp,], targets[samp,],
           size = neurons))
    

    内存载入

    在大数据的情况下使用并行计算会很快的出现问题。因为使用并行计算会极大的消耗内存,你必须要注意不要让你的R运行内存到达内存的上限,否则这将会导致崩溃或非常缓慢。使用Forks是一个控制内存上限的一个重要方法。Fork是通过内存共享来实现,而不需要额外的内存空间,这对性能的影响是很显著的(我的系统时16G内存,8核心):

    > rm(list=ls())
    > library(pryr)
    > library(magrittr)
    > a <- matrix(1, ncol=10^4*2, nrow=10^4)
    > object_size(a)
    1.6 GB
    > system.time(mean(a))
       user  system elapsed 
      0.338   0.000   0.337
    > system.time(mean(a + 1))
       user  system elapsed 
      0.490   0.084   0.574
    > library(parallel)
    > cl <- makeCluster(4, type = "PSOCK")
    > system.time(clusterExport(cl, "a"))
       user  system elapsed 
      5.253   0.544   7.289
    > system.time(parSapply(cl, 1:8, 
                            function(x) mean(a + 1)))
       user  system elapsed 
      0.008   0.008   3.365
    > stopCluster(cl); gc();
    > cl <- makeCluster(4, type = "FORK")
    > system.time(parSapply(cl, 1:8, 
                            function(x) mean(a + 1)))
       user  system elapsed 
      0.009   0.008   3.123
    > stopCluster(cl)
    

    FORKs可以让你并行化从而不用崩溃:

    > cl <- makeCluster(8, type = "PSOCK")
    > system.time(clusterExport(cl, "a"))
       user  system elapsed 
     10.576   1.263  15.877
    > system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
    Error in checkForRemoteErrors(val) : 
      8 nodes produced errors; first error: cannot allocate vector of size 1.5 Gb
    Timing stopped at: 0.004 0 0.389
    > stopCluster(cl)
    > cl <- makeCluster(8, type = "FORK")
    > system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
       user  system elapsed 
      0.014   0.016   3.735
    > stopCluster(cl)
    

    当然,他并不能让你完全解放,如你所见,当我们创建一个中间变量时也是需要消耗内存的:

    > a <- matrix(1, ncol=10^4*2.1, nrow=10^4)
    > cl <- makeCluster(8, type = "FORK")
    > parSapply(cl, 1:8, function(x) {
    +   b <- a + 1
    +   mean(b)
    +   })
    Error in unserialize(node$con) : error reading from connection</code>
    

    内存建议

    尽量使用rm()避免无用的变量 尽量使用gc()释放内存。即使这在R中是自动执行的,但是当它没有及时执行,在一个并行计算的情况下,如果没有及时释放内存,那么它将不会将内存返回给操作系统,从而影响了其他worker的执行。 通常并行化在大规模运算下很有用,但是,考虑到R中的并行化存在内存的初始化成本,所以考虑到内存的情况下,显然小规模的并行化可能会更有用。 有时候在并行计算时,不断做缓存,当达到上限时,换回串行计算。 你也可以手动的控制每个核所使用的内存数量,一个简单的方法就是:memory.limit()/memory.size() = max cores

    其他建议

    一个常用的CPU核数检测函数:max(1, detectCores() - 1)

    永远不要使用set.seed(),使用clusterSetRNGStream()来代替设置种子,如果你想重现结果。 如果你有Nvidia 显卡,你可以尝试使用gputools 包进行GPU加速(警告:安装可能会很困难) 当使用mice并行化时记得使用ibind()来合并项。

    转自:

    https://www.2cto.com/kf/201606/517963.html  


    R中的apply族函数和多线程计算

    一.apply族函数

    1.apply  应用于矩阵和数组

    # apply
    # 1代表行,2代表列
    # create a matrix of 10 rows x 2 columns
    m <- matrix(c(1:10, 11:20), nrow = 10, ncol = 2)
    # mean of the rows
    apply(m, 1, mean)
    [1] 6 7 8 9 10 11 12 13 14 15
    # mean of the columns
    apply(m, 2, mean)
    [1] 5.5 15.5
    # divide all values by 2
    apply(m, 1:2, function(x) x/2)
    

      

     

    2.eapply 应用于环境中的变量

    # a new environment
    e <- new.env()
    # two environment variables, a and b
    e$a <- 1:10
    e$b <- 11:20
    # mean of the variables
    eapply(e, mean)
    $b
    [1] 15.5
     
    $a
    [1] 5.5
    

      

     

    3.lapply应用于列表,返回列表,实际data.frame也是一种list,一种由多个长度相同的向量cbind一起的list:lapply(list, function)

    sapply(iris[,1:4],mean)
    Sepal.Length  Sepal.Width Petal.Length  Petal.Width
    5.843333     3.057333     3.758000     1.199333
     
    lapply(iris[,1:4],mean)
    $Sepal.Length
    [1] 5.843333
     
    $Sepal.Width
    [1] 3.057333
     
    $Petal.Length
    [1] 3.758
     
    $Petal.Width
    [1] 1.199333
    

      

     

    4.sapply 是lapply的友好形式.lapply和sapply都可应用于list,data.frame。只是返回的对象类型不一样,前者是list,后者看情况,如果是每一个list下面的元素长度都一样,返回的结果就会被就会简化。举例说明。

    # 下面两个返回的结果是一样一样的,都是list
    sapply(iris,unique)
    lapply(iris,unique)
     
    # 下面两个前者返回向量,后者返回list
    sapply(iris[,1:4],mean)
    lapply(iris[,1:4],mean)
     
    #下面两个前者返回data.frame,后者反回list
    sapply(iris[,1:4], function(x) x/2)
    lapply(iris[,1:4], function(x) x/2)
     
    # sapply会根据返回结果,选最合适的对象类型来存放对象,而list反悔的统统都是list
    # 以下两者返回结果一样
    library(magrittr)
    lapply(iris[,1:4],mean)%>%unlist()
    sapply(iris[,1:4],mean)
    

      

     

     5.vapply要求提供第三个参数,即输出的格式

    l <- list(a = 1:10, b = 11:20)
    # fivenum of values using vapply
    l.fivenum <- vapply(l, fivenum, c(Min.=0, "1st Qu."=0, Median=0, "3rd Qu."=0, Max.=0))
    class(l.fivenum)
    [1] "matrix"
    # let's see it
    l.fivenum
               a    b
    Min.     1.0 11.0
    1st Qu.  3.0 13.0
    Median   5.5 15.5
    3rd Qu.  8.0 18.0
    Max.    10.0 20.0
    

      

     

    6.replicate
    Description: “replicate is a wrapper for the common use of sapply for repeated evaluation of an expression (which will usually involve random number generation).”

    replicate(10, rnorm(10))
    

      

     

    7.mapply可传递多个参数进去.

    mapply is a multivariate version of sapplymapply applies FUN to the first elements of each ... argument, the second elements, the third elements, and so on. Arguments are recycled if necessary.

    l1 <- list(a = c(1:10), b = c(11:20))
    l2 <- list(c = c(21:30), d = c(31:40))
    # sum the corresponding elements of l1 and l2
    mapply(sum, l1$a, l1$b, l2$c, l2$d)
    [1]  64  68  72  76  80  84  88  92  96 100
    #mapply像是可以传递多个参数的saply
    mapply(rep, 1:4, 5)
    [,1] [,2] [,3] [,4]
    [1,]    1    2    3    4
    [2,]    1    2    3    4
    [3,]    1    2    3    4
    [4,]    1    2    3    4
    [5,]    1    2    3    4
    

      

     

     8.rapply
    Description: “rapply is a recursive version of lapply.”

    # let's start with our usual simple list example
    l <- list(a = 1:10, b = 11:20)
    # log2 of each value in the list
    rapply(l, log2)
          a1       a2       a3       a4       a5       a6       a7       a8
    0.000000 1.000000 1.584963 2.000000 2.321928 2.584963 2.807355 3.000000
          a9      a10       b1       b2       b3       b4       b5       b6
    3.169925 3.321928 3.459432 3.584963 3.700440 3.807355 3.906891 4.000000
          b7       b8       b9      b10
    4.087463 4.169925 4.247928 4.321928
    # log2 of each value in each list
    rapply(l, log2, how = "list")
    $a
     [1] 0.000000 1.000000 1.584963 2.000000 2.321928 2.584963 2.807355 3.000000
     [9] 3.169925 3.321928
      
    $b
     [1] 3.459432 3.584963 3.700440 3.807355 3.906891 4.000000 4.087463 4.169925
     [9] 4.247928 4.321928
      
    # what if the function is the mean?
    rapply(l, mean)
       a    b
     5.5 15.5
      
    rapply(l, mean, how = "list")
    $a
    [1] 5.5
      
    $b
    [1] 15.5
    

      

     

    二.多线程计算

    下面用欧拉问题14,来演示R中的向量化编程(利用apply组函数)和多线程

    #-----Longest Collatz sequence Problem 14
    func <- function(x) {
        n = 1
        raw <- x
        while (x > 1) {
            x <- ifelse(x%%2==0,x/2,3*x+1)
            n = n + 1
        }
        return(c(raw,n))
    }
     
    #方法1 向量化编程
    library(magrittr)
    system.time({
        x <- 1:1e5
        res1 <- sapply(x, func)%>%t()
    })
     
    用户   系统   流逝
    37.960  0.360 41.315
     
    #方法2 向量化编程
    system.time({
        x <- 1:1e5
        res2 <- do.call('rbind',lapply(x,func))
    })
     
    用户   系统   流逝
    36.031  0.181 36.769
     
    #方法3 多线程计算
    library(parallel)
    # 用system.time来返回计算所需时间
    system.time({
        x <- 1:1e5
        cl <- makeCluster(4) # 初始化四核心集群
        results <- parLapply(cl,x,func) # lapply的并行版本
        res.df <- do.call('rbind',results) # 整合结果
        stopCluster(cl) # 关闭集群
    })
     
    用户   系统   流逝
    0.199  0.064 20.038
     
    # 方法4 for 循环
    system.time({
        m <- matrix(nrow = 0,ncol = 2)
        for(i in 1:1e5){
            m <- rbind(m,func(i))
        }
    })
     
    #方法4用时太长
    

      

     

    转自:

    http://www.cnblogs.com/litao1105/p/5573373.html  

      

      

      

      

      

      

      

      

      

      

      

      

      

      

      

      

      

     

      

      

  • 相关阅读:
    windows下面编译redis5.0.5
    在ASP.NET Core 2.x中获取客户端IP地址
    https网站访问第三方https网站时候报错: The request was aborted:Could not create SSL/TLS secure channel.
    大量日志Login failed for user 'sa'. 原因: 密码与所提供的登录名不匹配。 [客户端: x.x.x.x] 导致Sql Server 的ErrorLog文件过大几十G
    一次清除SQL SERVER错误日志(ErrorLog)的体会!
    notepad++正则抽取所有符合条件的字符串
    Android编译执行envsetup.sh,产生工具命令m、mm、mmm、mmma、tapas 、croot、cgrep、jgrep、 resgrep、godir
    全志Tina_dolphin播放音视频裸流(h264,pcm)验证
    全志Linux Tina编译demoOmxVdec错误
    linux epoll学习
  • 原文地址:https://www.cnblogs.com/xianhan/p/9562239.html
Copyright © 2011-2022 走看看