2017-01-01 4 views
2

私はパイプを使用して、いくつかのHaskellのコードを持っている:HaskellのPipeライブラリと並行してPipeを作成するにはどうしたらいいですか?

module Main(main) where 
import Pipes 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = runEffect $ a >-> b >-> c 

Pipes.Concurrent tutorialは、作業盗みと一緒に複数の労働者を使用して示しています。 bの中でどうやって同様のことをすることができますか? bには、設定された数の作業者を同時に使用して作業を実行したいと思います。

明らかに、並行処理はこの正確なケースでは役に立ちませんが、これは私が思いつく最も簡単な例です。実際の使用例では、限られた数のワーカーを同時に使用していくつかのWebリクエストを作成したいと考えています。

+1

https://hackage.haskell.org/package/pipes-async-0.1.1/docs/src/Pipes-Async.html#bufferは、複数のパイプをTQueueにフィードするように調整できるようですが、そこで '[Pipe ab] - > Pipe ab'を取得します。 1時間前に[機能リクエスト](https://github.com/jwiegley/pipes-async/issues/1)を投稿しました>:D – Gurkenglas

答えて

2

編集:あなたが尋ねていたことを誤解しました。 あなたはこれをパイプの中で行うことができるかもしれませんが、動機づけが何であるかはわかりません。私は再利用可能なパイプチェーンを構築することをお勧めしたいと思います。パイプ自体に組み込むと、最初のインが最初のアウトであることを保証する注文保証は失われます。

Work Stealingのセクションはあなたが探しているものですが、このコードは基本的にチュートリアルとはまったく同じですが、どのように動作するかを解説しましょう。

module Main(main) where 
import Pipes 
import Pipes.Concurrent 

import Control.Concurrent.Async (async, wait) 
import Control.Concurrent (threadDelay) 
import Control.Monad (forM) 

a :: Producer Int IO() 
a = each [1..10] 

b :: Pipe Int Int IO() 
b = do 
    x <- await 
    yield (x*2) 
    b 

c :: Consumer Int IO() 
c = do 
    x <- await 
    lift $ print x 
    c 

main :: IO() 
main = do 
    (output, input) <- spawn unbounded 
    feeder <- async $ do runEffect $ a >-> toOutput output 
         performGC 

    workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
       performGC 

    mapM_ wait (feeder:workers) 

spawn unboundedがPipes.Concurrentからである最初の行は、それが入力と出力のハンドルを持って「メールボックス」を初期化します。ここでは、我々はあなたがやりたいことができ一つの方法です。最初は私を混乱させましたが、この場合はメッセージを出力に送り、入力からそれらを引き出します。これはgolangのような言語のプッシュプルメッセージチャネルに似ています。

Bufferと指定して、保存できるメッセージの数を指定します。この場合、無制限で無制限を設定します。

メールボックスが初期化されるように、メッセージを送信するEffectを作成できるようになりました。メールボックスチャネルはSTMを使用して実装されているので、メッセージを非同期で収集する方法です。

メールボックスにフィードする非同期ジョブを作成しましょう。

feeder <- async $ do runEffect $ a >-> toOutput output 
        performGC 

a >-> toOutput outputは普通のパイプ組成物である、我々はパイプに出力を変換するtoOutputを必要としています。 IOの一部でもあるperformGCコールに注意してください。これにより、Pipes.Concurrentは、ジョブの完了後にクリーンアップを行うことができます。私たちが好きな場合はforkIOを使ってこれを実行できますが、この場合は後で結果が終わるのを待つことができるようにasyncを使います。さて、私たちのメールボックスは非同期にメッセージを受信する必要がありますので、メッセージを受信して​​、それらを取り出していくつかの作業をしましょう。

workers <- forM [1..3] $ \i -> 
    async $ do runEffect $ fromInput input >-> b >-> c 
      performGC 

これまでの考え方と同じですが、今回はちょうどその一部を産んでいます。私たちは通常のパイプのように入力から読み取ってfromInputを使用してから、チェーンの残りの部分を実行して、完了したらクリーンアップします。 inputは、1人の作業者だけが値を受け取るたびに値が引き出されることを保証します。 outputに入るすべてのジョブが完了すると(開いているすべてのジョブが追跡されます)、inputパイプが閉じられ、作業者が終了します。

ウェブワーカーのシナリオでこれを使用している場合は、toOutput outputチャンネルにリクエストを送信し続け、好きなだけ多くのワーカーを投稿して、fromInput inputからパイプラインにプルするメインループがあります。

関連する問題