2012-02-20 12 views
2

マップとPSQに書き込むメインスレッドがあります。マップとPSQの両方で同じキーを使用するので、PSQを見ると、最小優先度を持つエントリはO(1)の複雑さで見つけられ、Mapの値にマッピングされます。共有可変状態:IORefsを使用する場合

私の主なスレッドは、必要に応じてMapとPSQの両方を追加/変更しますが、常に2番目のスレッドがあります(forever $ do)は、PSQを見て最も古いキーがN ms前になったと判断し、それをフラッシュする。

これが起こるためには、両方のスレッドが同じ可変データを参照する必要があります。状態を維持するための最良の方法は何ですか?これはIOREfsのケースでしょうか?これを解決する他の方法はありますか?

ここでは「いくつかの」プレアルファコード:

import Data.Time 
import Data.Functor 
import Data.Time.Clock.POSIX 
import qualified Data.PSQueue as PSQ 
import qualified Data.Map as Map 
import Data.Maybe 
import Control.Concurrent 
import Control.Concurrent.MVar 
import Control.Monad 
import Network.Socket hiding (send, sendTo, recv, recvFrom) 
import Network.Socket.ByteString 
import qualified Data.ByteString.Char8 as B 

--PSQ = (host, PID) POSIXTime 
--where the tuple is k and POSIXTime is p 

--Map is (host, PortNumber) [messages] 
--where the tuple is the key and [messages] is a list of messages 

key = ("192.168.1.1", 4711) 
messages = ["aaa", "bbbb", "ccccc"] 

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
newRq = do 
     time <- getPOSIXTime 
     let q = PSQ.singleton key time 
     let m = Map.singleton key messages 
     return (q, m) 

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String] 
appendMsg newmsgs (host, port) m = 
     let Just messages' = Map.lookup (host,port) m 
      l = length . concat $ messages' 
      l' = l + length newmsgs 
     in 
     if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m 

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
insertNewRec (a,b) c q m = do 
     time <- getPOSIXTime 
     let q1 = PSQ.insert (a,b) time q 
     let m1 = Map.insert (a,b) c m 
     return (q1, m1) 

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO() 
sendq s datastring host port = do 
     hostAddr <- inet_addr host 
     sendAllTo s datastring (SockAddrInet port hostAddr) 
     return() 

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
deleteRec (host, port) q m = (q', m') 
     where 
     m' = Map.delete (host, port) m 
     q' = PSQ.delete (host, port) q 

loopMyQ q m1 done = forever $ do 
     let Just m = PSQ.findMin q 
     let time = (PSQ.prio m) + 0.200 --adds 200ms 
     now <- getPOSIXTime 
     if now < time 
     then print (m1) 
     --here eventually I would call the send function to flush the queue 
     else putMVar done() 

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
sendrecv s q1 m1 msg = do 
    let m2 = appendMsg msg key m1 
     (q3, m3) = case m2 of 
        val | m2 == m1 -> deleteRec key q1 m1 
         | otherwise -> (q1, m2) 
    (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3 
             return (q4, m4)) else return (q1, m2) 
    when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711) 
    return (q5, m5) 

--main :: IO() 
main = withSocketsDo $ do 
    s <- socket AF_INET Datagram defaultProtocol 
    (q1, m1) <- newRq 
    done <- newEmptyMVar 
    forkIO $ loopMyQ q1 m1 done 
    (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000] 
    takeMVar done 
    --print ("longer than 200ms ago") 

答えて

6

あなたが最も可能性が高いスレッド間で一貫性のある状態を維持するためにMVarsまたはTVarsを使用したいです。 IORefはスレッドセーフではありません。

この問題は、STM(およびTVars)を使用することをお勧めします。あなたは複数のデータ構造への同時アクセスを扱っています.MMarsのロック順序について考えるよりも、STMの構成は扱いがずっと簡単です。

コードを確認した後、TVarsが最善の策になるようです。あなたのPSQとマップを2つの異なるTVarで包みなさい。 atomicallyトランザクションで、両方の一貫性のあるビューを必要とするすべてのコードをラップします。ほとんどの場合、あなたのコードは「うまくいく」でしょう。ただし、ロックの競合がある場合、アトムブロックは動作するまで再試行されます。

+0

申し訳ありませんが、あなたの問題を完全に理解する必要はありませんが、データの一貫性のあるビューを常に維持することを検討している場合は、STMが最適です。 –

関連する問題