2012-04-04 7 views
4

Shellyライブラリを使用して複数のタスクを並行して実行する簡単なスクリプトを作成していますが、一度に実行するタスクの最大数を制限します。このスクリプトは、各行の入力ファイルを受け取り、その入力に対してタスクを実行します。ファイルには数百の入力があり、一度に約16個のプロセスに制限したいと考えています。QSemはスレッドをブロックしていないようです

現在のスクリプトは、実際に私は4つの入力とテストファイル上で実行したときに、私はこれを見るために私も何かが欠けているように見える1の初期カウントでQSemを使用して(良くしようとする)を1に制限します

 
Starting 
Starting 
Starting 
Starting 
Done 
Done 
Done 
Done 

スレッドがQSem上でブロックしないように、これらはすべて同時に実行されています。私は今まで自分のセマフォーを実装するために、MVarTVarの両方に行ってしまっていましたが、どちらも期待通りに動作しませんでした。私は明らかに根本的な何かを欠いているが何?また、コードをコンパイルしてバイナリとして実行しようとしました。

 
#!/usr/bin/env runhaskell 
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-} 

import Shelly 
import Prelude hiding (FilePath) 
import Text.Shakespeare.Text (lt) 
import qualified Data.Text.Lazy as LT 
import Control.Monad (forM) 
import System.Environment (getArgs) 

import qualified Control.Concurrent.QSem as QSem 
import Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar) 

-- Define max number of simultaneous processes 
maxProcesses :: IO QSem.QSem 
maxProcesses = QSem.newQSem 1 

bkGrnd :: ShIO a -> ShIO (MVar a) 
bkGrnd proc = do 
    mvar <- liftIO newEmptyMVar 
    _ <- liftIO $ forkIO $ do 
    -- Block until there are free processes 
    sem <- maxProcesses 
    QSem.waitQSem sem 
    putStrLn "Starting" 
    -- Run the shell command 
    result <- shelly $ silently proc 
    liftIO $ putMVar mvar result 
    putStrLn "Done" 
    -- Signal that this process is done and another can run. 
    QSem.signalQSem sem 
    return mvar 

main :: IO() 
main = shelly $ silently $ do 
    [img, file] <- liftIO $ getArgs 
    contents <- readfile $ fromText $ LT.pack file 
    -- Run a backgrounded process for each line of input. 
    results <- forM (LT.lines contents) $ \line -> bkGrnd $ do 
     runStdin &ltcommand> &ltarguments> 
    liftIO $ mapM_ takeMVar results 
+1

私はShellyについては分かりませんが、あなたのコードからは、 'bkGrnd'のすべてのアプリケーションが独自の新しいセマフォを1に初期化しているようです。最初に作成し、すべての呼び出しに同じものを渡す必要があります。 –

答えて

6

私は私のコメントで言ったように、bkGrndへの各呼び出しは、すべてのスレッドが待機することなく継続することができ、独自のsemaphonreを作成します。私はmainでセマフォが作成され、毎回bkGrndに渡される代わりに、このようなものを試してみます。

bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a) 
bkGrnd sem proc = do 
    mvar <- liftIO newEmptyMVar 
    _ <- liftIO $ forkIO $ do 
    -- Block until there are free processes 
    QSem.waitQSem sem 
    -- 
    -- code continues as before 
    -- 

main :: IO() 
main = shelly $ silently $ do 
    [img, file] <- liftIO $ getArgs 
    contents <- readfile $ fromText $ LT.pack file 
    sem <- maxProcesses 
    -- Run a backgrounded process for each line of input. 
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do 
     runStdin <command> <arguments> 
    liftIO $ mapM_ takeMVar results 
+0

うわー、私はばかです。私は決して前にハスケルでグローバルな変更可能なデータを使用しようとしたことはありませんでした。(私が普通好きなものではありませんが、スクリプトですが)問題はあなたが指摘したように明らかです。ありがとう! – asm

+1

@AndrewMyers:心配しないでください。最も簡単な並行処理のエラーでさえ、時々見つけにくいです。 ところで、 'sem'はグローバルではなく、むしろ共有しています。それは 'main'の中で宣言され、共有セマフォの"参照 "としてスレッドに渡されます。 –

+0

ええ、私は私がやろうとしていることを意味しました。私はグローバルセマフォーとして 'maxProcesses'を考えていましたが、毎回新しいセマフォを作成するのはグローバルな' IO QSem'アクションでした。私がzshでやっているやり方をスクリプト化しようとしていなければ、あなたの方法ははるかに洗練されていて、通常どおりにやります。ですから、 'unsafePerformIO'を使わずにグローバルな可変状態を作るのは実際には不可能だと思うのです。もしそうなら、それはかなりクールだけど、私が以前に気づいたものではない。 – asm

4

あなたは答えを持っていますが、私は追加する必要があります。killThreadまたは非同期スレッド死亡が可能な場合QSemとQSemNはスレッドセーフではありません。

私のバグレポートとパッチはGHC trac ticket #3160です。固定コードは、Control.Concurrent.MSem、MSemN、MSampleVar、およびボーナスFairRWLockモジュールを使用してSafeSemaphoreという新しいライブラリとして利用できます。あなたがセマフォを取得するまで

+0

QSemへのアップデートとして7.0.1にそれをマージする計画があったときにメーリングリストの議論の一部を見ました。私はTracのチケットからそれが起こっていないことを知っているので、私は安全なパッケージをチェックアウトします。ヒントをありがとう! – asm

0

はそれが

bkGrnd sem proc = do 
    QSem.waitQSem sem 
    mvar <- liftIO newEmptyMVar 
    _ <- liftIO $ forkIO $ do 
    ... 

より良いそうもない forkIOはありませんか?

関連する問題