Ticket #4001: 0001-Implement-atomicReadMVar-fixing-4001-optimized.patch

File 0001-Implement-atomicReadMVar-fixing-4001-optimized.patch, 14.9 KB (added by ezyang, 2 years ago)

Variant which doesn't add a new field to MVar

  • compiler/prelude/primops.txt.pp

    From 7e082c025217abab924306307f6aef152d483034 Mon Sep 17 00:00:00 2001
    From: "Edward Z. Yang" <[email protected]>
    Date: Mon, 8 Jul 2013 11:03:35 -0700
    Subject: [PATCH] Implement atomicReadMVar, fixing #4001.
    
    We add the invariant to the MVar blocked threads queue that
    threads blocked on an atomic read are always at the front of
    the queue.  This invariant is easy to maintain, since takers
    are only ever added to the end of the queue.
    
    Signed-off-by: Edward Z. Yang <[email protected]>
    ---
     compiler/prelude/primops.txt.pp |  9 ++++++
     includes/rts/Constants.h        | 25 ++++++++-------
     includes/stg/MiscClosures.h     |  3 ++
     rts/HeapStackCheck.cmm          | 31 ++++++++++++++++--
     rts/Linker.c                    |  2 ++
     rts/PrimOps.cmm                 | 71 +++++++++++++++++++++++++++++++++++++++--
     rts/RaiseAsync.c                |  2 ++
     rts/RaiseAsync.h                |  1 +
     rts/RetainerProfile.c           |  1 +
     rts/Schedule.c                  |  2 ++
     rts/Threads.c                   |  4 +++
     rts/Trace.c                     |  1 +
     rts/sm/Compact.c                |  1 +
     rts/sm/Sanity.c                 |  1 +
     rts/sm/Scav.c                   |  1 +
     15 files changed, 138 insertions(+), 17 deletions(-)
    
    diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
    index 7203c11..739092d 100644
    a b primop TryPutMVarOp "tryPutMVar#" GenPrimOp 
    17171717   out_of_line      = True
    17181718   has_side_effects = True
    17191719
     1720primop  AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
     1721   MVar# s a -> State# s -> (# State# s, a #)
     1722   {If {\tt MVar\#} is empty, block until it becomes full.
     1723   Then read its contents without modifying the MVar, without possibility
     1724   of intervention from other threads.}
     1725   with
     1726   out_of_line      = True
     1727   has_side_effects = True
     1728
    17201729primop  SameMVarOp "sameMVar#" GenPrimOp
    17211730   MVar# s a -> MVar# s a -> Bool
    17221731
  • includes/rts/Constants.h

    diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
    index 5ff4d4e..4739e3a 100644
    a b  
    202202 */
    203203#define NotBlocked          0
    204204#define BlockedOnMVar       1
    205 #define BlockedOnBlackHole  2
    206 #define BlockedOnRead       3
    207 #define BlockedOnWrite      4
    208 #define BlockedOnDelay      5
    209 #define BlockedOnSTM        6
     205#define BlockedOnMVarRead   2
     206#define BlockedOnBlackHole  3
     207#define BlockedOnRead       4
     208#define BlockedOnWrite      5
     209#define BlockedOnDelay      6
     210#define BlockedOnSTM        7
    210211
    211212/* Win32 only: */
    212 #define BlockedOnDoProc     7
     213#define BlockedOnDoProc     8
    213214
    214215/* Only relevant for PAR: */
    215216  /* blocked on a remote closure represented by a Global Address: */
    216 #define BlockedOnGA         8
     217#define BlockedOnGA         9
    217218  /* same as above but without sending a Fetch message */
    218 #define BlockedOnGA_NoSend  9
     219#define BlockedOnGA_NoSend  10
    219220/* Only relevant for THREADED_RTS: */
    220 #define BlockedOnCCall      10
    221 #define BlockedOnCCall_Interruptible 11
     221#define BlockedOnCCall      11
     222#define BlockedOnCCall_Interruptible 12
    222223   /* same as above but permit killing the worker thread */
    223224
    224225/* Involved in a message sent to tso->msg_cap */
    225 #define BlockedOnMsgThrowTo 12
     226#define BlockedOnMsgThrowTo 13
    226227
    227228/* The thread is not on any run queues, but can be woken up
    228229   by tryWakeupThread() */
    229 #define ThreadMigrating     13
     230#define ThreadMigrating     14
    230231
    231232/*
    232233 * These constants are returned to the scheduler by a thread that has
  • includes/stg/MiscClosures.h

    diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
    index 8717687..88cee59 100644
    a b RTS_FUN_DECL(stg_block_noregs); 
    293293RTS_FUN_DECL(stg_block_blackhole);
    294294RTS_FUN_DECL(stg_block_blackhole_finally);
    295295RTS_FUN_DECL(stg_block_takemvar);
     296RTS_FUN_DECL(stg_block_atomicreadmvar);
    296297RTS_RET(stg_block_takemvar);
     298RTS_RET(stg_block_atomicreadmvar);
    297299RTS_FUN_DECL(stg_block_putmvar);
    298300RTS_RET(stg_block_putmvar);
    299301#ifdef mingw32_HOST_OS
    RTS_FUN_DECL(stg_isEmptyMVarzh); 
    376378RTS_FUN_DECL(stg_newMVarzh);
    377379RTS_FUN_DECL(stg_takeMVarzh);
    378380RTS_FUN_DECL(stg_putMVarzh);
     381RTS_FUN_DECL(stg_atomicReadMVarzh);
    379382RTS_FUN_DECL(stg_tryTakeMVarzh);
    380383RTS_FUN_DECL(stg_tryPutMVarzh);
    381384
  • rts/HeapStackCheck.cmm

    diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
    index fbceb76..20cd9df 100644
    a b stg_block_noregs 
    487487/* -----------------------------------------------------------------------------
    488488 * takeMVar/putMVar-specific blocks
    489489 *
    490  * Stack layout for a thread blocked in takeMVar:
     490 * Stack layout for a thread blocked in takeMVar/atomicReadMVar:
    491491 *     
    492492 *       ret. addr
    493493 *       ptr to MVar   (R1)
    494  *       stg_block_takemvar_info
     494 *       stg_block_takemvar_info (or stg_block_atomicreadmvar_info)
    495495 *
    496496 * Stack layout for a thread blocked in putMVar:
    497497 *     
    stg_block_takemvar /* mvar passed in R1 */ 
    531531    BLOCK_BUT_FIRST(stg_block_takemvar_finally);
    532532}
    533533
     534INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar )
     535    return ()
     536{
     537    jump stg_atomicReadMVarzh(mvar);
     538}
     539
     540// code fragment executed just before we return to the scheduler
     541stg_block_atomicreadmvar_finally
     542{
     543    W_ r1, r3;
     544    r1 = R1;
     545    r3 = R3;
     546    unlockClosure(R3, stg_MVAR_DIRTY_info);
     547    R1 = r1;
     548    R3 = r3;
     549    jump StgReturn [R1];
     550}
     551
     552stg_block_atomicreadmvar /* mvar passed in R1 */
     553{
     554    Sp_adj(-2);
     555    Sp(1) = R1;
     556    Sp(0) = stg_block_atomicreadmvar_info;
     557    R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
     558    BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally);
     559}
     560
    534561INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
    535562                P_ mvar, P_ val )
    536563    return ()
  • rts/Linker.c

    diff --git a/rts/Linker.c b/rts/Linker.c
    index 43edde2..9129b46 100644
    a b typedef struct _RtsSymbolVal { 
    10581058      SymI_HasProto(stg_yield_to_interpreter)                           \
    10591059      SymI_HasProto(stg_block_noregs)                                   \
    10601060      SymI_HasProto(stg_block_takemvar)                                 \
     1061      SymI_HasProto(stg_block_atomicreadmvar)                           \
    10611062      SymI_HasProto(stg_block_putmvar)                                  \
    10621063      MAIN_CAP_SYM                                                      \
    10631064      SymI_HasProto(MallocFailHook)                                     \
    typedef struct _RtsSymbolVal { 
    13141315      SymI_HasProto(stg_bh_upd_frame_info)                              \
    13151316      SymI_HasProto(suspendThread)                                      \
    13161317      SymI_HasProto(stg_takeMVarzh)                                     \
     1318      SymI_HasProto(stg_atomicReadMVarzh)                               \
    13171319      SymI_HasProto(stg_threadStatuszh)                                 \
    13181320      SymI_HasProto(stg_tryPutMVarzh)                                   \
    13191321      SymI_HasProto(stg_tryTakeMVarzh)                                  \
  • rts/PrimOps.cmm

    diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
    index a227e77..89086e5 100644
    a b loop: 
    14331433        goto loop;
    14341434    }
    14351435
    1436     // There are takeMVar(s) waiting: wake up the first one
     1436    // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
    14371437
    14381438    tso = StgMVarTSOQueue_tso(q);
    14391439    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
    loop: 
    14411441        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    14421442    }
    14431443
    1444     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
    14451444    ASSERT(StgTSO_block_info(tso) == mvar);
    14461445
    14471446    // actually perform the takeMVar
    loop: 
    14581457
    14591458    ccall tryWakeupThread(MyCapability() "ptr", tso);
    14601459
     1460    // If it was an atomicReadMVar, then we can still do work,
     1461    // so loop back. (XXX: This could take a while)
     1462    if (StgTSO_why_blocked(tso) == BlockedOnMVarRead::I16) {
     1463        q = StgMVarTSOQueue_link(q);
     1464        goto loop;
     1465    }
     1466
     1467    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     1468
    14611469    unlockClosure(mvar, info);
    14621470    return ();
    14631471}
    loop: 
    15121520        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
    15131521    }
    15141522
    1515     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
    15161523    ASSERT(StgTSO_block_info(tso) == mvar);
    15171524
    15181525    // actually perform the takeMVar
    loop: 
    15291536
    15301537    ccall tryWakeupThread(MyCapability() "ptr", tso);
    15311538
     1539    // If it was an atomicReadMVar, then we can still do work,
     1540    // so loop back. (XXX: This could take a while)
     1541    if (StgTSO_why_blocked(tso) == BlockedOnMVarRead::I16) {
     1542        q = StgMVarTSOQueue_link(q);
     1543        goto loop;
     1544    }
     1545
     1546    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     1547
    15321548    unlockClosure(mvar, info);
    15331549    return (1);
    15341550}
    15351551
     1552stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
     1553{
     1554    W_ val, info, tso, q;
     1555
     1556#if defined(THREADED_RTS)
     1557    ("ptr" info) = ccall lockClosure(mvar "ptr");
     1558#else
     1559    info = GET_INFO(mvar);
     1560#endif
     1561
     1562    if (info == stg_MVAR_CLEAN_info) {
     1563        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
     1564    }
     1565
     1566    /* If the MVar is empty, put ourselves at the front of the MVar's
     1567     * blocked threads queue and wait until we're woken up.
     1568     */
     1569    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
     1570
     1571        ALLOC_PRIM_WITH_CUSTOM_FAILURE
     1572            (SIZEOF_StgMVarTSOQueue,
     1573             unlockClosure(mvar, stg_MVAR_DIRTY_info);
     1574             GC_PRIM_P(stg_atomicReadMVarzh, mvar));
     1575
     1576        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
     1577
     1578        // atomicReadMVars are pushed to the front of the queue, so
     1579        // they get handled immediately
     1580        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
     1581        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
     1582        StgMVarTSOQueue_tso(q)  = CurrentTSO;
     1583
     1584        StgTSO__link(CurrentTSO)       = q;
     1585        StgTSO_block_info(CurrentTSO)  = mvar;
     1586        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
     1587        StgMVar_head(mvar) = q;
     1588
     1589        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
     1590            StgMVar_tail(mvar) = q;
     1591        }
     1592
     1593        jump stg_block_atomicreadmvar(mvar);
     1594    }
     1595
     1596    val = StgMVar_value(mvar);
     1597
     1598    unlockClosure(mvar, stg_MVAR_DIRTY_info);
     1599    return (val);
     1600}
    15361601
    15371602/* -----------------------------------------------------------------------------
    15381603   Stable pointer primitives
  • rts/RaiseAsync.c

    diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
    index 11f518a..1fbb852 100644
    a b check_target: 
    294294    }
    295295
    296296    case BlockedOnMVar:
     297    case BlockedOnMVarRead:
    297298    {
    298299        /*
    299300          To establish ownership of this TSO, we need to acquire a
    removeFromQueues(Capability *cap, StgTSO *tso) 
    637638    goto done;
    638639
    639640  case BlockedOnMVar:
     641  case BlockedOnMVarRead:
    640642      removeFromMVarBlockedQueue(tso);
    641643      goto done;
    642644
  • rts/RaiseAsync.h

    diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
    index 336ab30..d804f6b 100644
    a b interruptible(StgTSO *t) 
    4949{
    5050  switch (t->why_blocked) {
    5151  case BlockedOnMVar:
     52  case BlockedOnMVarRead:
    5253  case BlockedOnMsgThrowTo:
    5354  case BlockedOnRead:
    5455  case BlockedOnWrite:
  • rts/RetainerProfile.c

    diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
    index 77dc77c..dc21149 100644
    a b inner_loop: 
    16721672        retainClosure(tso->bq,                 c, c_child_r);
    16731673        retainClosure(tso->trec,               c, c_child_r);
    16741674        if (   tso->why_blocked == BlockedOnMVar
     1675               || tso->why_blocked == BlockedOnMVarRead
    16751676               || tso->why_blocked == BlockedOnBlackHole
    16761677               || tso->why_blocked == BlockedOnMsgThrowTo
    16771678            ) {
  • rts/Schedule.c

    diff --git a/rts/Schedule.c b/rts/Schedule.c
    index 88bfd8c..408146f 100644
    a b scheduleDetectDeadlock (Capability **pcap, Task *task) 
    947947            case BlockedOnBlackHole:
    948948            case BlockedOnMsgThrowTo:
    949949            case BlockedOnMVar:
     950            case BlockedOnMVarRead:
    950951                throwToSingleThreaded(cap, task->incall->tso,
    951952                                      (StgClosure *)nonTermination_closure);
    952953                return;
    resurrectThreads (StgTSO *threads) 
    28432844       
    28442845        switch (tso->why_blocked) {
    28452846        case BlockedOnMVar:
     2847        case BlockedOnMVarRead:
    28462848            /* Called by GC - sched_mutex lock is currently held. */
    28472849            throwToSingleThreaded(cap, tso,
    28482850                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
  • rts/Threads.c

    diff --git a/rts/Threads.c b/rts/Threads.c
    index 4c990f1..f2b8005 100644
    a b tryWakeupThread (Capability *cap, StgTSO *tso) 
    255255    switch (tso->why_blocked)
    256256    {
    257257    case BlockedOnMVar:
     258    case BlockedOnMVarRead:
    258259    {
    259260        if (tso->_link == END_TSO_QUEUE) {
    260261            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
    printThreadBlockage(StgTSO *tso) 
    734735  case BlockedOnMVar:
    735736    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
    736737    break;
     738  case BlockedOnMVarRead:
     739    debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
     740    break;
    737741  case BlockedOnBlackHole:
    738742      debugBelch("is blocked on a black hole %p",
    739743                 ((StgBlockingQueue*)tso->block_info.bh->bh));
  • rts/Trace.c

    diff --git a/rts/Trace.c b/rts/Trace.c
    index 78dfead..2190189 100644
    a b static char *thread_stop_reasons[] = { 
    179179    [ThreadFinished] = "finished",
    180180    [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
    181181    [6 + BlockedOnMVar]         = "blocked on an MVar",
     182    [6 + BlockedOnMVarRead]     = "blocked on an atomic MVar read",
    182183    [6 + BlockedOnBlackHole]    = "blocked on a black hole",
    183184    [6 + BlockedOnRead]         = "blocked on a read operation",
    184185    [6 + BlockedOnWrite]        = "blocked on a write operation",
  • rts/sm/Compact.c

    diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
    index 9c98dc9..247f1a0 100644
    a b thread_TSO (StgTSO *tso) 
    442442    thread_(&tso->global_link);
    443443
    444444    if (   tso->why_blocked == BlockedOnMVar
     445        || tso->why_blocked == BlockedOnMVarRead
    445446        || tso->why_blocked == BlockedOnBlackHole
    446447        || tso->why_blocked == BlockedOnMsgThrowTo
    447448        || tso->why_blocked == NotBlocked
  • rts/sm/Sanity.c

    diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
    index f0e1659..9b579ab 100644
    a b checkTSO(StgTSO *tso) 
    519519           info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()
    520520
    521521    if (   tso->why_blocked == BlockedOnMVar
     522        || tso->why_blocked == BlockedOnMVarRead
    522523        || tso->why_blocked == BlockedOnBlackHole
    523524        || tso->why_blocked == BlockedOnMsgThrowTo
    524525        || tso->why_blocked == NotBlocked
  • rts/sm/Scav.c

    diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
    index 6137f6d..e0cc688 100644
    a b scavengeTSO (StgTSO *tso) 
    7171
    7272    evacuate((StgClosure **)&tso->_link);
    7373    if (   tso->why_blocked == BlockedOnMVar
     74        || tso->why_blocked == BlockedOnMVarRead
    7475        || tso->why_blocked == BlockedOnBlackHole
    7576        || tso->why_blocked == BlockedOnMsgThrowTo
    7677        || tso->why_blocked == NotBlocked