マップとPSQに書き込むメインスレッドがあります。マップとPSQの両方で同じキーを使用するので、PSQを見ると、最小優先度を持つエントリはO(1)の複雑さで見つけられ、Mapの値にマッピングされます。共有可変状態:IORefsを使用する場合
私の主なスレッドは、必要に応じてMapとPSQの両方を追加/変更しますが、常に2番目のスレッドがあります(forever $ do
)は、PSQを見て最も古いキーがN ms前になったと判断し、それをフラッシュする。
これが起こるためには、両方のスレッドが同じ可変データを参照する必要があります。状態を維持するための最良の方法は何ですか?これはIOREfsのケースでしょうか?これを解決する他の方法はありますか?
ここでは「いくつかの」プレアルファコード:
import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B
--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p
--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages
key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]
newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
time <- getPOSIXTime
let q = PSQ.singleton key time
let m = Map.singleton key messages
return (q, m)
appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
let Just messages' = Map.lookup (host,port) m
l = length . concat $ messages'
l' = l + length newmsgs
in
if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m
insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
time <- getPOSIXTime
let q1 = PSQ.insert (a,b) time q
let m1 = Map.insert (a,b) c m
return (q1, m1)
sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO()
sendq s datastring host port = do
hostAddr <- inet_addr host
sendAllTo s datastring (SockAddrInet port hostAddr)
return()
deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
where
m' = Map.delete (host, port) m
q' = PSQ.delete (host, port) q
loopMyQ q m1 done = forever $ do
let Just m = PSQ.findMin q
let time = (PSQ.prio m) + 0.200 --adds 200ms
now <- getPOSIXTime
if now < time
then print (m1)
--here eventually I would call the send function to flush the queue
else putMVar done()
sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
sendrecv s q1 m1 msg = do
let m2 = appendMsg msg key m1
(q3, m3) = case m2 of
val | m2 == m1 -> deleteRec key q1 m1
| otherwise -> (q1, m2)
(q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
return (q4, m4)) else return (q1, m2)
when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
return (q5, m5)
--main :: IO()
main = withSocketsDo $ do
s <- socket AF_INET Datagram defaultProtocol
(q1, m1) <- newRq
done <- newEmptyMVar
forkIO $ loopMyQ q1 m1 done
(q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
takeMVar done
--print ("longer than 200ms ago")
申し訳ありませんが、あなたの問題を完全に理解する必要はありませんが、データの一貫性のあるビューを常に維持することを検討している場合は、STMが最適です。 –