2014-01-06 11 views
7

conduitnetwork-conduit、およびstm-conduitを使用して、小さな受信サーバーを実装するコードを次に示します。ソケット上のデータを受信し、STMチャネルを介してメインスレッドにストリームします。コンジットとソケット:複数の接続を許可

import Control.Concurrent (forkIO) 
import Control.Concurrent.STM (atomically) 
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan()) 
import Control.Monad (void) 
import Control.Monad.IO.Class (MonadIO (liftIO)) 
import Control.Monad.Trans.Class 

import Data.ByteString (ByteString) 
import qualified Data.ByteString as B 
import Data.Conduit 
import qualified Data.Conduit.Binary as DCB 
import Data.Conduit.Extra.Resumable 
import Data.Conduit.Network (sourceSocket) 
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources) 

import System.Directory (removeFile) 
import System.IO 

type BSChan = TBMChan ByteString 

listenSocket :: Socket -> Int -> IO BSChan 
listenSocket soc bufSize = do 
    chan <- atomically $ newTBMChan bufSize 
    forkListener chan 
    return chan 
    where 
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
     loop = do 
     (conn, _) <- accept soc 
     sourceSocket conn $$ sinkTBMChan chan 
     close conn 
     loop 

main :: IO() 
main = do 
    soc <- socket AF_UNIX Stream 0 
    bind soc (SockAddrUnix "mysock") 
    socChan <- listenSocket soc 8 
    sourceTBMChan socChan $$ DCB.sinkHandle stdout 
    removeFile "mysock" 

(実際のアプリケーションでは、ソケットからのデータのストリームは、私がリスナーに直接それを処理していない理由である、いくつかの他の人と合併します)。

問題は、メインスレッドが終了するまでこれを開いたままにしておいた場合、最初のメッセージがソケットで受信された後で終了するという問題です。シンク(2行目から最後の行にある)がデータの最初のストリームの終わりを見たら終了していない限り、私はそれがなぜこれを行うのか分かりません。これをしないよう説得することはできますか? Conduitには、ソースを再開可能にすることについてのものがありますが、シンクはありません。

+2

将来の質問については、コードが実際にコンパイルされるようにすべてのインポートも含めてください。ソリューションをより簡単にテストできます。 – shang

+0

ここでの実装のコンジットの側面とは無関係な軽微なコメント:ここでの実装は、着信接続ごとに専用のワーカースレッドを持つ代わりに、接続を一度に1つずつ受け入れます。それは意図的なのでしょうか? –

+0

@shang - fair point、私は輸入品で更新しました。要点を追加してそれにリンクすることを意味しますが、私はそれを忘れました! – Impredicative

答えて

6

sinkTBMChanのdocumentionから:シンクが閉じているとき

、チャネルが近すぎます。

最初のソケットハンドルが閉じたときに、それがシンクを停止sinkHandleに伝播順番にTBMChanを閉じ、接続シンクを閉じる、閉じるためにsourceSocketからSourceの原因となります。

これを解決する最も簡単な方法は、loopを接続を閉じずにそのソースをTBMChanに接続するカスタムソースに変更することです。チャンネルからの作家と読者のシャットダウンの調整

listenSocket :: Socket -> Int -> IO BSChan 
listenSocket soc bufSize = do 
    chan <- atomically $ newTBMChan bufSize 
    forkListener chan 
    return chan 
    where 
    forkListener chan = void . forkIO $ do 
     listen soc 2 
     loop $$ sinkTBMChan chan 

    loop = do 
     (conn, _) <- liftIO $ accept soc 
     sourceSocket conn 
     liftIO $ close conn 
     loop 
+0

はい、これは私がやったことです(下記参照)。私は 'network-conduit'を完全に削除し、接続を閉じることのないソースを実装しました。 – Impredicative

1

ここでは、再開可能なシンクの作成を含まない1つの回答です。 network-conduitsourceSocketは、単一の接続を許可しますが、我々はsourceSocketの内側に再接続動作を実装することができます(コードについて謝罪、私はそれがクリーニングが必要だと思いますが、少なくともそれが動作!):ここ

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString 
sourceSocket sock = 
    loop 
    where 
    loop = do 
     (conn, _) <- lift . liftIO $ accept sock 
     loop' conn 
     lift . liftIO $ close conn 
     loop 
    loop' conn = do 
     bs <- lift . liftIO $ recv conn 4096 
     if B.null bs 
     then return() 
     else yield bs >> loop' conn 

一つの問題は、ということですこれは決して終了しません(プログラムが終了するまで)。ソケットはプログラムの寿命を聞いたままにしておかなければならないので、これは私の使用例では問題ではありません。

4

は非自明な問題ですが、pipes-concurrencyライブラリを使用することでこれを解決するpipes生態系、からソリューションを再利用することができます。このライブラリはいくつかのpipes - 独立したユーティリティを提供しています。これは、読み取り側と書き込み側の間で通信するためにconduitライブラリと再利用できるため、各側が自動的に正しいタイミングを自動的に正確に知り、手動でも片側をきれいにすることができます。

pipes-concurrencyライブラリから使用するキー機能は、spawnです。そのタイプは次のとおりです。

Bufferは、使用する基本的なSTMチャネル抽象化を指定します。

spawn (Bounded 8) :: IO (Output a, Input a) 

aは、このケースで何もすることができ、それはByteStringすることができ、例えば:あなたのコード例から判断すると、それはあなたがBoundedバッファをしたいように聞こえる

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString) 

InputOutputはメールボックスのように動作します。あなたはOutput秒にsend INGデータによってメールボックスにメッセージを追加すると、あなたはInput sからデータをINGのrecvによって(FIFO順)メールボックスのうち、メッセージを取る:

-- Returns `False` if the mailbox is sealed 
send :: Output a -> a -> STM Bool 

-- Returns `Nothing` if the mailbox is sealed 
recv :: Input a -> STM (Maybe a) 

pipes-concurrencyの巧妙な機能があることですメールボックスにリーダがないかライタがない場合、ガベージコレクタがメールボックスを自動的に封印するように指示します。これにより、デッドロックの共通ソースが回避されます。

エコシステムpipesを使用している場合は、通常、次の2つの上位レベルのユーティリティを使用してメールボックスの読み取りと書き込みを行います。

main :: IO() 
main = do 
    soc <- socket AF_UNIX Stream 0 
    bind soc (SockAddrUnix "mysock") 
    (output, input) <- spawn (Bounded 8) 
    forkIO $ readFromSocket soc $$ toOutput output 
    fromInput input $$ DCB.sinkHandle stdout 
    removeFile "mysock" 
:次に、あなたの主な機能は次のようなもののようになります

import Control.Monad.Trans.Class (lift) 
import Data.Conduit 
import Pipes.Concurrent 

toOutput' :: Output a -> Sink a IO() 
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a) 

fromInput' :: Input a -> Source IO a 
fromInput' i = do 
    ma <- lift $ atomically $ recv i 
    case ma of 
     Nothing -> return() 
     Just a -> do 
      yield a 
      fromInput' i 

:コア機械がpipesあるので

-- Stream values into the mailbox until it is sealed 
toOutput :: Output a -> Consumer a IO() 

-- Stream values from the mailbox until it is sealed 
fromInput :: Input a -> Producer a IO() 

ただし、同等のこれらの関数のconduitバージョンを書き換えることができます非依存性

... はSocketから読み取るSourceとなります。

他のデータソースを使用してoutputに自由に書き込むことができます。完了したら、inputまたはoutputを適切に処分することを心配する必要はありません。

pipes-concurrencyについて詳しくは、official tutorialをお読みください。

+0

これはありがとうございます、物事について行くより潜在的に良い方法のように見えます。私はその時点で問題を解決することに成功しましたが、少し後にチュートリアルを掘り下げていきます。 – Impredicative

1

私は@ shangの答えは正しいと思う、私はちょっと遠くに行き、writeTBMChanの動作がここでより良い原因のように見えると思います。 TBMChanを自動的に閉じないように変更することをお勧めします。

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan 

これをプログラムで使用すると、期待通りに動作します。