Ticket #4885: udpcount.hs

File udpcount.hs, 2.6 KB (added by zukerman, 3 years ago)
Line 
1{-# LANGUAGE DeriveDataTypeable #-}
2{-# LANGUAGE ScopedTypeVariables #-}
3
4import Control.Concurrent (forkIO, threadDelay)
5import Control.Concurrent.MVar
6import Control.Exception (handle, IOException)
7import Control.Monad
8import Data.Char (ord)
9import Data.Ratio
10import Data.Time.Clock.POSIX (getPOSIXTime)
11import Foreign.Marshal.Alloc (mallocBytes)
12import Network.Socket
13import System.Console.CmdArgs
14import System.IO
15import System.IO.Unsafe
16
17import qualified Data.ByteString as B
18import qualified Data.ByteString.Unsafe as U
19
20--import qualified Data.ByteString as B
21
22buffSize = 65000 :: Int
23
24data UdpCount = UdpCount { port :: Int
25                         } deriving (Show, Data, Typeable)
26
27udpCount = cmdArgsMode $ UdpCount
28  { port = def &= argPos 0 &= typ "PORT"
29  } &= summary "udp v0.1 (c) Itai Zukerman 2011"
30 
31counts = unsafePerformIO $ newMVar (0, 0) :: (MVar (Int, Int))
32timestamp = unsafePerformIO $ newMVar B.empty :: (MVar B.ByteString)
33
34space = B.pack [32]
35eol = B.pack [10]
36
37main = do args <- cmdArgsRun udpCount
38          free <- newEmptyMVar
39          full <- newEmptyMVar
40          forkIO updateTime
41          replicateM_ 4 $ forkIO $ workerLoop free full
42          forkIO $ recvLoop free full $ port args
43          forever $ do threadDelay (10^6)
44                       c <- readMVar counts
45                       hPutStrLn stderr $ show c
46
47updateTime = do t <- getPOSIXTime
48                let b = t2b t
49                swapMVar timestamp b
50                threadDelay (10^3)
51                updateTime
52  where
53    t2b t = let i = toRational t
54                ms = (numerator i * 1000) `div` (denominator i)
55            in B.pack $ map (fromIntegral . ord) (show ms)
56
57recvLoop free full port = do sock <- socket AF_INET Datagram 0
58                             bindSocket sock $ SockAddrInet (fromIntegral port) iNADDR_ANY
59                             forever $ do buff <- takeMVar free
60                                          (size, _) <- retryOnFail $ recvBufFrom sock buff buffSize
61                                          putMVar full (size, buff)
62
63retryOnFail action = handle (\(_ :: IOException) -> retryOnFail action) action
64
65workerLoop free full = do buff <- mallocBytes buffSize
66                          putMVar free buff
67                          loop
68  where 
69    loop = do 
70      (size, buff') <- takeMVar full
71      --bs <- U.unsafePackCStringLen (buff', size)
72      bs <- B.packCStringLen (buff', size)
73      ts <- readMVar timestamp
74      let line = B.concat [ts, space, bs, eol]
75      --B.hPutStr stdout line
76      (n, t) <- takeMVar counts
77      putMVar counts $! (n+1, t+size)
78      putMVar free buff'
79      loop