2012-06-11 10 views
39

.progress = 'text'plyr'sllplyに大好きです。しかし、それはリスト項目がさまざまなコアに送られてから最終的に照合されるので、mclapply(パッケージmulticoreから)までにどれくらいの距離があるのか​​分からなくなることが非常に心配です。mclapplyで進行状況を追跡する方法はありますか?

私は*currently in sim_id # ....*のようなメッセージを出力していますが、これはあまり役に立ちません。なぜなら、リスト項目の何パーセントが完了したのかの指標を与えないからです。(私のスクリプトが立ち往生していないことを知ることは有用です。一緒に動く)。

誰かが私の.Routファイルを見て進歩を感じることができる他のアイデアを提案できますか?私はマニュアルカウンタを追加することを考えましたが、どのように実装するのかわかりません。mclapplyは、フィードバックを出す前にすべてのリスト項目を処理しなければならないからです。

+1

同様の質問については、私の答えを見てください:http://stackoverflow.com/a/5431265/653825 – otsaw

+0

@fotNeltonの優れた答え、および再利用のためにそれに基づいて他の人。一回限りの 'mclapply'呼び出しの進捗状況を確認するための迅速な解決策として、ワーカー関数では' cat( "。") 'だけでも可能です。 – codeola

+0

優れた質問ですが、 'package multiore'はもはや利用できません。パッケージ' multicore'なしで回避策がありますか? – forecaster

答えて

26

mclapplyは複数のプロセスを生成するため、FIFO、パイプ、またはソケットを使用することができます。その唯一の義務現在の進行状況を報告することである子を

ここ
library(multicore) 

finalResult <- local({ 
    f <- fifo(tempfile(), open="w+b", blocking=T) 
    if (inherits(fork(), "masterProcess")) { 
     # Child 
     progress <- 0.0 
     while (progress < 1 && !isIncomplete(f)) { 
      msg <- readBin(f, "double") 
      progress <- progress + as.numeric(msg) 
      cat(sprintf("Progress: %.2f%%\n", progress * 100)) 
     } 
     exit() 
    } 
    numJobs <- 100 
    result <- mclapply(1:numJobs, function(...) { 
     # Dome something fancy here 
     # ... 
     # Send some progress update 
     writeBin(1/numJobs, f) 
     # Some arbitrary result 
     sample(1000, 1) 
    }) 
    close(f) 
    result 
}) 

cat("Done\n") 

、一時ファイルがFIFOとして使用され、メインプロセスのフォーク:今すぐ次の例を考えてみましょう。メインプロセスは、mclapplyを呼び出すことによって継続され、評価される式(より正確には、式ブロック)は、writeBinによってFIFOの部分進行情報を書き込む。

これは単なる例に過ぎないので、おそらく出力のすべてをニーズに合わせる必要があります。 HTH!

+0

これは、標準関数 'message'と' sink'を使うことと効果的に違いますか?すべての子プロセスからのメッセージは遅延なしで同じシンクに送られます。 – otsaw

+2

'mclapply'の場合、メインプロセスはすべての子プロセスが終了するのを待っているので、別の子プロセスをフォークせずに' mclapply'が動作している間にメッセージを受信し処理することはできません。 – fotNelton

+0

@fotNelton:私の経験に基づいて、子プロセスは、親プロセスのものと同じstdoutとstderrを遅滞なく送信するようです。しかし、これはOSに依存しているのでしょうか? – otsaw

7

ここには、通常、mcapplyを使用する場所に適用する@fotNelton's solutionに基づく関数があります。

mcadply <- function(X, FUN, ...) { 
    # Runs multicore lapply with progress indicator and transformation to 
    # data.table output. Arguments mirror those passed to lapply. 
    # 
    # Args: 
    # X: Vector. 
    # FUN: Function to apply to each value of X. Note this is transformed to 
    #  a data.frame return if necessary. 
    # ...: Other arguments passed to mclapply. 
    # 
    # Returns: 
    # data.table stack of each mclapply return value 
    # 
    # Progress bar code based on https://stackoverflow.com/a/10993589 
    require(multicore) 
    require(plyr) 
    require(data.table) 

    local({ 
    f <- fifo(tempfile(), open="w+b", blocking=T) 
    if (inherits(fork(), "masterProcess")) { 
     # Child 
     progress <- 0 
     print.progress <- 0 
     while (progress < 1 && !isIncomplete(f)) { 
     msg <- readBin(f, "double") 
     progress <- progress + as.numeric(msg) 
     # Print every 1% 
     if(progress >= print.progress + 0.01) { 
      cat(sprintf("Progress: %.0f%%\n", progress * 100)) 
      print.progress <- floor(progress * 100)/100 
     } 
     } 
     exit() 
    } 

    newFun <- function(...) { 
     writeBin(1/length(X), f) 
     return(as.data.frame(FUN(...))) 
    } 

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...))) 
    close(f) 
    cat("Done\n") 
    return(result) 
    }) 
} 
2

@fotNelsonの回答に基づいて、行単位で印刷する代わりにプログレスバーを使用し、mclapplyで外部関数を呼び出します。 .GlobalEnvにFIFO接続を割り当てる

library('utils') 
library('multicore') 

prog.indic <- local({ #evaluates in local environment only 
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection 
    assign(x='f',value=f,envir=.GlobalEnv) 
    pb <- txtProgressBar(min=1, max=MC,style=3) 

    if (inherits(fork(), "masterProcess")) { #progress tracker 
     # Child 
     progress <- 0.0 
     while (progress < MC && !isIncomplete(f)){ 
      msg <- readBin(f, "double") 
       progress <- progress + as.numeric(msg) 

      # Updating the progress bar. 
      setTxtProgressBar(pb,progress) 
      } 


     exit() 
     } 
    MC <- 100 
    result <- mclapply(1:MC, .mcfunc) 

    cat('\n') 
    assign(x='result',value=result,envir=.GlobalEnv) 
    close(f) 
    }) 

.mcfunc<-function(i,...){ 
     writeBin(1, f) 
     return(i) 
    } 

はmclapplyコールの外側関数からそれを使用する必要があります。質問と以前の返答をありがとう、私はしばらくの間これを行う方法を考えていた。

11

は基本的に@ fotNelsonのソリューションの別のバージョンを追加することなく、いくつかの変更で:mclapplyの代替で

  • ドロップ建て使用し
  • キャッチCtrl-Cを呼び出し(すべてmclapply機能をサポート)と優雅に中止されます進行状況バー(txtProgressBar)
  • 進捗状況を追跡し、進行状況バーを指定するオプション
  • は、むしろparallelを使用しますこれは誰かに役に立てば幸いトップ

  • roxygen2スタイルのドキュメント
  • 今CRAN
  • から削除されましたmulticoreは、Xがmclapplyごとに一覧表示するには強制的に変換(その長さ(X)は、期待される結果を与える)より.. 。

    library(parallel) 
    
    #------------------------------------------------------------------------------- 
    #' Wrapper around mclapply to track progress 
    #' 
    #' Based on http://stackoverflow.com/questions/10984556 
    #' 
    #' @param X   a vector (atomic or list) or an expressions vector. Other 
    #'     objects (including classed objects) will be coerced by 
    #'     ‘as.list’ 
    #' @param FUN  the function to be applied to 
    #' @param ...  optional arguments to ‘FUN’ 
    #' @param mc.preschedule see mclapply 
    #' @param mc.set.seed see mclapply 
    #' @param mc.silent see mclapply 
    #' @param mc.cores see mclapply 
    #' @param mc.cleanup see mclapply 
    #' @param mc.allow.recursive see mclapply 
    #' @param mc.progress track progress? 
    #' @param mc.style style of progress bar (see txtProgressBar) 
    #' 
    #' @examples 
    #' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01)) 
    #' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1) 
    #' 
    #' dat <- lapply(1:10, function(x) rnorm(100)) 
    #' func <- function(x, arg1) mean(x)/arg1 
    #' mclapply2(dat, func, arg1=10, mc.cores=2) 
    #------------------------------------------------------------------------------- 
    mclapply2 <- function(X, FUN, ..., 
        mc.preschedule = TRUE, mc.set.seed = TRUE, 
        mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L), 
        mc.cleanup = TRUE, mc.allow.recursive = TRUE, 
        mc.progress=TRUE, mc.style=3) 
    { 
        if (!is.vector(X) || is.object(X)) X <- as.list(X) 
    
        if (mc.progress) { 
         f <- fifo(tempfile(), open="w+b", blocking=T) 
         p <- parallel:::mcfork() 
         pb <- txtProgressBar(0, length(X), style=mc.style) 
         setTxtProgressBar(pb, 0) 
         progress <- 0 
         if (inherits(p, "masterProcess")) { 
          while (progress < length(X)) { 
           readBin(f, "double") 
           progress <- progress + 1 
           setTxtProgressBar(pb, progress) 
          } 
          cat("\n") 
          parallel:::mcexit() 
         } 
        } 
        tryCatch({ 
         result <- mclapply(X, ..., function(...) { 
           res <- FUN(...) 
           if (mc.progress) writeBin(1, f) 
           res 
          }, 
          mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed, 
          mc.silent = mc.silent, mc.cores = mc.cores, 
          mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive 
         ) 
    
        }, finally = { 
         if (mc.progress) close(f) 
        }) 
        result 
    } 
    
  • +0

    このバージョンでは、実際にタスクの進行状況は表示されません。プログレスバーは0%から始まり、そこにとどまります。 – Ariel

    +0

    OKはバグでなければなりません - 私はそれを調べます... – wannymahoots

    +0

    この機能はOS XとLinux上で動作しますので、Windowsの問題です。 – wannymahoots

    6

    pbapplyパッケージは、一般的なケースでこれを実装しています。 pblapplypbsapplyの両方にcl引数があります。 documentation

    cl引数で並列処理を有効にすることができます。 parLapply は、clが 'cluster'オブジェクトのときに呼び出され、clが の整数のときにmclapplyが呼び出されます。プログレスバーを表示すると、 のメインプロセスとノード/子プロセスの間のオーバーヘッドが、 と、プログレスバーのない機能のパラレル同等に比較されます。 プログレスバーが無効になっている場合(つまり、getOption("pboptions")$type == "none" dopb()FALSEの場合)、元の機能に戻ります。 interactive() の場合はFALSE(コマンドラインRスクリプトから呼び出されます)の場合のデフォルトです。

    clを供給(またはNULLを通過)しない場合、デフォルトの非平行lapplyもプログレスバーなど、使用されています。

    関連する問題