mdbx: парковка читающих транзакций.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev)
2024-07-09 16:04:01 +03:00
parent f335a16c92
commit ec0ada7b8c
11 changed files with 399 additions and 80 deletions

View File

@@ -3,7 +3,7 @@
#include "internals.h"
bsr_t mvcc_bind_slot(MDBX_env *env, const uintptr_t tid) {
bsr_t mvcc_bind_slot(MDBX_env *env) {
eASSERT(env, env->lck_mmap.lck);
eASSERT(env, env->lck->magic_and_version == MDBX_LOCK_MAGIC);
eASSERT(env, env->lck->os_and_format == MDBX_LOCK_FORMAT);
@@ -61,7 +61,8 @@ bsr_t mvcc_bind_slot(MDBX_env *env, const uintptr_t tid) {
safe64_reset(&result.rslot->txnid, true);
if (slot == nreaders)
env->lck->rdt_length.weak = (uint32_t)++nreaders;
result.rslot->tid.weak = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : tid;
result.rslot->tid.weak =
(env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
atomic_store32(&result.rslot->pid, env->pid, mo_AcquireRelease);
lck_rdt_unlock(env);
@@ -318,6 +319,92 @@ __cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked,
return rc;
}
int txn_park(MDBX_txn *txn, bool autounpark) {
reader_slot_t *const rslot = txn->to.reader;
tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY |
MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY);
tASSERT(txn, txn->to.reader->tid.weak < MDBX_TID_TXN_OUSTED);
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY |
MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY))
return MDBX_BAD_TXN;
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
const uint64_t tid = atomic_load64(&rslot->tid, mo_Relaxed);
const uint64_t txnid = atomic_load64(&rslot->txnid, mo_Relaxed);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != must ", txn->env->pid);
return MDBX_PROBLEM;
}
if (unlikely(tid != txn->owner || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%0zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
return MDBX_BAD_RSLOT;
}
atomic_store64(&rslot->tid, MDBX_TID_TXN_PARKED, mo_AcquireRelease);
atomic_store32(&txn->env->lck->rdt_refresh_flag, true, mo_Relaxed);
txn->flags +=
autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED;
return MDBX_SUCCESS;
}
int txn_unpark(MDBX_txn *txn) {
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD |
MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) !=
(MDBX_TXN_RDONLY | MDBX_TXN_PARKED)))
return MDBX_BAD_TXN;
for (reader_slot_t *const rslot = txn->to.reader; rslot; atomic_yield()) {
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
uint64_t tid = safe64_read(&rslot->tid);
uint64_t txnid = safe64_read(&rslot->txnid);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != expected ", txn->env->pid);
return MDBX_PROBLEM;
}
if (unlikely(tid == MDBX_TID_TXN_OUSTED ||
txnid >= SAFE64_INVALID_THRESHOLD))
break;
if (unlikely(tid != MDBX_TID_TXN_PARKED || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", MDBX_TID_TXN_OUSTED, txnid, " != must ",
txn->txnid);
break;
}
if (unlikely((txn->flags & MDBX_TXN_ERROR)))
break;
#if MDBX_64BIT_CAS
if (unlikely(!atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, txn->owner)))
continue;
#else
atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32),
mo_Relaxed);
if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED,
(uint32_t)txn->owner))) {
atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32),
mo_AcquireRelease);
continue;
}
#endif
txnid = safe64_read(&rslot->txnid);
tid = safe64_read(&rslot->tid);
if (unlikely(txnid != txn->txnid || tid != txn->owner)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
break;
}
txn->flags &= ~(MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK);
return MDBX_SUCCESS;
}
int err = txn_end(txn, TXN_END_OUSTED | TXN_END_RESET | TXN_END_UPDATE);
return err ? err : MDBX_OUSTED;
}
__cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
DEBUG("DB size maxed out by reading #%" PRIaTXN, straggler);
osal_memory_fence(mo_AcquireRelease, false);
@@ -341,29 +428,61 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
if (MDBX_IS_ERROR(mvcc_cleanup_dead(env, false, nullptr)))
break;
if (!callback)
break;
reader_slot_t *stucked = nullptr;
uint64_t hold_retired = 0;
for (size_t i = 0; i < lck->rdt_length.weak; ++i) {
const uint64_t snap_retired =
atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed);
const txnid_t rtxn = safe64_read(&lck->rdt[i].txnid);
uint32_t pid;
reader_slot_t *const rslot = &lck->rdt[i];
txnid_t rtxn = safe64_read(&rslot->txnid);
retry:
if (rtxn == straggler &&
atomic_load32(&lck->rdt[i].pid, mo_AcquireRelease)) {
hold_retired = snap_retired;
stucked = &lck->rdt[i];
(pid = atomic_load32(&rslot->pid, mo_AcquireRelease)) != 0) {
const uint64_t tid = safe64_read(&rslot->tid);
if (tid == MDBX_TID_TXN_PARKED) {
/* Читающая транзакция была помечена владельцем как "припаркованная",
* т.е. подлежащая асинхронному прерыванию, либо восстановлению
* по активности читателя.
*
* Если первый CAS(slot->tid) будет успешным, то
* safe64_reset_compare() безопасно очистит txnid, либо откажется
* из-за того что читатель сбросил и/или перезапустил транзакцию.
* При этом читатеть может не заметить вытестения, если приступит
* к завершению транзакции. Все эти исходы нас устраивют.
*
* Если первый CAS(slot->tid) будет НЕ успешным, то значит читатеть
* восстановил транзакцию, либо завершил её, либо даже освободил слот.
*/
bool ousted =
#if MDBX_64BIT_CAS
atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED,
MDBX_TID_TXN_OUSTED);
#else
atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED,
(uint32_t)MDBX_TID_TXN_OUSTED);
#endif
if (likely(ousted)) {
ousted = safe64_reset_compare(&rslot->txnid, rtxn);
NOTICE("ousted-%s parked read-txn %" PRIaTXN
", pid %u, tid 0x%" PRIx64,
ousted ? "complete" : "half", rtxn, pid, tid);
eASSERT(env, ousted || safe64_read(&rslot->txnid) > straggler);
continue;
}
rtxn = safe64_read(&rslot->txnid);
goto retry;
}
hold_retired =
atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed);
stucked = rslot;
}
}
if (!stucked)
if (!callback || !stucked)
break;
uint32_t pid = atomic_load32(&stucked->pid, mo_AcquireRelease);
uint64_t tid = atomic_load64(&stucked->tid, mo_AcquireRelease);
if (safe64_read(&stucked->txnid) != straggler || !pid ||
stucked->snapshot_pages_retired.weak != hold_retired)
uint64_t tid = safe64_read(&stucked->tid);
if (safe64_read(&stucked->txnid) != straggler || !pid)
continue;
const meta_ptr_t head = meta_recent(env, &env->txn->tw.troika);
@@ -437,10 +556,7 @@ __cold int mdbx_thread_register(const MDBX_env *env) {
return MDBX_RESULT_TRUE /* already registered */;
}
const uintptr_t tid = osal_thread_self();
if (env->txn && unlikely(env->basal_txn->owner == tid))
return MDBX_TXN_OVERLAPPING;
return mvcc_bind_slot((MDBX_env *)env, tid).err;
return mvcc_bind_slot((MDBX_env *)env).err;
}
__cold int mdbx_thread_unregister(const MDBX_env *env) {