2016-09-04 4 views
2

基本的には、私の状況では計算のリストが大きすぎてスレッドが多すぎるため、より少ないスレッドでパフォーマンスを試してみたいと思います。reply-channelのMailboxProcessorを使用して値を返す限定エージェントを作成する

// the trivial approach (and largely my current situation) 
let doWork() = 
    [1 .. 10] 
    |> List.map (fun i -> async { 
     do! Async.Sleep (100 * i) // longest thread will run 1 sec 
     return i * i    // some complex calculation returning a certain type 
     }) 
    |> Async.Parallel 
    |> Async.RunSynchronously  // works, total wall time 1s 

私の新しいアプローチは、このコードは(私はそれが動作しますが、テストされ、私はそれが価値ではなく、ユニットを返却する必要がある)this online snippet from Tomas Petricekに触発/借りています。

type LimitAgentMessage = 
    | Start of Async<int> * AsyncReplyChannel<int> 
    | Finished 

let threadingLimitAgent limit = MailboxProcessor.Start(fun inbox -> async { 

    let queue = System.Collections.Generic.Queue<_>() 
    let count = ref 0 
    while true do 
     let! msg = inbox.Receive() 
     match msg with 
     | Start (work, reply) -> queue.Enqueue((work, reply)) 
     | Finished -> decr count 
     if count.Value < limit && queue.Count > 0 then 
      incr count 
      let work, reply = queue.Dequeue() 
      // Start it in a thread pool (on background) 
      Async.Start(async { 
       let! x = work 
       do! async {reply.Reply x } 
       inbox.Post(Finished) 
      }) 
    }) 


// given a synchronous list of tasks, run each task asynchronously, 
// return calculated values in original order 
let worker lst = 
    // this doesn't work as expected, it waits for each reply 
    let agent = threadingLimitAgent 10 
    lst 
    |> List.map(fun x ->    
     agent.PostAndReply(
      fun replyChannel -> Start(x, replyChannel))) 

、代わりにこれで、元のコードになるでしょう:

let doWork() = 
    [1 .. 10] 
    |> List.map (fun i -> async { 
     do! Async.Sleep (100 * i) // longest thread will run 1 sec 
     return i * i    // some complex calculation returning a certain type 
     }) 
    |> worker  // worker is not working (correct output, runs 5.5s) 

すべてのすべてで、出力が正確である(それは計算を行い、回答をバック伝播)が、それ(限られた数の)スレッドではそうしません。

私はちょっと遊んでいますが、明白なことが分かっていないと思います(しかも、限られたスレッドのメールボックスプロセッサーの考え方が好きかもしれません)。

答えて

3

問題はagent.PostAndReplyへの呼び出しです。 PostAndReplyは作業が終了するまでブロックされます。 List.mapの内部でこれを呼び出すと、作業が順番に実行されます。 1つの解決策は、ブロックされないPostAndAsyncReplyを使用し、結果を返すための非同期ハンドルを返すことです。

let worker lst = 
    let agent = threadingLimitAgent 10 
    lst 
    |> List.map(fun x ->    
     agent.PostAndAsyncReply(
      fun replyChannel -> Start(x, replyChannel))) 
    |> Async.Parallel 

let doWork() = 
    [1 .. 10] 
    |> List.map (fun i -> async { 
     do! Async.Sleep (100 * i) 
     return i * i    
     }) 
    |> worker  
    |> Async.RunSynchronously 

もちろん、すべての非同期ハンドルを元に戻して並行して待機させることも可能です。

+0

素晴らしい!ありがとう。それを試して、今動作します。私は明らかに欠けていたことを知っていた。これがうまくいくかどうかはわかりません。私は結果を計算する方法を見つけようとしています(地図と折りたたみ)。小さな計算が多く、それぞれが独自のスレッドで実行するには多すぎます。あなたが上記より良い提案をしているなら、それはいつも歓迎です:)。 – Abel

+1

@Abelこの解決方法は問題ありません。あなたが測定し、あなたのパフォーマンスに満足していない場合は、[Nessos Streams](http://nessos.github.io/Streams/)または[Hopac](http://hopac.github。 io/Hopac/Hopac.html)。 – krontogiannis

+0

これは素晴らしい提案ですが、私はNessos Streamsに興味を持っています – Abel

関連する問題