Ticket #4001: 0001-Implement-atomicReadMVar-fixing-4001-optimized.patch

File 0001-Implement-atomicReadMVar-fixing-4001-optimized.patch, 14.9 KB (added by ezyang, 9 months ago)

Variant which doesn't add a new field to MVar

  • compiler/prelude/primops.txt.pp

    From 7e082c025217abab924306307f6aef152d483034 Mon Sep 17 00:00:00 2001
    From: "Edward Z. Yang" <ezyang@mit.edu>
    Date: Mon, 8 Jul 2013 11:03:35 -0700
    Subject: [PATCH] Implement atomicReadMVar, fixing #4001.
    
    We add the invariant to the MVar blocked threads queue that
    threads blocked on an atomic read are always at the front of
    the queue.  This invariant is easy to maintain, since takers
    are only ever added to the end of the queue.
    
    Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
    ---
     compiler/prelude/primops.txt.pp |  9 ++++++
     includes/rts/Constants.h        | 25 ++++++++-------
     includes/stg/MiscClosures.h     |  3 ++
     rts/HeapStackCheck.cmm          | 31 ++++++++++++++++--
     rts/Linker.c                    |  2 ++
     rts/PrimOps.cmm                 | 71 +++++++++++++++++++++++++++++++++++++++--
     rts/RaiseAsync.c                |  2 ++
     rts/RaiseAsync.h                |  1 +
     rts/RetainerProfile.c           |  1 +
     rts/Schedule.c                  |  2 ++
     rts/Threads.c                   |  4 +++
     rts/Trace.c                     |  1 +
     rts/sm/Compact.c                |  1 +
     rts/sm/Sanity.c                 |  1 +
     rts/sm/Scav.c                   |  1 +
     15 files changed, 138 insertions(+), 17 deletions(-)
    
    diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
    index 7203c11..739092d 100644
    a b primop TryPutMVarOp "tryPutMVar#" GenPrimOp 
    17171717   out_of_line      = True 
    17181718   has_side_effects = True 
    17191719 
     1720primop  AtomicReadMVarOp "atomicReadMVar#" GenPrimOp 
     1721   MVar# s a -> State# s -> (# State# s, a #) 
     1722   {If {\tt MVar\#} is empty, block until it becomes full. 
     1723   Then read its contents without modifying the MVar, without possibility 
     1724   of intervention from other threads.} 
     1725   with 
     1726   out_of_line      = True 
     1727   has_side_effects = True 
     1728 
    17201729primop  SameMVarOp "sameMVar#" GenPrimOp 
    17211730   MVar# s a -> MVar# s a -> Bool 
    17221731 
  • includes/rts/Constants.h

    diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h
    index 5ff4d4e..4739e3a 100644
    a b  
    202202 */ 
    203203#define NotBlocked          0 
    204204#define BlockedOnMVar       1 
    205 #define BlockedOnBlackHole  2 
    206 #define BlockedOnRead       3 
    207 #define BlockedOnWrite      4 
    208 #define BlockedOnDelay      5 
    209 #define BlockedOnSTM        6 
     205#define BlockedOnMVarRead   2 
     206#define BlockedOnBlackHole  3 
     207#define BlockedOnRead       4 
     208#define BlockedOnWrite      5 
     209#define BlockedOnDelay      6 
     210#define BlockedOnSTM        7 
    210211 
    211212/* Win32 only: */ 
    212 #define BlockedOnDoProc     7 
     213#define BlockedOnDoProc     8 
    213214 
    214215/* Only relevant for PAR: */ 
    215216  /* blocked on a remote closure represented by a Global Address: */ 
    216 #define BlockedOnGA         8 
     217#define BlockedOnGA         9 
    217218  /* same as above but without sending a Fetch message */ 
    218 #define BlockedOnGA_NoSend  9 
     219#define BlockedOnGA_NoSend  10 
    219220/* Only relevant for THREADED_RTS: */ 
    220 #define BlockedOnCCall      10 
    221 #define BlockedOnCCall_Interruptible 11 
     221#define BlockedOnCCall      11 
     222#define BlockedOnCCall_Interruptible 12 
    222223   /* same as above but permit killing the worker thread */ 
    223224 
    224225/* Involved in a message sent to tso->msg_cap */ 
    225 #define BlockedOnMsgThrowTo 12 
     226#define BlockedOnMsgThrowTo 13 
    226227 
    227228/* The thread is not on any run queues, but can be woken up 
    228229   by tryWakeupThread() */ 
    229 #define ThreadMigrating     13 
     230#define ThreadMigrating     14 
    230231 
    231232/* 
    232233 * These constants are returned to the scheduler by a thread that has 
  • includes/stg/MiscClosures.h

    diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
    index 8717687..88cee59 100644
    a b RTS_FUN_DECL(stg_block_noregs); 
    293293RTS_FUN_DECL(stg_block_blackhole); 
    294294RTS_FUN_DECL(stg_block_blackhole_finally); 
    295295RTS_FUN_DECL(stg_block_takemvar); 
     296RTS_FUN_DECL(stg_block_atomicreadmvar); 
    296297RTS_RET(stg_block_takemvar); 
     298RTS_RET(stg_block_atomicreadmvar); 
    297299RTS_FUN_DECL(stg_block_putmvar); 
    298300RTS_RET(stg_block_putmvar); 
    299301#ifdef mingw32_HOST_OS 
    RTS_FUN_DECL(stg_isEmptyMVarzh); 
    376378RTS_FUN_DECL(stg_newMVarzh); 
    377379RTS_FUN_DECL(stg_takeMVarzh); 
    378380RTS_FUN_DECL(stg_putMVarzh); 
     381RTS_FUN_DECL(stg_atomicReadMVarzh); 
    379382RTS_FUN_DECL(stg_tryTakeMVarzh); 
    380383RTS_FUN_DECL(stg_tryPutMVarzh); 
    381384 
  • rts/HeapStackCheck.cmm

    diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
    index fbceb76..20cd9df 100644
    a b stg_block_noregs 
    487487/* ----------------------------------------------------------------------------- 
    488488 * takeMVar/putMVar-specific blocks 
    489489 * 
    490  * Stack layout for a thread blocked in takeMVar: 
     490 * Stack layout for a thread blocked in takeMVar/atomicReadMVar: 
    491491 *       
    492492 *       ret. addr 
    493493 *       ptr to MVar   (R1) 
    494  *       stg_block_takemvar_info 
     494 *       stg_block_takemvar_info (or stg_block_atomicreadmvar_info) 
    495495 * 
    496496 * Stack layout for a thread blocked in putMVar: 
    497497 *       
    stg_block_takemvar /* mvar passed in R1 */ 
    531531    BLOCK_BUT_FIRST(stg_block_takemvar_finally); 
    532532} 
    533533 
     /* Return frame for a thread that blocked in atomicReadMVar.
      * The frame holds the info pointer and the MVar pointer (it is pushed
      * by stg_block_atomicreadmvar).  When the frame is returned to, the
      * read is retried by jumping back into stg_atomicReadMVarzh with the
      * saved MVar. */
     534INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar ) 
     535    return () 
     536{ 
     537    jump stg_atomicReadMVarzh(mvar); 
     538} 
     539 
     540// code fragment executed just before we return to the scheduler 
        // Expects the MVar in R3 (set up by stg_block_atomicreadmvar) and
        // releases its lock by writing stg_MVAR_DIRTY_info back as the header.
     541stg_block_atomicreadmvar_finally 
     542{ 
     543    W_ r1, r3; 
            // R1 and R3 are copied to locals and restored after the unlock;
            // presumably unlockClosure may clobber them -- TODO confirm.
     544    r1 = R1; 
     545    r3 = R3; 
     546    unlockClosure(R3, stg_MVAR_DIRTY_info); 
     547    R1 = r1; 
     548    R3 = r3; 
     549    jump StgReturn [R1]; 
     550} 
     551 
     /* Block the current thread on an atomicReadMVar.
      * Pushes a two-word frame (stg_block_atomicreadmvar_info, then the MVar
      * pointer) so that the read is retried on resumption, and hands
      * stg_block_atomicreadmvar_finally to BLOCK_BUT_FIRST so the MVar is
      * unlocked on the way back to the scheduler. */
     552stg_block_atomicreadmvar /* mvar passed in R1 */ 
     553{ 
            // make room for two words: Sp(0) = info pointer, Sp(1) = mvar
     554    Sp_adj(-2); 
     555    Sp(1) = R1; 
     556    Sp(0) = stg_block_atomicreadmvar_info; 
     557    R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3 
     558    BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally); 
     559} 
     560 
    534561INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr, 
    535562                P_ mvar, P_ val ) 
    536563    return () 
  • rts/Linker.c

    diff --git a/rts/Linker.c b/rts/Linker.c
    index 43edde2..9129b46 100644
    a b typedef struct _RtsSymbolVal { 
    10581058      SymI_HasProto(stg_yield_to_interpreter)                           \ 
    10591059      SymI_HasProto(stg_block_noregs)                                   \ 
    10601060      SymI_HasProto(stg_block_takemvar)                                 \ 
     1061      SymI_HasProto(stg_block_atomicreadmvar)                           \ 
    10611062      SymI_HasProto(stg_block_putmvar)                                  \ 
    10621063      MAIN_CAP_SYM                                                      \ 
    10631064      SymI_HasProto(MallocFailHook)                                     \ 
    typedef struct _RtsSymbolVal { 
    13141315      SymI_HasProto(stg_bh_upd_frame_info)                              \ 
    13151316      SymI_HasProto(suspendThread)                                      \ 
    13161317      SymI_HasProto(stg_takeMVarzh)                                     \ 
     1318      SymI_HasProto(stg_atomicReadMVarzh)                               \ 
    13171319      SymI_HasProto(stg_threadStatuszh)                                 \ 
    13181320      SymI_HasProto(stg_tryPutMVarzh)                                   \ 
    13191321      SymI_HasProto(stg_tryTakeMVarzh)                                  \ 
  • rts/PrimOps.cmm

    diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
    index a227e77..89086e5 100644
    a b loop: 
    14331433        goto loop; 
    14341434    } 
    14351435 
    1436     // There are takeMVar(s) waiting: wake up the first one 
     1436    // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one 
    14371437 
    14381438    tso = StgMVarTSOQueue_tso(q); 
    14391439    StgMVar_head(mvar) = StgMVarTSOQueue_link(q); 
    loop: 
    14411441        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; 
    14421442    } 
    14431443 
    1444     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); 
    14451444    ASSERT(StgTSO_block_info(tso) == mvar); 
    14461445 
    14471446    // actually perform the takeMVar 
    loop: 
    14581457 
    14591458    ccall tryWakeupThread(MyCapability() "ptr", tso); 
    14601459 
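     1460    // If it was an atomicReadMVar, then we can still do work, 
     1461    // so loop back. (XXX: This could take a while) 
             // NOTE(review): why_blocked is read here *after* tryWakeupThread().
             // If the wakeup path resets tso's why_blocked (TODO confirm), this
             // test is never true and the ASSERT below fails; consider saving
             // why_blocked in a local before the ccall.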
     1462    if (StgTSO_why_blocked(tso) == BlockedOnMVarRead::I16) { 
     1463        q = StgMVarTSOQueue_link(q); 
     1464        goto loop; 
     1465    } 
     1466 
     1467    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); 
     1468 
    14611469    unlockClosure(mvar, info); 
    14621470    return (); 
    14631471} 
    loop: 
    15121520        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; 
    15131521    } 
    15141522 
    1515     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); 
    15161523    ASSERT(StgTSO_block_info(tso) == mvar); 
    15171524 
    15181525    // actually perform the takeMVar 
    loop: 
    15291536 
    15301537    ccall tryWakeupThread(MyCapability() "ptr", tso); 
    15311538 
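     1539    // If it was an atomicReadMVar, then we can still do work, 
     1540    // so loop back. (XXX: This could take a while) 
             // NOTE(review): why_blocked is read here *after* tryWakeupThread().
             // If the wakeup path resets tso's why_blocked (TODO confirm), this
             // test is never true and the ASSERT below fails; consider saving
             // why_blocked in a local before the ccall.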
     1541    if (StgTSO_why_blocked(tso) == BlockedOnMVarRead::I16) { 
     1542        q = StgMVarTSOQueue_link(q); 
     1543        goto loop; 
     1544    } 
     1545 
     1546    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); 
     1547 
    15321548    unlockClosure(mvar, info); 
    15331549    return (1); 
    15341550} 
    15351551 
     /* atomicReadMVar# primop: return the contents of an MVar without
      * emptying it.
      *
      * If the MVar is full, its value is returned immediately.  If it is
      * empty, the current thread is linked in at the *head* of the MVar's
      * blocked-thread queue with why_blocked = BlockedOnMVarRead and then
      * blocks via stg_block_atomicreadmvar.
      *
      * The MVar is locked on entry (THREADED_RTS only) and unlocked either
      * here or, on the blocking path, in stg_block_atomicreadmvar_finally.
      *
      * NOTE(review): the local `tso` is declared but never used below.
      */
     1552stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ ) 
     1553{ 
     1554    W_ val, info, tso, q; 
     1555 
     1556#if defined(THREADED_RTS) 
     1557    ("ptr" info) = ccall lockClosure(mvar "ptr"); 
     1558#else 
     1559    info = GET_INFO(mvar); 
     1560#endif 
     1561 
             // The MVar is always unlocked with the DIRTY info pointer below,
             // so a clean MVar is reported to dirty_MVAR first.
     1562    if (info == stg_MVAR_CLEAN_info) { 
     1563        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr"); 
     1564    } 
     1565 
     1566    /* If the MVar is empty, put ourselves on the blocked readers 
     1567     * list and wait until we're woken up. 
     1568     */ 
     1569    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) { 
     1570 
                 // Allocate one queue cell.  On allocation failure the MVar
                 // is unlocked before GC_PRIM_P is invoked (presumably this
                 // retries the primop after GC -- TODO confirm against macro).
     1571        ALLOC_PRIM_WITH_CUSTOM_FAILURE 
     1572            (SIZEOF_StgMVarTSOQueue, 
     1573             unlockClosure(mvar, stg_MVAR_DIRTY_info); 
     1574             GC_PRIM_P(stg_atomicReadMVarzh, mvar)); 
     1575 
                 // start of the freshly allocated cell (assumes Hp points at
                 // the last allocated word -- TODO confirm)
     1576        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1); 
     1577 
     1578        // readMVars are pushed to the front of the queue, so 
     1579        // they get handled immediately 
     1580        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); 
     1581        StgMVarTSOQueue_link(q) = StgMVar_head(mvar); 
     1582        StgMVarTSOQueue_tso(q)  = CurrentTSO; 
     1583 
                 // record on the TSO which queue cell and which MVar it is
                 // blocked on, and that it is a read rather than a take
     1584        StgTSO__link(CurrentTSO)       = q; 
     1585        StgTSO_block_info(CurrentTSO)  = mvar; 
     1586        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16; 
     1587        StgMVar_head(mvar) = q; 
     1588 
                 // queue was empty before: the new cell is also the tail
     1589        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) { 
     1590            StgMVar_tail(mvar) = q; 
     1591        } 
     1592 
                 // the MVar stays locked here; the blocking path unlocks it
     1593        jump stg_block_atomicreadmvar(mvar); 
     1594    } 
     1595 
             // MVar is full: read the value but leave it in place
     1596    val = StgMVar_value(mvar); 
     1597 
     1598    unlockClosure(mvar, stg_MVAR_DIRTY_info); 
     1599    return (val); 
     1600} 
    15361601 
    15371602/* ----------------------------------------------------------------------------- 
    15381603   Stable pointer primitives 
  • rts/RaiseAsync.c

    diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
    index 11f518a..1fbb852 100644
    a b check_target: 
    294294    } 
    295295 
    296296    case BlockedOnMVar: 
     297    case BlockedOnMVarRead: 
    297298    { 
    298299        /* 
    299300          To establish ownership of this TSO, we need to acquire a 
    removeFromQueues(Capability *cap, StgTSO *tso) 
    637638    goto done; 
    638639 
    639640  case BlockedOnMVar: 
     641  case BlockedOnMVarRead: 
    640642      removeFromMVarBlockedQueue(tso); 
    641643      goto done; 
    642644 
  • rts/RaiseAsync.h

    diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
    index 336ab30..d804f6b 100644
    a b interruptible(StgTSO *t) 
    4949{ 
    5050  switch (t->why_blocked) { 
    5151  case BlockedOnMVar: 
     52  case BlockedOnMVarRead: 
    5253  case BlockedOnMsgThrowTo: 
    5354  case BlockedOnRead: 
    5455  case BlockedOnWrite: 
  • rts/RetainerProfile.c

    diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c
    index 77dc77c..dc21149 100644
    a b inner_loop: 
    16721672        retainClosure(tso->bq,                 c, c_child_r); 
    16731673        retainClosure(tso->trec,               c, c_child_r); 
    16741674        if (   tso->why_blocked == BlockedOnMVar 
     1675               || tso->why_blocked == BlockedOnMVarRead 
    16751676               || tso->why_blocked == BlockedOnBlackHole 
    16761677               || tso->why_blocked == BlockedOnMsgThrowTo 
    16771678            ) { 
  • rts/Schedule.c

    diff --git a/rts/Schedule.c b/rts/Schedule.c
    index 88bfd8c..408146f 100644
    a b scheduleDetectDeadlock (Capability **pcap, Task *task) 
    947947            case BlockedOnBlackHole: 
    948948            case BlockedOnMsgThrowTo: 
    949949            case BlockedOnMVar: 
     950            case BlockedOnMVarRead: 
    950951                throwToSingleThreaded(cap, task->incall->tso,  
    951952                                      (StgClosure *)nonTermination_closure); 
    952953                return; 
    resurrectThreads (StgTSO *threads) 
    28432844         
    28442845        switch (tso->why_blocked) { 
    28452846        case BlockedOnMVar: 
     2847        case BlockedOnMVarRead: 
    28462848            /* Called by GC - sched_mutex lock is currently held. */ 
    28472849            throwToSingleThreaded(cap, tso, 
    28482850                                  (StgClosure *)blockedIndefinitelyOnMVar_closure); 
  • rts/Threads.c

    diff --git a/rts/Threads.c b/rts/Threads.c
    index 4c990f1..f2b8005 100644
    a b tryWakeupThread (Capability *cap, StgTSO *tso) 
    255255    switch (tso->why_blocked) 
    256256    { 
    257257    case BlockedOnMVar: 
     258    case BlockedOnMVarRead: 
    258259    { 
    259260        if (tso->_link == END_TSO_QUEUE) { 
    260261            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; 
    printThreadBlockage(StgTSO *tso) 
    734735  case BlockedOnMVar: 
    735736    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); 
    736737    break; 
     738  case BlockedOnMVarRead: 
     739    debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure); 
     740    break; 
    737741  case BlockedOnBlackHole: 
    738742      debugBelch("is blocked on a black hole %p",  
    739743                 ((StgBlockingQueue*)tso->block_info.bh->bh)); 
  • rts/Trace.c

    diff --git a/rts/Trace.c b/rts/Trace.c
    index 78dfead..2190189 100644
    a b static char *thread_stop_reasons[] = { 
    179179    [ThreadFinished] = "finished", 
    180180    [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call", 
    181181    [6 + BlockedOnMVar]         = "blocked on an MVar", 
     182    [6 + BlockedOnMVarRead]     = "blocked on an atomic MVar read", 
    182183    [6 + BlockedOnBlackHole]    = "blocked on a black hole", 
    183184    [6 + BlockedOnRead]         = "blocked on a read operation", 
    184185    [6 + BlockedOnWrite]        = "blocked on a write operation", 
  • rts/sm/Compact.c

    diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c
    index 9c98dc9..247f1a0 100644
    a b thread_TSO (StgTSO *tso) 
    442442    thread_(&tso->global_link); 
    443443 
    444444    if (   tso->why_blocked == BlockedOnMVar 
     445        || tso->why_blocked == BlockedOnMVarRead 
    445446        || tso->why_blocked == BlockedOnBlackHole 
    446447        || tso->why_blocked == BlockedOnMsgThrowTo 
    447448        || tso->why_blocked == NotBlocked 
  • rts/sm/Sanity.c

    diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
    index f0e1659..9b579ab 100644
    a b checkTSO(StgTSO *tso) 
    519519           info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO() 
    520520 
    521521    if (   tso->why_blocked == BlockedOnMVar 
     522        || tso->why_blocked == BlockedOnMVarRead 
    522523        || tso->why_blocked == BlockedOnBlackHole 
    523524        || tso->why_blocked == BlockedOnMsgThrowTo 
    524525        || tso->why_blocked == NotBlocked 
  • rts/sm/Scav.c

    diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
    index 6137f6d..e0cc688 100644
    a b scavengeTSO (StgTSO *tso) 
    7171 
    7272    evacuate((StgClosure **)&tso->_link); 
    7373    if (   tso->why_blocked == BlockedOnMVar 
     74        || tso->why_blocked == BlockedOnMVarRead 
    7475        || tso->why_blocked == BlockedOnBlackHole 
    7576        || tso->why_blocked == BlockedOnMsgThrowTo 
    7677        || tso->why_blocked == NotBlocked