トップ 差分 一覧 ソース 検索 ヘルプ RSS ログイン

BugTrack-R備忘録/20

R備忘録 /状態空間モデリング/donlp2/その他のメモ

R備忘録 - 記事一覧

R(R言語)で並列処理 - multicore

  • 投稿者: みゅ
  • カテゴリ: なし
  • 優先度: 普通
  • 状態: 完了
  • 日時: 2009年01月17日 21時55分10秒

内容

簡単な説明

  • 今実行しているRの環境とまったく同じプロセスを作ります.つまり自分自身のクローンを「子プロセス」として、実行.
    • Rserveなんかといっしょですね.
  • その「子プロセス」にコマンドを渡して、実行させる.でそのとき、渡した先が別のプロセスなので、今見ている実行環境(親プロセス)には、なにも影響を与えません.
  • しかも、レスポンスがすぐ帰ってくる(裏では別のRプロセスが動いている)
  • CPUが2個以上あって、並列処理できるスクリプトなら、効果があるということでしょう.
  • 別のRをわざわざ立ち上げるより何がいいかというと
    • 自分自身のクローンをforkするので、Rを立ち上げるという重い処理が不要.このRを立ち上げる処理って結構重いんですよ・・・
    • 自分自身のクローンなので、そこまでに作成したメモリ上に乗っている変数を、その子プロセスで参照できる.
      • もちろん、子プロセスを立ち上げた後に作成した親プロセスの変数は、子プロセスからは見えない.
      • だけど後で述べる、親子間の通信で、変数というかRのオブジェクトを親子間でやり取りすることはできる.
  • ちょっとだめなとこ
    • 自分自身のクローンを作成するので、親となるRが馬鹿でかいメモリを使っていたら、子プロセスも馬鹿でかくなる.もちろん、子プロセスを作った後に、子プロセスでオブジェクトを削除してgcすればメモリ使用量は減るけど・・・
    • ここだけ気をつければよいかな・・・?つまり親プロセスはなるべくスリムにしておくこと・・・
  • Rオブジェクトのプロセス間のやり取りは【Raw Vectors】でやり取りする.【Raw Vectors】に変換できないオブジェクトは別のオブジェクトに分解してやり取りする必要がある.データフレームなど、リスト形式のオブジェクトはそのままでは、【Raw Vectors】に変換できないので、分解する必要がある.
    • これなんとかならないかなぁ・・・

使い方

  • 「parallel」とか「mclapply」とか、簡単に?使える関数もあるけど、やっぱり子プロセスと会話をしたいので、そこらへんから.

子プロセスを作る

  • 子プロセスを作ります.
> f <- fork()
> >
  • なんと、そこは子プロセスの世界.エンターで改行すると、親の世界に戻ります.
  • 一個目の「>」が親プロセスのプロンプト.2個目の「>」が子プロセスのプロンプト.この状態で以下のように入力してみます.
> > iris2 <- iris[,1]
    • 親プロセスと子プロセス両方待ち受けているので、親プロセスでもiris2というオブジェクトができます.
    • これで、子プロセスの標準入力は終了します.

子プロセスから親プロセスへオブジェクトを送信する

  • このiris2が子プロセスでも作られているか確認します.
> sendChildStdin(f,"sendMaster(iris2)\n")
sendMaster(iris2)
[1] TRUE
> [1] TRUE
  • 子プロセス「f」の標準入力に「sendMaster(iris2)\n」というコマンドを送ります.「sendMaster」関数は、子プロセスから親プロセスにオブジェクトをプッシュする関数です.
  • これを親プロセスで受け取ります.
> s <- selectChildren(f)
> s
[1] 16340
> r <- readChild(f)
  • まず「selectChildren」関数で子プロセスが読み取りできるかどうかを確認します.今、読み取れる状態なので、「readChild」関数を使って、子プロセスから送られたオブジェクトを受け取ります.
  • 以下受け取った「r」の中身
> r
   [1] 58 0a 00 00 00 02 00 02 08 01 00 02 03 00 00 00 00 0e 00 00 00 96 40 14
  [25] 66 66 66 66 66 66 40 13 99 99 99 99 99 9a 40 12 cc cc cc cc cc cd 40 12
  [49] 66 66 66 66 66 66 40 14 00 00 00 00 00 00 40 15 99 99 99 99 99 9a 40 12
・・・(続く)
  • これがいわゆる【Raw Vectors】.iris2の【Raw Vectors】らしいです.普通のオブジェクトに直します.
> unserialize(r)
  [1] 5.1 4.9 4.7 4.6 5.0 5.4 4.6 5.0 4.4 4.9 5.4 4.8 4.8 4.3 5.8 5.7 5.4 5.1
 [19] 5.7 5.1 5.4 5.1 4.6 5.1 4.8 5.0 5.0 5.2 5.2 4.7 4.8 5.4 5.2 5.5 4.9 5.0
 [37] 5.5 4.9 4.4 5.1 5.0 4.5 4.4 5.0 5.1 4.8 5.1 4.6 5.3 5.0 7.0 6.4 6.9 5.5
 [55] 6.5 5.7 6.3 4.9 6.6 5.2 5.0 5.9 6.0 6.1 5.6 6.7 5.6 5.8 6.2 5.6 5.9 6.1
 [73] 6.3 6.1 6.4 6.6 6.8 6.7 6.0 5.7 5.5 5.5 5.8 6.0 5.4 6.0 6.7 6.3 5.6 5.5
 [91] 5.5 6.1 5.8 5.0 5.6 5.7 5.7 6.2 5.1 5.7 6.3 5.8 7.1 6.3 6.5 7.6 4.9 7.3
[109] 6.7 7.2 6.5 6.4 6.8 5.7 5.8 6.4 6.5 7.7 7.7 6.0 6.9 5.6 7.7 6.3 6.7 7.2
[127] 6.2 6.1 6.4 7.2 7.4 7.9 6.4 6.3 6.1 7.7 6.3 6.4 6.0 6.9 6.7 6.9 5.8 6.8
[145] 6.7 6.7 6.3 6.5 6.2 5.9
  • iris[,1]と同じです.

親プロセスから子プロセスへオブジェクトを送信する

  • 「sendChildStdin」関数を使います.2番目の引数に【Raw Vectors】形式のオブジェクトを渡すと良いらしい.
> sendChildStdin(f, as.raw(r))
  • バグってますかな・・・
  • ならダンプしたテキストを渡せばいいか・・・
iris3 <- iris
textCn <- textConnection("tmp", "w")
dump("iris3", textCn)
close(textCn)
tmp
 [1] "`iris3` <-"
 [2] "structure(list(Sepal.Length = c(5.1, 4.9, 4.7, 4.6, 5, 5.4, 4.6, "
 [3] "5, 4.4, 4.9, 5.4, 4.8, 4.8, 4.3, 5.8, 5.7, 5.4, 5.1, 5.7, 5.1, "
・・・(続く)
invisible(sendChildStdin(f, "multicore:::closeStdout()\n"))
tmp <- paste("invisible(", paste(tmp, collapse="\n"), ")\n", sep="")
sendChildStdin(f, tmp)
  • これでできているはず.
  • 「sendChildStdin(f, "multicore:::closeStdout()\n")」は子プロセスのほうで、出力される標準出力を抑制する.これがないと、いちいち表示がされてみにくいので.でも開発中・デバッグ中は出力はあったほうがいいですね・・・
  • この後ろのほうの「子プロセス内でコマンドを評価する関数を作る」をご参照ください

parallel

  • いちいち、標準出力を抑制するのとかめんどくさいので子プロセス作るときは「parallel」使ったほうが良いかも.
> f <- parallel(1:10, silent = T)
> f
 parallelJob: processID=16533
  • 「1:10」のところは、何でもいいです.「silent = T」で、「multicore:::closeStdout()」を関数内部でやってくれます.
  • ただし、このままだと「1:10」の結果を親プロセスに返そうとしているので、
collect(f)
  • で、結果を受け取っておいたほうが良い?
> collect(f)
$`16589`
 [1]  1  2  3  4  5  6  7  8  9 10
  • この子プロセスを1個作った状態で、以下のようになっています.
$ ps -ejH
・・・
12488 12488 12488 pts/2    00:00:00     bash
16584 16584 12488 pts/2    00:00:00       R
16589 16584 12488 pts/2    00:00:00         R
・・・

子プロセスから親プロセスへオブジェクトを送信する

  • 先ほど子プロセスから親プロセスへオブジェクトを渡すときに、いろいろとめんどくさいことをしたけれど、「collect」関数を使うともしかしてらくちん?
sendChildStdin(f,"iris4 <- iris[,2]\n")
sendChildStdin(f,"sendMaster(iris4)\n")
collect(f)
  • 実行
> sendChildStdin(f,"iris4 <- iris[,2]\n")
Error in FUN(16589L[[1L]], ...) : child 16589 doesn't exist
  • って、子プロセスがいなぁぁぁぁい.
12488 12488 12488 pts/2    00:00:00     bash
16584 16584 12488 pts/2    00:00:00       R
16589 16584 12488 pts/2    00:00:00         R <defunct>
  • おなくなりになってます・・・.仕様ですか?「collect」したらプロセス殺しちゃうんだろうか・・・
    • どうも「parallel」関数の中で、オブジェクトを渡した後に、「exit(0)」というので、殺しちゃってるみたいです.「parallel」を参考に関数を作ってみた.
createChild <-
function (name, mc.set.seed = FALSE, silent = FALSE)
{
    f <- fork()
    env <- parent.frame()
    if (inherits(f, "masterProcess")) {
        #on.exit(exit(1, structure("fatal error in wrapper code",
        #    class = "try-error")))
        if (isTRUE(mc.set.seed))
            set.seed(Sys.getpid())
        if (isTRUE(silent))
            multicore:::closeStdout()
        #sendMaster(try(eval(expr, env), silent = TRUE))
        #exit(0)
    }
    if (!missing(name) && !is.null(name))
        f$name <- as.character(name)[1]
    class(f) <- c("parallelJob", class(f))
    f
}
  • これでどうかな
> f <- createChild(silent = T)
> f
 parallelJob: processID=16756
> sendChildStdin(f,"iris4 <- iris[,2]\n")
[1] TRUE
> sendChildStdin(f,"sendMaster(iris4)\n")
[1] TRUE
> collect(f,wait=F)
[[1]]
  [1] 3.5 3.0 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 3.7 3.4 3.0 3.0 4.0 4.4 3.9 3.5
 [19] 3.8 3.8 3.4 3.7 3.6 3.3 3.4 3.0 3.4 3.5 3.4 3.2 3.1 3.4 4.1 4.2 3.1 3.2
 [37] 3.5 3.6 3.0 3.4 3.5 2.3 3.2 3.5 3.8 3.0 3.8 3.2 3.7 3.3 3.2 3.2 3.1 2.3
 [55] 2.8 2.8 3.3 2.4 2.9 2.7 2.0 3.0 2.2 2.9 2.9 3.1 3.0 2.7 2.2 2.5 3.2 2.8
 [73] 2.5 2.8 2.9 3.0 2.8 3.0 2.9 2.6 2.4 2.4 2.7 2.7 3.0 3.4 3.1 2.3 3.0 2.5
 [91] 2.6 3.0 2.6 2.3 2.7 3.0 2.9 2.9 2.5 2.8 3.3 2.7 3.0 2.9 3.0 3.0 2.5 2.9
[109] 2.5 3.6 3.2 2.7 3.0 2.5 2.8 3.2 3.0 3.8 2.6 2.2 3.2 2.8 2.8 2.7 3.3 3.2
[127] 2.8 3.0 2.8 3.0 2.8 3.8 2.8 2.8 2.6 3.0 3.4 3.1 3.0 3.1 3.1 3.1 2.7 3.2
[145] 3.3 3.0 2.5 3.0 3.4 3.0

> collect(f,wait=F)
NULL
  • 子プロセスも死んでないのでOK

子プロセスの終了の仕方

  • 正直、勝手に死んでることが多いので良くわかりません・・・.が
kill(f, SIGQUIT)
  • で、明示的に
16790 16584 12488 pts/2    00:00:00         R <defunct>
  • こういう状態にはすることができます.<defunct>にしないで、親プロセスのRを終了してしまうと
16684 16584 12488 pts/2    00:00:00   R
16685 16584 12488 pts/2    00:00:00   R
16740 16584 12488 pts/2    00:00:00   R
16754 16584 12488 pts/2    00:00:00   R
16755 16584 12488 pts/2    00:00:00   R
  • こんなひどいことになってしまうので、ご注意.
  • 親のR内で、子プロセスのフォーカスがなくなる前に「kill(***, SIGQUIT)」で<defunct>にしておきましょう.
  • というか「Rserve」みたいに、きちんと終わる方法はないんですか???

rmChild

  • 「rmChild」という関数を発見.これで子プロセスを落とすことができそう.
    • マニュアルには載ってないね・・・
> kill(f, SIGQUIT)
> multicore:::rmChild(f)
[1] TRUE
  • 子プロセスこれでは、落ちませんね・・・

子プロセス内でコマンドを評価する関数を作る

childEval <-
function( child, cmd, send=F ){
    if( !is.character(cmd) ){
        print("'cmd' must be character...")
        return
    }
    #cmd <- paste(cmd, "\n", sep="")
    if( send ) cmd <- paste("sendMaster(try({",cmd,"}, silent=T))\n", sep="")
    else cmd <- paste("try({",cmd,"}, silent=T)\n", sep="")
    #cat(cmd)
    ret <- sendChildStdin(child, cmd)
    s <- selectChildren(child)
    if (is.logical(s) || !length(s)){
        
    } else{
        cat("ready to get data...\n")
    }
    #if( is.numeric(s) ) cat("ready to get data...\n")
    ret
}
  • テスト
childEval(f, "x<-1:10")
collect(f, wait=F)

親プロセスから子プロセスにオブジェクトをわたす

  • 結局「sendChildStdin」関数で生のオブジェクトをそのまま渡すことはできないので.
textCn <- textConnection("sendCmd", "w")
dump("iris3", textCn)
close(textCn)
sendCmd
 [1] "`iris3` <-"
 [2] "structure(list(Sepal.Length = c(5.1, 4.9, 4.7, 4.6, 5, 5.4, 4.6, "
 [3] "5, 4.4, 4.9, 5.4, 4.8, 4.8, 4.3, 5.8, 5.7, 5.4, 5.1, 5.7, 5.1, "
・・・(続く)
#sendCmd <- paste("invisible(", paste(sendCmd, collapse="\n"), ")\n", sep="")
sendCmd <- paste(sendCmd, collapse="\n")
childEval(f, sendCmd)
#sendChildStdin(f, sendCmd)
  • ひとつの関数にまとめる
sendObj2Child <-
function( child, obj ){
    textCn <- textConnection("sendCmd", "w")
    dump(deparse(substitute(obj)), textCn)
    close(textCn)
    sendCmd <- paste(sendCmd, collapse="\n")
    #cat(sendCmd)
    childEval(child, sendCmd)
}
  • だけど結局、文字列の長さにも制限があるので、これも一般的には使えない.
    • 小さなオブジェクトにしか使えません.

×子プロセスからオブジェクトを取得する関数を作る×

getObj <-
function( child, what ){
    if( !is.character(what) ){
        print("'what' must be character...")
        return
    }
    cmd <- paste("sendMaster(try({",what,"}, silent=T))\n", sep="")
    print(cmd)
    ret <- sendChildStdin(child, cmd)
    s <- selectChildren(child)
    if( is.numeric(s) ) return( collect(child, wait=F) )
    NA
}
  • テスト
getObj (f, "x")
  • こっちはつかえないな・・・

コメント