Ticket #3787: Trampoline.2.hs

File Trampoline.2.hs, 26.5 KB (added by blamario, 4 years ago)

A newer version of the module, with a more thorough panic.

Line 
1{-
2    Copyright 2009 Mario Blazevic
3
4    This file is part of the Streaming Component Combinators (SCC) project.
5
6    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
7    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
8    version.
9
10    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
11    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
12
13    You should have received a copy of the GNU General Public License along with SCC.  If not, see
14    <http://www.gnu.org/licenses/>.
15-}
16
17-- | Module "Trampoline" defines the trampoline computations and their basic building blocks.
18
19{-# LANGUAGE ScopedTypeVariables, Rank2Types, MultiParamTypeClasses, TypeFamilies, KindSignatures,
20             FlexibleContexts, FlexibleInstances, OverlappingInstances, UndecidableInstances
21 #-}
22
23module Control.Concurrent.SCC.Trampoline where
24
25import Control.Concurrent (forkIO)
26import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
27import Control.Monad (liftM, liftM2, when)
28import Control.Monad.Identity
29import Control.Monad.Trans (MonadTrans(..))
30import Control.Parallel (par, pseq)
31
32import Data.Foldable (toList)
33import Data.Maybe (maybe)
34import Data.Sequence (Seq, viewl)
35
36-- | Class of monads that can perform two computations in parallel.
37class Monad m => ParallelizableMonad m where
38   -- | Combine two computations into a single parallel computation. Default implementation of `parallelize` is
39   -- @liftM2 (,)@
40   parallelize :: m a -> m b -> m (a, b)
41   parallelize = liftM2 (,)
42
43-- | Any monad that allows the result value to be extracted, such as `Identity` or `Maybe` monad, can implement
44-- `parallelize` by using `par`.
45instance ParallelizableMonad Identity where
46   parallelize ma mb = let a = runIdentity ma
47                           b = runIdentity mb
48                       in  a `par` (b `pseq` a `pseq` Identity (a, b))
49
50instance ParallelizableMonad Maybe where
51   parallelize ma mb = case ma `par` (mb `pseq` (ma, mb))
52                       of (Just a, Just b) -> Just (a, b)
53                          _ -> Nothing
54
55-- | IO is parallelizable by `forkIO`.
56instance ParallelizableMonad IO where
57   parallelize ma mb = do va <- newEmptyMVar
58                          vb <- newEmptyMVar
59                          forkIO (ma >>= putMVar va)
60                          forkIO (mb >>= putMVar vb)
61                          a <- takeMVar va
62                          b <- takeMVar vb
63                          return (a, b)
64
65-- | Suspending monadic computations.
66newtype Trampoline s m r = Trampoline {
67   -- | Run the next step of a `Trampoline` computation.
68   bounce :: m (TrampolineState s m r)
69   }
70
71data TrampolineState s m r =
72   -- | Trampoline computation is finished with final value /r/.
73   Done r
74   -- | Computation is suspended, its remainder is embedded in the functor /s/.
75 | Suspend! (s (Trampoline s m r))
76
77instance (Functor s, Monad m) => Monad (Trampoline s m) where
78   return x = Trampoline (return (Done x))
79   t >>= f = Trampoline (bounce t >>= apply f)
80      where apply f (Done x) = bounce (f x)
81            apply f (Suspend s) = return (Suspend (fmap (>>= f) s))
82
83instance (Functor s, ParallelizableMonad m) => ParallelizableMonad (Trampoline s m) where
84   parallelize t1 t2 = Trampoline $ liftM combine $ parallelize (bounce t1) (bounce t2) where
85      combine (Done x, Done y) = Done (x, y)
86      combine (Suspend s, Done y) = Suspend (fmap (liftM $ \x-> (x, y)) s)
87      combine (Done x, Suspend s) = Suspend (fmap (liftM $ (,) x) s)
88      combine (Suspend s1, Suspend s2) = Suspend (fmap (parallelize $ suspend s1) s2)
89
90instance Functor s => MonadTrans (Trampoline s) where
91   lift = Trampoline . liftM Done
92
93data Yield x y = Yield! x y
94instance Functor (Yield x) where
95   fmap f (Yield x y) = Yield x (f y)
96
97data Await x y = Await! (x -> y)
98instance Functor (Await x) where
99   fmap f (Await g) = Await (f . g)
100
101data EitherFunctor l r x = LeftF (l x) | RightF (r x)
102instance (Functor l, Functor r) => Functor (EitherFunctor l r) where
103   fmap f (LeftF l) = LeftF (fmap f l)
104   fmap f (RightF r) = RightF (fmap f r)
105
106newtype NestedFunctor l r x = NestedFunctor (l (r x))
107instance (Functor l, Functor r) => Functor (NestedFunctor l r) where
108   fmap f (NestedFunctor lr) = NestedFunctor ((fmap . fmap) f lr)
109
110data SomeFunctor l r x = LeftSome (l x) | RightSome (r x) | Both (NestedFunctor l r x)
111instance (Functor l, Functor r) => Functor (SomeFunctor l r) where
112   fmap f (LeftSome l) = LeftSome (fmap f l)
113   fmap f (RightSome r) = RightSome (fmap f r)
114   fmap f (Both lr) = Both (fmap f lr)
115
116type TryYield x = EitherFunctor (Yield x) (Await Bool)
117
118suspend :: (Monad m, Functor s) => s (Trampoline s m x) -> Trampoline s m x
119suspend s = Trampoline (return (Suspend s))
120
121yield :: forall m x. Monad m => x -> Trampoline (Yield x) m ()
122yield x = suspend (Yield x (return ()))
123
124await :: forall m x. Monad m => Trampoline (Await x) m x
125await = suspend (Await return)
126
127tryYield :: forall m x. Monad m => x -> Trampoline (TryYield x) m Bool
128tryYield x = suspend (LeftF (Yield x (suspend (RightF (Await return)))))
129
130canYield :: forall m x. Monad m => Trampoline (TryYield x) m Bool
131canYield = suspend (RightF (Await return))
132
133fromTrampoline :: Monad m => Trampoline s m x -> m x
134fromTrampoline t = bounce t >>= \(Done x)-> return x
135
136runTrampoline :: Monad m => Trampoline Identity m x -> m x
137runTrampoline = fromTrampoline
138
139pogoStick :: (Functor s, Monad m) => (s (Trampoline s m x) -> Trampoline s m x) -> Trampoline s m x -> m x
140pogoStick reveal t = bounce t
141                     >>= \s-> case s
142                              of Done result -> return result
143                                 Suspend c -> pogoStick reveal (reveal c)
144
145pogoStickNested :: (Functor s1, Functor s2, Monad m) => 
146                   (s2 (Trampoline (EitherFunctor s1 s2) m x) -> Trampoline (EitherFunctor s1 s2) m x)
147                   -> Trampoline (EitherFunctor s1 s2) m x -> Trampoline s1 m x
148pogoStickNested reveal t = 
149   Trampoline{bounce= bounce t
150                      >>= \s-> case s
151                               of Done result -> return (Done result)
152                                  Suspend (LeftF s) -> return (Suspend (fmap (pogoStickNested reveal) s))
153                                  Suspend (RightF c) -> bounce (pogoStickNested reveal (reveal c))
154             }
155
156nest :: (Functor a, Functor b) => a x -> b y -> NestedFunctor a b (x, y)
157nest a b = NestedFunctor $ fmap (\x-> fmap ((,) x) b) a
158
159-- couple :: (Monad m, Functor s1, Functor s2) =>
160--           Trampoline s1 m x -> Trampoline s2 m y -> Trampoline (NestedFunctor s1 s2) m (x, y)
161-- couple t1 t2 = Trampoline{bounce= do ts1 <- bounce t1
162--                                      ts2 <- bounce t2
163--                                      case (ts1, ts2) of (Done x, Done y) -> return $ Done (x, y)
164--                                                         (Suspend s1, Suspend s2) -> return $ Suspend $
165--                                                                                     fmap (uncurry couple) (nest s1 s2)
166--                          }
167
168coupleAlternating :: (Monad m, Functor s1, Functor s2) => 
169                     Trampoline s1 m x -> Trampoline s2 m y -> Trampoline (SomeFunctor s1 s2) m (x, y)
170coupleAlternating t1 t2 = 
171   Trampoline{bounce= do ts1 <- bounce t1
172                         ts2 <- bounce t2
173                         case (ts1, ts2) of (Done x, Done y) -> return $ Done (x, y)
174                                            (Suspend s1, Suspend s2) ->
175                                               return $ Suspend $ fmap (uncurry coupleAlternating) (Both $ nest s1 s2)
176                                            (Done x, Suspend s2) ->
177                                               return $ Suspend $ fmap (coupleAlternating (return x)) (RightSome s2)
178                                            (Suspend s1, Done y) ->
179                                               return $ Suspend $ fmap (flip coupleAlternating (return y)) (LeftSome s1)
180             }
181
182coupleParallel :: (ParallelizableMonad m, Functor s1, Functor s2) => 
183                  Trampoline s1 m x -> Trampoline s2 m y -> Trampoline (SomeFunctor s1 s2) m (x, y)
184coupleParallel t1 t2 = 
185   Trampoline{bounce= parallelize (bounce t1) (bounce t2)
186                      >>= \pair-> case pair
187                                  of (Done x, Done y) -> return $ Done (x, y)
188                                     (Suspend s1, Suspend s2) ->
189                                        return $ Suspend $ fmap (uncurry coupleParallel) (Both $ nest s1 s2)
190                                     (Done x, Suspend s2) ->
191                                        return $ Suspend $ fmap (coupleParallel (return x)) (RightSome s2)
192                                     (Suspend s1, Done y) ->
193                                        return $ Suspend $ fmap (flip coupleParallel (return y)) (LeftSome s1)
194             }
195
196coupleNested :: (Monad m, Functor s0, Functor s1, Functor s2) => 
197                Trampoline (EitherFunctor s0 s1) m x -> Trampoline (EitherFunctor s0 s2) m y -> 
198                Trampoline (EitherFunctor s0 (SomeFunctor s1 s2)) m (x, y)
199coupleNested t1 t2 = 
200   Trampoline{bounce= do ts1 <- bounce t1
201                         ts2 <- bounce t2
202                         case (ts1, ts2) of (Done x, Done y) -> return $ Done (x, y)
203                                            (Suspend (RightF s), Done y) -> 
204                                               return $ Suspend $ RightF $ fmap (flip coupleNested (return y)) (LeftSome s)
205                                            (Done x, Suspend (RightF s)) -> 
206                                               return $ Suspend $ RightF $ fmap (coupleNested (return x)) (RightSome s)
207                                            (Suspend (RightF s1), Suspend (RightF s2)) ->
208                                               return $ Suspend $ RightF $ fmap (uncurry coupleNested) (Both $ nest s1 s2)
209                                            (Suspend (LeftF s), Done y) ->
210                                               return $ Suspend $ LeftF $ fmap (flip coupleNested (return y)) s
211                                            (Done x, Suspend (LeftF s)) ->
212                                               return $ Suspend $ LeftF $ fmap (coupleNested (return x)) s
213                                            (Suspend (LeftF s1), Suspend (LeftF s2)) -> 
214                                               return $ Suspend $ LeftF $ fmap (coupleNested $ suspend $ LeftF s1) s2
215             }
216
217seesaw :: (Monad m, Functor s1, Functor s2) => 
218           (forall x y s t. (s ~ SomeFunctor s1 s2, t ~ Trampoline s m (x, y)) => s t -> t)
219           -> Trampoline s1 m x -> Trampoline s2 m y -> m (x, y)
220seesaw resolve t1 t2 = pogoStick resolve (coupleAlternating t1 t2)
221
222seesawParallel :: (ParallelizableMonad m, Functor s1, Functor s2) => 
223                  (forall x y s t. (s ~ SomeFunctor s1 s2, t ~ Trampoline s m (x, y)) => s t -> t)
224                  -> Trampoline s1 m x -> Trampoline s2 m y -> m (x, y)
225seesawParallel resolve t1 t2 = pogoStick resolve (coupleParallel t1 t2)
226
227resolveProducerConsumer :: forall a s s0 t t' m x. 
228                           (Functor s0, Monad m, s ~ SomeFunctor (TryYield a) (Await (Maybe a)), 
229                            t ~ Trampoline (EitherFunctor s0 s) m x) => 
230                           s t -> t
231resolveProducerConsumer (LeftSome (LeftF (Yield _ c))) = c
232resolveProducerConsumer (LeftSome (RightF (Await c))) = c False
233resolveProducerConsumer (RightSome (Await c)) = c Nothing
234resolveProducerConsumer (Both (NestedFunctor (LeftF (Yield x (Await c))))) = c (Just x)
235resolveProducerConsumer (Both (NestedFunctor (RightF (Await c)))) = suspend (RightF $ RightSome $ c True)
236
237couplePC :: ParallelizableMonad m => Trampoline (Yield a) m x -> Trampoline (Await (Maybe a)) m y -> m (x, y)
238couplePC t1 t2 = parallelize (bounce t1) (bounce t2)
239                 >>= \(s1, s2)-> case (s1, s2)
240                                 of (Done x, Done y) -> return (x, y)
241                                    (Suspend (Yield x c1), Suspend (Await c2)) -> couplePC c1 (c2 $ Just x)
242                                    (Suspend (Yield _ c1), Done y) -> couplePC c1 (return y)
243                                    (Done x, Suspend (Await c2)) -> couplePC (return x) (c2 Nothing)
244
245coupleFinite :: ParallelizableMonad m => Trampoline (TryYield a) m x -> Trampoline (Await (Maybe a)) m y -> m (x, y)
246coupleFinite t1 t2 =
247   parallelize (bounce t1) (bounce t2)
248   >>= \(s1, s2)-> case (s1, s2)
249                   of (Done x, Done y) -> return (x, y)
250                      (Done x, Suspend (Await c2)) -> coupleFinite (return x) (c2 Nothing)
251                      (Suspend (LeftF (Yield x c1)), Suspend (Await c2)) -> coupleFinite c1 (c2 $ Just x)
252                      (Suspend (LeftF (Yield _ c1)), Done y) -> coupleFinite c1 (return y)
253                      (Suspend (RightF (Await c1)), Suspend s2@Await{}) -> coupleFinite (c1 True) (suspend s2)
254                      (Suspend (RightF (Await c1)), Done y) -> coupleFinite (c1 False) (return y)
255
256coupleFiniteSequential :: Monad m => Trampoline (TryYield a) m x -> Trampoline (Await (Maybe a)) m y -> m (x, y)
257coupleFiniteSequential t1 t2 =
258   bounce t1
259   >>= \s1-> bounce t2
260             >>= \s2-> case (s1, s2)
261                       of (Done x, Done y) -> return (x, y)
262                          (Done x, Suspend (Await c2)) -> coupleFiniteSequential (return x) (c2 Nothing)
263                          (Suspend (LeftF (Yield x c1)), Suspend (Await c2)) -> coupleFiniteSequential c1 (c2 $ Just x)
264                          (Suspend (LeftF (Yield _ c1)), Done y) -> coupleFiniteSequential c1 (return y)
265                          (Suspend (RightF (Await c1)), Suspend s2@Await{}) -> coupleFiniteSequential (c1 True) (suspend s2)
266                          (Suspend (RightF (Await c1)), Done y) -> coupleFiniteSequential (c1 False) (return y)
267
268-- coupleNested :: (Functor s, Monad m) =>
269--                 Trampoline (EitherFunctor s (Yield a)) m x
270--              -> Trampoline (EitherFunctor s (Await (Maybe a))) m y -> Trampoline s m (x, y)
271             
272-- coupleNested t1 t2 =
273--    lift (liftM2 (,) (bounce t1) (bounce t2))
274--    >>= \(s1, s2)-> case (s1, s2)
275--                    of (Done x, Done y) -> return (x, y)
276--                       (Suspend (RightF (Yield _ c1)), Done y) -> coupleNested c1 (return y)
277--                       (Done x, Suspend (RightF (Await c2))) -> coupleNested (return x) (c2 Nothing)
278--                       (Suspend (RightF (Yield x c1)), Suspend (RightF (Await c2))) -> coupleNested c1 (c2 $ Just x)
279--                       (Suspend (LeftF s), Done y) -> suspend (fmap (flip coupleNested (return y)) s)
280--                       (Done x, Suspend (LeftF s)) -> suspend (fmap (coupleNested (return x)) s)
281--                       (Suspend (LeftF s1), Suspend (LeftF s2)) -> suspend (fmap (coupleNested $ suspend $ LeftF s1) s2)
282
283coupleNestedFinite :: (Functor s, ParallelizableMonad m) =>
284                      Trampoline (SinkFunctor s a) m x -> Trampoline (SourceFunctor s a) m y -> Trampoline s m (x, y)
285coupleNestedFinite t1 t2 = lift (parallelize (bounce t1) (bounce t2))
286                           >>= stepCouple coupleNestedFinite
287
288coupleNestedFiniteSequential :: (Functor s, Monad m) =>
289                                Trampoline (SinkFunctor s a) m x
290                             -> Trampoline (SourceFunctor s a) m y
291                             -> Trampoline s m (x, y)
292coupleNestedFiniteSequential producer consumer = 
293   pogoStickNested resolveProducerConsumer (coupleNested producer consumer)
294-- coupleNestedFiniteSequential t1 t2 = lift (liftM2 (,) (bounce t1) (bounce t2))
295--                                      >>= stepCouple coupleNestedFiniteSequential
296
297stepCouple :: (Functor s, Monad m) =>
298              (Trampoline (EitherFunctor s (TryYield a)) m x
299                  -> Trampoline (EitherFunctor s (Await (Maybe a))) m y
300                  -> Trampoline s m (x, y))
301              -> (TrampolineState (EitherFunctor s (TryYield a)) m x,
302                  TrampolineState (EitherFunctor s (Await (Maybe a))) m y)
303              -> Trampoline s m (x, y)
304stepCouple f couple = case couple
305                      of (Done x, Done y) -> return (x, y)
306                         (Done x, Suspend (RightF (Await c2))) -> f (return x) (c2 Nothing)
307                         (Suspend (RightF (LeftF (Yield _ c1))), Done y) -> f c1 (return y)
308                         (Suspend (RightF (LeftF (Yield x c1))), Suspend (RightF (Await c2))) -> f c1 (c2 $ Just x)
309                         (Suspend (RightF (RightF (Await c1))), Suspend s2@(RightF Await{})) -> f (c1 True) (suspend s2)
310                         (Suspend (RightF (RightF (Await c1))), Done y) -> f (c1 False) (return y)
311                         (Suspend (LeftF s), Done y) -> suspend (fmap (flip f (return y)) s)
312                         (Done x, Suspend (LeftF s)) -> suspend (fmap (f (return x)) s)
313                         (Suspend (LeftF s1), Suspend (LeftF s2)) -> suspend (fmap (f $ suspend $ LeftF s1) s2)
314                         (Suspend (LeftF s1), Suspend (RightF s2)) -> suspend (fmap (flip f (suspend $ RightF s2)) s1)
315                         (Suspend (RightF s1), Suspend (LeftF s2)) -> suspend (fmap (f (suspend $ RightF s1)) s2)
316
317local :: forall m l r x. (Functor r, Monad m) => Trampoline r m x -> Trampoline (EitherFunctor l r) m x
318local (Trampoline mr) = Trampoline (liftM inject mr)
319   where inject :: TrampolineState r m x -> TrampolineState (EitherFunctor l r) m x
320         inject (Done x) = Done x
321         inject (Suspend r) = Suspend (RightF $ fmap local r)
322
323out :: forall m l r x. (Functor l, Monad m) => Trampoline l m x -> Trampoline (EitherFunctor l r) m x
324out (Trampoline ml) = Trampoline (liftM inject ml)
325   where inject :: TrampolineState l m x -> TrampolineState (EitherFunctor l r) m x
326         inject (Done x) = Done x
327         inject (Suspend l) = Suspend (LeftF $ fmap out l)
328
329-- | Class of functors that can be lifted.
330class (Functor a, Functor d) => AncestorFunctor a d where
331   -- | Convert the ancestor functor into its descendant. The descendant functor typically contains the ancestor.
332   liftFunctor :: a x -> d x
333
334instance Functor a => AncestorFunctor a a where
335   liftFunctor = id
336instance (Functor a, Functor d', Functor d, d ~ EitherFunctor d' s, AncestorFunctor a d') => AncestorFunctor a d where
337   liftFunctor = LeftF . (liftFunctor :: a x -> d' x)
338
339liftOut :: forall m a d x. (Monad m, Functor a, AncestorFunctor a d) => Trampoline a m x -> Trampoline d m x
340liftOut (Trampoline ma) = Trampoline (liftM inject ma)
341   where inject :: TrampolineState a m x -> TrampolineState d m x
342         inject (Done x) = Done x
343         inject (Suspend a) = Suspend (liftFunctor $ fmap liftOut a)
344
345type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
346type SinkFunctor a x = EitherFunctor a (TryYield x)
347
348-- | A 'Sink' can be used to yield values from any nested `Trampoline` computation whose functor provably descends from
349-- the functor /a/. It's the write-only end of a 'Pipe' communication channel.
350data Sink (m :: * -> *) a x =
351   Sink
352   {
353   -- | Function 'put' tries to put a value into the given `Sink`. The intervening 'Trampoline' computations suspend up
354   -- to the 'pipe' invocation that has created the argument sink. The result of 'put' indicates whether the operation
355   -- succeded.
356   put :: forall d. (AncestorFunctor a d) => x -> Trampoline d m Bool,
357   -- | Function 'canPut' checks if the argument `Sink` accepts values, i.e., whether a 'put' operation would succeed on
358   -- the sink.
359   canPut :: forall d. (AncestorFunctor a d) => Trampoline d m Bool
360   }
361
362-- | A 'Source' can be used to read values into any nested `Trampoline` computation whose functor provably descends from
363-- the functor /a/. It's the read-only end of a 'Pipe' communication channel.
364newtype Source (m :: * -> *) a x =
365   Source
366   {
367   -- | Function 'get' tries to get a value from the given 'Source' argument. The intervening 'Trampoline' computations
368   -- suspend all the way to the 'pipe' function invocation that created the source. The function returns 'Nothing' if
369   -- the argument source is empty.
370   get :: forall d. (AncestorFunctor a d) => Trampoline d m (Maybe x)
371   }
372
373-- | Converts a 'Sink' on the ancestor functor /a/ into a sink on the descendant functor /d/.
374liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
375liftSink s = Sink {put= liftOut . (put s :: x -> Trampoline d m Bool),
376                   canPut= liftOut (canPut s :: Trampoline d m Bool)}
377
378-- | Converts a 'Source' on the ancestor functor /a/ into a source on the descendant functor /d/.
379liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
380liftSource s = Source {get= liftOut (get s :: Trampoline d m (Maybe x))}
381
382-- | The 'pipe' function splits the computation into two concurrent parts, /producer/ and /consumer/. The /producer/ is
383-- given a 'Sink' to put values into, and /consumer/ a 'Source' to get those values from. Once producer and consumer
384-- both complete, 'pipe' returns their paired results.
385pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
386        (Sink m a1 x -> Trampoline a1 m r1) -> (Source m a2 x -> Trampoline a2 m r2) -> Trampoline a m (r1, r2)
387pipe producer consumer = coupleNestedFiniteSequential (producer sink) (consumer source) where
388   sink = Sink {put= liftOut . (local . tryYield :: x -> Trampoline a1 m Bool),
389                canPut= liftOut (local canYield :: Trampoline a1 m Bool)} :: Sink m a1 x
390   source = Source (liftOut (local await :: Trampoline a2 m (Maybe x))) :: Source m a2 x
391
392-- | The 'pipeP' function is equivalent to 'pipe', except the /producer/ and /consumer/ are run in parallel.
393pipeP :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
394         (Sink m a1 x -> Trampoline a1 m r1) -> (Source m a2 x -> Trampoline a2 m r2) -> Trampoline a m (r1, r2)
395pipeP producer consumer = coupleNestedFinite (producer sink) (consumer source) where
396   sink = Sink {put= liftOut . (local . tryYield :: x -> Trampoline a1 m Bool),
397                canPut= liftOut (local canYield :: Trampoline a1 m Bool)} :: Sink m a1 x
398   source = Source (liftOut (local await :: Trampoline a2 m (Maybe x))) :: Source m a2 x
399
400-- | The 'pipePS' function acts either as 'pipeP' or as 'pipe', depending on the argument /parallel/.
401pipePS :: forall m a a1 a2 x r1 r2. (ParallelizableMonad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
402          Bool -> (Sink m a1 x -> Trampoline a1 m r1) -> (Source m a2 x -> Trampoline a2 m r2) ->
403          Trampoline a m (r1, r2)
404pipePS parallel = if parallel then pipeP else pipe
405
406getSuccess :: forall m a d x . (Monad m, AncestorFunctor a d)
407              => Source m a x -> (x -> Trampoline d m ()) {- ^ Success continuation -} -> Trampoline d m ()
408getSuccess source succeed = get source >>= maybe (return ()) succeed
409
410-- | Function 'get'' assumes that the argument source is not empty and returns the value the source yields. If the
411-- source is empty, the function throws an error.
412get' :: forall m a d x . (Monad m, AncestorFunctor a d) => Source m a x -> Trampoline d m x
413get' source = get source >>= maybe (error "get' failed") return
414
415-- | 'pour' copies all data from the /source/ argument into the /sink/ argument, as long as there is anything to copy
416-- and the sink accepts it.
417pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
418        => Source m a1 x -> Sink m a2 x -> Trampoline d m ()
419pour source sink = fill'
420   where fill' = canPut sink >>= flip when (getSuccess source (\x-> put sink x >> fill'))
421
422-- | 'pourMap' is like 'pour' that applies the function /f/ to each argument before passing it into the /sink/.
423pourMap :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
424           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Trampoline d m ()
425pourMap f source sink = loop
426   where loop = canPut sink >>= flip when (get source >>= maybe (return ()) (\x-> put sink (f x) >> loop))
427
428-- | 'pourMapMaybe' is to 'pourMap' like 'Data.Maybe.mapMaybe' is to 'Data.List.Map'.
429pourMapMaybe :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
430                => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Trampoline d m ()
431pourMapMaybe f source sink = loop
432   where loop = canPut sink >>= flip when (get source >>= maybe (return ()) (\x-> maybe (return False) (put sink) (f x) >> loop))
433
434-- | 'tee' is similar to 'pour' except it distributes every input value from the /source/ arguments into both /sink1/
435-- and /sink2/.
436tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
437       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Trampoline d m ()
438tee source sink1 sink2 = distribute
439   where distribute = do c1 <- canPut sink1
440                         c2 <- canPut sink2
441                         when (c1 && c2)
442                            (get source >>= maybe (return ()) (\x-> put sink1 x >> put sink2 x >> distribute))
443
444-- | 'putList' puts entire list into its /sink/ argument, as long as the sink accepts it. The remainder that wasn't
445-- accepted by the sink is the result value.
446putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Trampoline d m [x]
447putList [] sink = return []
448putList l@(x:rest) sink = put sink x >>= cond (putList rest sink) (return l)
449
450-- | 'getList' returns the list of all values generated by the source.
451getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Trampoline d m [x]
452getList source = getList' return
453   where getList' f = get source >>= maybe (f []) (\x-> getList' (f . (x:)))
454
455-- | 'consumeAndSuppress' consumes the entire source ignoring the values it generates.
456consumeAndSuppress :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Trampoline d m ()
457consumeAndSuppress source = get source
458                            >>= maybe (return ()) (const (consumeAndSuppress source))
459
460-- | A utility function wrapping if-then-else, useful for handling monadic truth values
461cond :: a -> a -> Bool -> a
462cond x y test = if test then x else y
463
464-- | A utility function, useful for handling monadic list values where empty list means success
465whenNull :: forall a m. Monad m => m [a] -> [a] -> m [a]
466whenNull action list = if null list then action else return list
467
468-- | Like 'putList', except it puts the contents of the given 'Data.Sequence.Seq' into the sink.
469putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Trampoline d m [x]
470putQueue q sink = putList (toList (viewl q)) sink