Ticket #3926: Pi.hs

File Pi.hs, 7.8 KB (added by mlesniak, 5 years ago)

Example code that shows (sometimes) the non-termination

Line 
1-------------------------------------------------------------------------------
2-- minimal example of a much bigger program (which is my excuse for strange
3-- looking code)
4--
5-- compile with
6--
7--   ghc --make -O2 -threaded Pi.hs -o pi -rtsopts -fforce-recomp
8--
9-- and execute it (for example, on a 2 core system) with
10--
11--   pi +RTS -N2
12--
13-- For inifite runs, try
14--
15--   i=0; while true;do printf "%4d\n" $((i=$i+1));pi +RTS -N2;done
16--
17-- on a system with bash.
18-------------------------------------------------------------------------------
19
20{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, FlexibleInstances#-}
21module Main where
22import Control.Monad
23import Control.Concurrent
24import Control.Parallel.Strategies
25import System.Environment
26import Data.Array.IO
27import Control.Concurrent.STM
28import System.Posix.Signals
29import Data.Maybe
30import GHC.Conc
31import System.IO
32import System.IO.Unsafe
33import qualified Data.Set as Set
34import Data.Set (Set)
35import Data.Time.Format
36import System.Locale
37import Data.Time
38import Data.Time.Clock.POSIX
39import Text.Printf
40
41
42main :: IO ()
43main = do
44    installLogger
45    pool <- newSTMPQueue 16
46    let tasks = replicate 64 1
47    mapM_ (putSTMPQueue pool) tasks
48    ids <- forM [0..numCapabilities-1] (\n -> forkIO $ thread n calcPi pool)
49    waitSTMPQueue pool
50    debug "__________________________________"
51    writeLog_ True
52    putStrLn "SHOULD EXIT"
53
54
55thread n f pool = do
56    task <- getSTMPQueue n pool
57    case task of
58        Nothing -> return ()
59        Just t  -> do 
60            f pool t
61            thread n f pool
62
63
64--- pi calculation functions for number crunching ----------------------------
65-- doing nothing to allow compilcation with GHC HEAD!!! the bug still occurs.
66
67-- calcPiPure :: Int -> Int
68-- calcPiPure digits = showCReal (fromEnum digits) pi `pseq` 1
69
70
71calcPi :: t -> Int -> IO ()
72calcPi _ digits = do
73    debug $ "T calc " ++ show digits
74    --calcPiPure digits `pseq` return ()
75    debug $ "T finished"
76   
77
78
79--- STM based global queue with an additional private part ------------------
80data Show a => STMPQueue a = STMPQueue {
81      stmChan     :: STMCQueue a
82    , stmState    :: TVar STMState
83    , stmFinished :: TChan ()
84    , stmWorking  :: TVar (Set ThreadId)
85
86    -- for the private queue
87    , stmPrivate  :: TArray (Int,Int) a     -- currently a bit slow...
88    , stmIndex    :: TArray Int (Int, Int)
89    , stmSize     :: Int
90}
91
92data STMState = 
93      SPut
94    | SWait
95    deriving (Show, Eq)
96
97
98newSTMPQueue :: Show a => Int -> IO (STMPQueue a)
99newSTMPQueue size = do
100    chan     <- newSTMCQueue
101    state    <- newTVarIO SPut
102    finished <- newTChanIO
103    working  <- newTVarIO Set.empty
104   
105    (private, index) <- atomically $ do
106        private <- newArray_ ((0,0), (numCapabilities-1,size-1))
107        index   <- newArray (0,numCapabilities-1) (0,0)
108        return (private,index)
109    return $ STMPQueue chan state finished working private index size
110
111
112putSTMPQueue :: Show a => STMPQueue a -> a -> IO ()
113putSTMPQueue (STMPQueue chan state finished working _ _ _) a = do
114    atomically $ writeSTMCQueue chan a
115
116
117waitSTMPQueue pool@(STMPQueue chan state finished working _ _ _) = do 
118    atomically $ writeTVar state SWait 
119    atomically $ do
120        work  <- readTVar working
121        empty <- isEmptySTMCQueue chan
122        check (Set.null work && empty)
123        writeTVar state SPut
124    return ()
125
126
127getSTMPQueue :: Show a => Int -> STMPQueue a -> IO (Maybe a)
128getSTMPQueue idx pool@(STMPQueue chan state finished working private index
129  size) = do
130    (curidx,midx) <- atomically $ readArray index idx
131    if curidx == midx
132        then do
133            debug "private queue empty"
134            loop
135        else do task <- atomically $ readArray private (idx, curidx)
136                atomically $ writeArray index idx (curidx+1,midx)
137                return (Just task)
138
139  where loop = do
140            tid <- myThreadId
141
142            atomically $ do
143                work  <- Set.delete tid `fmap` readTVar working
144                writeTVar working $! work
145           
146            atomically $ do
147                empty <- isEmptySTMCQueue chan
148                work  <- readTVar working
149                op    <- readTVar state
150
151                if (not empty) 
152                    then do 
153                        a@(task:rest) <- readSTMCQueue chan (size+1)
154                        unsafeIOToSTM $ debug $ "my tasks: " ++ show a
155                        forM_ (zip [0..] rest) $ \(col,v) -> 
156                            writeArray private (idx,col) v
157                        writeArray index idx $! (0,length rest)
158                        writeTVar working $! (Set.insert tid work)
159                        return (Just task)
160                    else do
161                        case op of
162                            SPut  -> retry
163                            SWait -> do
164                                if Set.null work
165                                    then return Nothing
166                                    else retry
167
168
169--- For debugging output ----------------------------------------------------
170debugChan :: Chan String
171debugChan = unsafePerformIO newChan
172
173debugTime :: MVar UTCTime
174debugTime = unsafePerformIO (newMVar =<< getCurrentTime)
175
176debug :: String -> IO ()
177debug msg = writeChan debugChan =<< debugStr msg
178
179debug_ :: String -> IO ()
180debug_ msg = do
181    s <- debugStr msg
182    hPutStrLn stderr s
183    debug msg
184
185debugStr :: String -> IO String
186debugStr msg = do
187    tid  <- (drop 9 . show) `fmap` myThreadId
188    t    <- getCurrentTime
189    told <- swapMVar debugTime t
190    let td = diffUTCTime t told
191        ts = init $ show td
192        tl = if read ts < 0.0
193                 then "0.000000  "
194                 else printf "%-10s" ts
195    return $ tl ++ " " ++ tid ++ " " ++ msg
196
197
198writeLog :: IO ()
199writeLog = writeLog_ False
200
201
202-- check, if an environment variable LOG exists.
203writeLog_ :: Bool -> IO ()
204writeLog_ checkEnv = do
205     env <- map fst `fmap` getEnvironment
206     when (checkEnv || ("LOG" `elem` env)) showLog
207  where showLog = do
208            e <- isEmptyChan debugChan
209            unless e $ do
210                value <- readChan debugChan
211                putStrLn value
212                showLog
213
214numEnv :: String -> IO Int
215numEnv ss = do
216     (read . fromJust . lookup ss) `fmap` getEnvironment
217   
218
219whenEnv :: String -> IO () -> IO ()
220whenEnv ss f = do
221     env <- map fst `fmap` getEnvironment
222     when (ss `elem` env) f
223   
224
225installLogger :: IO ()
226installLogger = do
227    installHandler sigINT (CatchOnce handleInt) Nothing
228    return ()
229  where handleInt = do
230            putStrLn "" -- prevent that ^C is displayed on 1st log line
231            debug "SIGINT caught."
232            writeLog_ True
233            raiseSignal sigINT
234
235
236--- Counting Chan using STM -------------------------------------------------
237data STMCQueue a = STMCQueue {
238      scqList :: TChan a
239    , scqSize :: TVar Int
240}
241
242
243newSTMCQueue :: IO (STMCQueue a) 
244newSTMCQueue = do
245    list <- newTChanIO
246    size <- newTVarIO 0
247    return (STMCQueue list size)
248
249
250isEmptySTMCQueue :: STMCQueue a -> STM Bool 
251isEmptySTMCQueue (STMCQueue list size) = do
252    ioSize <- readTVar size
253    return $! (ioSize == 0)
254
255
256writeSTMCQueue :: STMCQueue a -> a -> STM () 
257writeSTMCQueue (STMCQueue list size) value = do
258    ioSize <- readTVar size
259    writeTChan list value
260    writeTVar size $! (ioSize + 1)
261
262
263writeList2STMCQueue :: STMCQueue a -> [a] -> STM () 
264writeList2STMCQueue (STMCQueue list size) values = do
265    ioSize <- readTVar size
266    mapM_ (writeTChan list) values
267    writeTVar size $! (ioSize + length values)
268
269
270readSTMCQueue :: STMCQueue a -> Int -> STM [a] 
271-- return min(n,size) elements from list
272readSTMCQueue (STMCQueue list size) n = do
273    ioSize <- readTVar size
274    unsafeIOToSTM $ debug $ "# queue: " ++ show ioSize
275    let taken = min ioSize n
276    values <- replicateM taken (readTChan list) 
277    check (length values == taken)
278    writeTVar size (ioSize - taken)
279    return values
280
281