mdbx: refactor read-only transactions, extracting txn_ro_start(), txn_ro_seize(), txn_ro_slot().

Леонид Юрьев (Leonid Yuriev) 2025-01-14 14:33:20 +03:00
parent 0accf98ff7
commit 7e772114bc
6 changed files with 230 additions and 190 deletions


@@ -131,7 +131,7 @@ int mdbx_txn_park(MDBX_txn *txn, bool autounpark) {
return LOG_IFERR(rc ? rc : MDBX_OUSTED);
}
return LOG_IFERR(txn_park(txn, autounpark));
return LOG_IFERR(txn_ro_park(txn, autounpark));
}
int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted) {
@@ -147,7 +147,7 @@ int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted) {
if (unlikely(!F_ISSET(txn->flags, MDBX_TXN_RDONLY | MDBX_TXN_PARKED)))
return MDBX_SUCCESS;
rc = txn_unpark(txn);
rc = txn_ro_unpark(txn);
if (likely(rc != MDBX_OUSTED) || !restart_if_ousted)
return LOG_IFERR(rc);

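For context, here is a minimal caller-side sketch of the park/unpark API whose wrappers are adjusted above. It assumes an already-opened MDBX_env, abbreviates error handling, and the helper name demo_park_cycle is hypothetical, not part of the commit.

#include <stdbool.h>
#include <stddef.h>
#include "mdbx.h"

/* Hypothetical helper: park an idle read-only transaction so its MVCC
 * snapshot stops pinning old pages, then resume it later. */
static int demo_park_cycle(MDBX_env *env) {
  MDBX_txn *txn = NULL;
  int rc = mdbx_txn_begin(env, NULL, MDBX_TXN_RDONLY, &txn);
  if (rc != MDBX_SUCCESS)
    return rc;

  /* ... read under the current snapshot ... */

  rc = mdbx_txn_park(txn, /* autounpark */ false);
  if (rc == MDBX_SUCCESS) {
    /* ... do unrelated work while parked ... */

    /* Resume; with restart_if_ousted = true an ousted reader is restarted
     * on a fresh snapshot instead of failing with MDBX_OUSTED. */
    rc = mdbx_txn_unpark(txn, /* restart_if_ousted */ true);
  }

  mdbx_txn_abort(txn);
  return rc;
}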

@@ -41,7 +41,7 @@ typedef struct node_search_result {
typedef struct bind_reader_slot_result {
int err;
reader_slot_t *rslot;
reader_slot_t *slot;
} bsr_t;
#include "atomics-ops.h"


@@ -50,23 +50,23 @@ bsr_t mvcc_bind_slot(MDBX_env *env) {
}
}
result.rslot = &env->lck->rdt[slot];
result.slot = &env->lck->rdt[slot];
/* Claim the reader slot, carefully since other code
* uses the reader table un-mutexed: First reset the
* slot, next publish it in lck->rdt_length. After
* that, it is safe for mdbx_env_close() to touch it.
* When it will be closed, we can finally claim it. */
atomic_store32(&result.rslot->pid, 0, mo_AcquireRelease);
safe64_reset(&result.rslot->txnid, true);
atomic_store32(&result.slot->pid, 0, mo_AcquireRelease);
safe64_reset(&result.slot->txnid, true);
if (slot == nreaders)
env->lck->rdt_length.weak = (uint32_t)++nreaders;
result.rslot->tid.weak = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
atomic_store32(&result.rslot->pid, env->pid, mo_AcquireRelease);
result.slot->tid.weak = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
atomic_store32(&result.slot->pid, env->pid, mo_AcquireRelease);
lck_rdt_unlock(env);
if (likely(env->flags & ENV_TXKEY)) {
eASSERT(env, env->registered_reader_pid == env->pid);
thread_rthc_set(env->me_txkey, result.rslot);
thread_rthc_set(env->me_txkey, result.slot);
}
return result;
}

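The claim-ordering comment in the hunk above is terse, so here is an annotated paraphrase of the same sequence and why it is safe against lock-free scans of the reader table; this is my reading of the diff, not code from the commit.

/* Slot-claim ordering, paraphrased from mvcc_bind_slot() above:
 *   1. atomic_store32(&slot->pid, 0)         -- the slot is visibly free
 *   2. safe64_reset(&slot->txnid, true)      -- no stale snapshot txnid remains
 *   3. publish lck->rdt_length               -- only when a brand-new slot is taken
 *   4. fill slot->tid                        -- zero under MDBX_NOSTICKYTHREADS
 *   5. atomic_store32(&slot->pid, env->pid)  -- the slot becomes live
 * Concurrent scans of lck->rdt[] key off pid: until step 5 the entry is either
 * beyond rdt_length or carries pid == 0, so a half-initialized slot is never
 * mistaken for an active reader. */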

@@ -43,8 +43,8 @@ MDBX_INTERNAL bool txn_refund(MDBX_txn *txn);
MDBX_INTERNAL txnid_t txn_snapshot_oldest(const MDBX_txn *const txn);
MDBX_INTERNAL int txn_abort(MDBX_txn *txn);
MDBX_INTERNAL int txn_renew(MDBX_txn *txn, unsigned flags);
MDBX_INTERNAL int txn_park(MDBX_txn *txn, bool autounpark);
MDBX_INTERNAL int txn_unpark(MDBX_txn *txn);
MDBX_INTERNAL int txn_ro_park(MDBX_txn *txn, bool autounpark);
MDBX_INTERNAL int txn_ro_unpark(MDBX_txn *txn);
MDBX_INTERNAL int txn_check_badbits_parked(const MDBX_txn *txn, int bad_bits);
MDBX_INTERNAL void txn_done_cursors(MDBX_txn *txn);
MDBX_INTERNAL int txn_shadow_cursors(const MDBX_txn *parent, const size_t dbi);
@@ -80,6 +80,7 @@ MDBX_INTERNAL int txn_nested_join(MDBX_txn *txn, struct commit_timestamp *ts);
MDBX_INTERNAL int txn_basal_commit(MDBX_txn *txn, struct commit_timestamp *ts);
MDBX_INTERNAL int txn_basal_end(MDBX_txn *txn, unsigned mode);
MDBX_INTERNAL int txn_ro_end(MDBX_txn *txn, unsigned mode);
MDBX_INTERNAL int txn_ro_start(MDBX_txn *txn, unsigned flags);
/* env.c */
MDBX_INTERNAL int env_open(MDBX_env *env, mdbx_mode_t mode);


@@ -3,80 +3,159 @@
#include "internals.h"
int txn_park(MDBX_txn *txn, bool autounpark) {
reader_slot_t *const rslot = txn->to.reader;
tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY);
tASSERT(txn, txn->to.reader->tid.weak < MDBX_TID_TXN_OUSTED);
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY))
return MDBX_BAD_TXN;
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
const uint64_t tid = atomic_load64(&rslot->tid, mo_Relaxed);
const uint64_t txnid = atomic_load64(&rslot->txnid, mo_Relaxed);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != must ", txn->env->pid);
return MDBX_PROBLEM;
}
if (unlikely(tid != txn->owner || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%0zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
static inline int txn_ro_rslot(MDBX_txn *txn) {
reader_slot_t *slot = txn->to.reader;
STATIC_ASSERT(sizeof(uintptr_t) <= sizeof(slot->tid));
if (likely(slot)) {
if (likely(slot->pid.weak == txn->env->pid && slot->txnid.weak >= SAFE64_INVALID_THRESHOLD)) {
tASSERT(txn, slot->pid.weak == osal_getpid());
tASSERT(txn, slot->tid.weak == ((txn->env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self()));
return MDBX_SUCCESS;
}
return MDBX_BAD_RSLOT;
}
atomic_store64(&rslot->tid, MDBX_TID_TXN_PARKED, mo_AcquireRelease);
atomic_store32(&txn->env->lck->rdt_refresh_flag, true, mo_Relaxed);
txn->flags += autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED;
return MDBX_SUCCESS;
if (unlikely(!txn->env->lck_mmap.lck))
return MDBX_SUCCESS;
MDBX_env *const env = txn->env;
if (env->flags & ENV_TXKEY) {
eASSERT(env, !(env->flags & MDBX_NOSTICKYTHREADS));
slot = thread_rthc_get(env->me_txkey);
if (likely(slot)) {
if (likely(slot->pid.weak == env->pid && slot->txnid.weak >= SAFE64_INVALID_THRESHOLD)) {
tASSERT(txn, slot->pid.weak == osal_getpid());
tASSERT(txn, slot->tid.weak == ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self()));
txn->to.reader = slot;
return MDBX_SUCCESS;
}
if (unlikely(slot->pid.weak) || !(globals.runtime_flags & MDBX_DBG_LEGACY_MULTIOPEN))
return MDBX_BAD_RSLOT;
thread_rthc_set(env->me_txkey, nullptr);
}
} else {
eASSERT(env, (env->flags & MDBX_NOSTICKYTHREADS));
}
bsr_t brs = mvcc_bind_slot(env);
if (likely(brs.err == MDBX_SUCCESS)) {
tASSERT(txn, brs.slot->pid.weak == osal_getpid());
tASSERT(txn, brs.slot->tid.weak == ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self()));
}
txn->to.reader = brs.slot;
return brs.err;
}
int txn_unpark(MDBX_txn *txn) {
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) !=
(MDBX_TXN_RDONLY | MDBX_TXN_PARKED)))
return MDBX_BAD_TXN;
for (reader_slot_t *const rslot = txn->to.reader; rslot; atomic_yield()) {
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
uint64_t tid = safe64_read(&rslot->tid);
uint64_t txnid = safe64_read(&rslot->txnid);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != expected ", txn->env->pid);
return MDBX_PROBLEM;
static inline int txn_ro_seize(MDBX_txn *txn) {
/* Seek & fetch the last meta */
troika_t troika = meta_tap(txn->env);
uint64_t timestamp = 0;
size_t loop = 0;
do {
MDBX_env *const env = txn->env;
const meta_ptr_t head = likely(env->stuck_meta < 0) ? /* regular */ meta_recent(env, &troika)
: /* recovery mode */ meta_ptr(env, env->stuck_meta);
reader_slot_t *const r = txn->to.reader;
if (likely(r != nullptr)) {
safe64_reset(&r->txnid, true);
atomic_store32(&r->snapshot_pages_used, head.ptr_v->geometry.first_unallocated, mo_Relaxed);
atomic_store64(&r->snapshot_pages_retired, unaligned_peek_u64_volatile(4, head.ptr_v->pages_retired), mo_Relaxed);
safe64_write(&r->txnid, head.txnid);
eASSERT(env, r->pid.weak == osal_getpid());
eASSERT(env, r->tid.weak == ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self()));
eASSERT(env, r->txnid.weak == head.txnid ||
(r->txnid.weak >= SAFE64_INVALID_THRESHOLD && head.txnid < env->lck->cached_oldest.weak));
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
} else {
/* exclusive mode without lck */
eASSERT(env, !env->lck_mmap.lck && env->lck == lckless_stub(env));
}
if (unlikely(tid == MDBX_TID_TXN_OUSTED || txnid >= SAFE64_INVALID_THRESHOLD))
break;
if (unlikely(tid != MDBX_TID_TXN_PARKED || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64 " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, tid, " != must ",
MDBX_TID_TXN_OUSTED, txnid, " != must ", txn->txnid);
break;
}
if (unlikely((txn->flags & MDBX_TXN_ERROR)))
break;
jitter4testing(true);
#if MDBX_64BIT_CAS
if (unlikely(!atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, txn->owner)))
continue;
#else
atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32), mo_Relaxed);
if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, (uint32_t)txn->owner))) {
atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32), mo_AcquireRelease);
if (unlikely(meta_should_retry(env, &troika))) {
timestamp = 0;
continue;
}
#endif
txnid = safe64_read(&rslot->txnid);
tid = safe64_read(&rslot->tid);
if (unlikely(txnid != txn->txnid || tid != txn->owner)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
break;
/* Snap the state from current meta-head */
int err = coherency_fetch_head(txn, head, &timestamp);
jitter4testing(false);
if (unlikely(err != MDBX_SUCCESS)) {
if (err != MDBX_RESULT_TRUE)
return err;
continue;
}
txn->flags &= ~(MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK);
const uint64_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease);
if (unlikely(txn->txnid < snap_oldest)) {
if (env->stuck_meta >= 0) {
ERROR("target meta-page %i is referenced to an obsolete MVCC-snapshot "
"%" PRIaTXN " < cached-oldest %" PRIaTXN,
env->stuck_meta, txn->txnid, snap_oldest);
return MDBX_MVCC_RETARDED;
}
continue;
}
if (!r || likely(txn->txnid == atomic_load64(&r->txnid, mo_Relaxed)))
return MDBX_SUCCESS;
} while (likely(++loop < 42));
ERROR("bailout waiting for valid snapshot (%s)", "meta-pages are too volatile");
return MDBX_PROBLEM;
}
int txn_ro_start(MDBX_txn *txn, unsigned flags) {
MDBX_env *const env = txn->env;
eASSERT(env, flags & MDBX_TXN_RDONLY);
eASSERT(env, (flags & ~(txn_ro_begin_flags | MDBX_WRITEMAP | MDBX_NOSTICKYTHREADS)) == 0);
txn->flags = flags;
int err = txn_ro_rslot(txn);
if (unlikely(err != MDBX_SUCCESS))
goto bailout;
STATIC_ASSERT(MDBX_TXN_RDONLY_PREPARE > MDBX_TXN_RDONLY);
reader_slot_t *r = txn->to.reader;
if (flags & (MDBX_TXN_RDONLY_PREPARE - MDBX_TXN_RDONLY)) {
eASSERT(env, txn->txnid == 0);
eASSERT(env, txn->owner == 0);
eASSERT(env, txn->n_dbi == 0);
if (likely(r)) {
eASSERT(env, r->snapshot_pages_used.weak == 0);
eASSERT(env, r->txnid.weak >= SAFE64_INVALID_THRESHOLD);
atomic_store32(&r->snapshot_pages_used, 0, mo_Relaxed);
}
txn->flags = MDBX_TXN_RDONLY | MDBX_TXN_FINISHED;
return MDBX_SUCCESS;
}
int err = txn_end(txn, TXN_END_OUSTED | TXN_END_RESET | TXN_END_UPDATE);
return err ? err : MDBX_OUSTED;
txn->owner = likely(r) ? (uintptr_t)r->tid.weak : ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self());
if ((env->flags & MDBX_NOSTICKYTHREADS) == 0 && env->txn && unlikely(env->basal_txn->owner == txn->owner) &&
(globals.runtime_flags & MDBX_DBG_LEGACY_OVERLAP) == 0) {
err = MDBX_TXN_OVERLAPPING;
goto bailout;
}
err = txn_ro_seize(txn);
if (unlikely(err != MDBX_SUCCESS))
goto bailout;
if (unlikely(txn->txnid < MIN_TXNID || txn->txnid > MAX_TXNID)) {
ERROR("%s", "environment corrupted by died writer, must shutdown!");
err = MDBX_CORRUPTED;
goto bailout;
}
return MDBX_SUCCESS;
bailout:
tASSERT(txn, err != MDBX_SUCCESS);
txn->txnid = INVALID_TXNID;
if (likely(txn->to.reader))
safe64_reset(&txn->to.reader->txnid, true);
return err;
}
int txn_ro_end(MDBX_txn *txn, unsigned mode) {
@@ -132,3 +211,79 @@ int txn_ro_end(MDBX_txn *txn, unsigned mode) {
}
return MDBX_SUCCESS;
}
int txn_ro_park(MDBX_txn *txn, bool autounpark) {
reader_slot_t *const rslot = txn->to.reader;
tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY);
tASSERT(txn, txn->to.reader->tid.weak < MDBX_TID_TXN_OUSTED);
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY))
return MDBX_BAD_TXN;
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
const uint64_t tid = atomic_load64(&rslot->tid, mo_Relaxed);
const uint64_t txnid = atomic_load64(&rslot->txnid, mo_Relaxed);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != must ", txn->env->pid);
return MDBX_PROBLEM;
}
if (unlikely(tid != txn->owner || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%0zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
return MDBX_BAD_RSLOT;
}
atomic_store64(&rslot->tid, MDBX_TID_TXN_PARKED, mo_AcquireRelease);
atomic_store32(&txn->env->lck->rdt_refresh_flag, true, mo_Relaxed);
txn->flags += autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED;
return MDBX_SUCCESS;
}
int txn_ro_unpark(MDBX_txn *txn) {
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) !=
(MDBX_TXN_RDONLY | MDBX_TXN_PARKED)))
return MDBX_BAD_TXN;
for (reader_slot_t *const rslot = txn->to.reader; rslot; atomic_yield()) {
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
uint64_t tid = safe64_read(&rslot->tid);
uint64_t txnid = safe64_read(&rslot->txnid);
if (unlikely(pid != txn->env->pid)) {
ERROR("unexpected pid %u%s%u", pid, " != expected ", txn->env->pid);
return MDBX_PROBLEM;
}
if (unlikely(tid == MDBX_TID_TXN_OUSTED || txnid >= SAFE64_INVALID_THRESHOLD))
break;
if (unlikely(tid != MDBX_TID_TXN_PARKED || txnid != txn->txnid)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64 " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, tid, " != must ",
MDBX_TID_TXN_OUSTED, txnid, " != must ", txn->txnid);
break;
}
if (unlikely((txn->flags & MDBX_TXN_ERROR)))
break;
#if MDBX_64BIT_CAS
if (unlikely(!atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, txn->owner)))
continue;
#else
atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32), mo_Relaxed);
if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, (uint32_t)txn->owner))) {
atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32), mo_AcquireRelease);
continue;
}
#endif
txnid = safe64_read(&rslot->txnid);
tid = safe64_read(&rslot->tid);
if (unlikely(txnid != txn->txnid || tid != txn->owner)) {
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%zx"
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
tid, " != must ", txn->owner, txnid, " != must ", txn->txnid);
break;
}
txn->flags &= ~(MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK);
return MDBX_SUCCESS;
}
int err = txn_end(txn, TXN_END_OUSTED | TXN_END_RESET | TXN_END_UPDATE);
return err ? err : MDBX_OUSTED;
}
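Taken together, the new static helpers split what used to be a monolithic read-only branch in txn_renew() into slot binding and snapshot seizing. The outline below paraphrases the control flow visible in this file's hunks; the helper names are the diff's own, the summary itself is not code from the tree.

/* txn_ro_start(txn, flags), paraphrased:
 *   txn->flags = flags;
 *   txn_ro_rslot(txn);   // reuse txn->to.reader if still valid, else the
 *                        // thread-local slot (ENV_TXKEY), else mvcc_bind_slot()
 *   if (flags carry the MDBX_TXN_RDONLY_PREPARE extra bit)
 *     return MDBX_SUCCESS with MDBX_TXN_FINISHED set;    // prepare-only
 *   set txn->owner; fail with MDBX_TXN_OVERLAPPING when this thread
 *     already owns the basal (write) transaction;
 *   txn_ro_seize(txn);   // loop (up to 42 attempts) over the meta pages until
 *                        // a coherent snapshot txnid is published in the slot
 *   require MIN_TXNID <= txn->txnid <= MAX_TXNID, else MDBX_CORRUPTED;
 * On any failure the slot's txnid is reset via safe64_reset() so the reader
 * slot does not keep pinning an obsolete snapshot. */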

src/txn.c

@@ -76,125 +76,9 @@ int txn_renew(MDBX_txn *txn, unsigned flags) {
flags |= env->flags & (MDBX_NOSTICKYTHREADS | MDBX_WRITEMAP);
if (flags & MDBX_TXN_RDONLY) {
eASSERT(env, (flags & ~(txn_ro_begin_flags | MDBX_WRITEMAP | MDBX_NOSTICKYTHREADS)) == 0);
txn->flags = flags;
reader_slot_t *r = txn->to.reader;
STATIC_ASSERT(sizeof(uintptr_t) <= sizeof(r->tid));
if (likely(env->flags & ENV_TXKEY)) {
eASSERT(env, !(env->flags & MDBX_NOSTICKYTHREADS));
r = thread_rthc_get(env->me_txkey);
if (likely(r)) {
if (unlikely(!r->pid.weak) && (globals.runtime_flags & MDBX_DBG_LEGACY_MULTIOPEN)) {
thread_rthc_set(env->me_txkey, nullptr);
r = nullptr;
} else {
eASSERT(env, r->pid.weak == env->pid);
eASSERT(env, r->tid.weak == osal_thread_self());
}
}
} else {
eASSERT(env, !env->lck_mmap.lck || (env->flags & MDBX_NOSTICKYTHREADS));
}
if (likely(r)) {
if (unlikely(r->pid.weak != env->pid || r->txnid.weak < SAFE64_INVALID_THRESHOLD))
return MDBX_BAD_RSLOT;
} else if (env->lck_mmap.lck) {
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS))
return brs.err;
r = brs.rslot;
}
txn->to.reader = r;
STATIC_ASSERT(MDBX_TXN_RDONLY_PREPARE > MDBX_TXN_RDONLY);
if (flags & (MDBX_TXN_RDONLY_PREPARE - MDBX_TXN_RDONLY)) {
eASSERT(env, txn->txnid == 0);
eASSERT(env, txn->owner == 0);
eASSERT(env, txn->n_dbi == 0);
if (likely(r)) {
eASSERT(env, r->snapshot_pages_used.weak == 0);
eASSERT(env, r->txnid.weak >= SAFE64_INVALID_THRESHOLD);
atomic_store32(&r->snapshot_pages_used, 0, mo_Relaxed);
}
txn->flags = MDBX_TXN_RDONLY | MDBX_TXN_FINISHED;
return MDBX_SUCCESS;
}
txn->owner = likely(r) ? (uintptr_t)r->tid.weak : ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self());
if ((env->flags & MDBX_NOSTICKYTHREADS) == 0 && env->txn && unlikely(env->basal_txn->owner == txn->owner) &&
(globals.runtime_flags & MDBX_DBG_LEGACY_OVERLAP) == 0)
return MDBX_TXN_OVERLAPPING;
/* Seek & fetch the last meta */
uint64_t timestamp = 0;
size_t loop = 0;
troika_t troika = meta_tap(env);
while (1) {
const meta_ptr_t head = likely(env->stuck_meta < 0) ? /* regular */ meta_recent(env, &troika)
: /* recovery mode */ meta_ptr(env, env->stuck_meta);
if (likely(r != nullptr)) {
safe64_reset(&r->txnid, true);
atomic_store32(&r->snapshot_pages_used, head.ptr_v->geometry.first_unallocated, mo_Relaxed);
atomic_store64(&r->snapshot_pages_retired, unaligned_peek_u64_volatile(4, head.ptr_v->pages_retired),
mo_Relaxed);
safe64_write(&r->txnid, head.txnid);
eASSERT(env, r->pid.weak == osal_getpid());
eASSERT(env, r->tid.weak == ((env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self()));
eASSERT(env, r->txnid.weak == head.txnid ||
(r->txnid.weak >= SAFE64_INVALID_THRESHOLD && head.txnid < env->lck->cached_oldest.weak));
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
} else {
/* exclusive mode without lck */
eASSERT(env, !env->lck_mmap.lck && env->lck == lckless_stub(env));
}
jitter4testing(true);
if (unlikely(meta_should_retry(env, &troika))) {
retry:
if (likely(++loop < 42)) {
timestamp = 0;
continue;
}
ERROR("bailout waiting for valid snapshot (%s)", "meta-pages are too volatile");
rc = MDBX_PROBLEM;
goto read_failed;
}
/* Snap the state from current meta-head */
rc = coherency_fetch_head(txn, head, &timestamp);
jitter4testing(false);
if (unlikely(rc != MDBX_SUCCESS)) {
if (rc == MDBX_RESULT_TRUE)
goto retry;
else
goto read_failed;
}
const uint64_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease);
if (unlikely(txn->txnid < snap_oldest)) {
if (env->stuck_meta < 0)
goto retry;
ERROR("target meta-page %i is referenced to an obsolete MVCC-snapshot "
"%" PRIaTXN " < cached-oldest %" PRIaTXN,
env->stuck_meta, txn->txnid, snap_oldest);
rc = MDBX_MVCC_RETARDED;
goto read_failed;
}
if (likely(r != nullptr) && unlikely(txn->txnid != atomic_load64(&r->txnid, mo_Relaxed)))
goto retry;
break;
}
if (unlikely(txn->txnid < MIN_TXNID || txn->txnid > MAX_TXNID)) {
ERROR("%s", "environment corrupted by died writer, must shutdown!");
rc = MDBX_CORRUPTED;
read_failed:
txn->txnid = INVALID_TXNID;
if (likely(r != nullptr))
safe64_reset(&r->txnid, true);
rc = txn_ro_start(txn, flags);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
}
tASSERT(txn, rc == MDBX_SUCCESS);
ENSURE(env, txn->txnid >=
/* paranoia is appropriate here */ env->lck->cached_oldest.weak);
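One more note on the txnid handshake that both the old inline loop above and the new txn_ro_seize() perform; the sketch is my reading of the hunks, not text from the commit.

/* Reader-side snapshot publication, per both versions of the loop:
 *   safe64_reset(&r->txnid, true);        // txnid >= SAFE64_INVALID_THRESHOLD
 *                                         // means "this slot pins no snapshot"
 *   store snapshot_pages_used / snapshot_pages_retired from the meta head;
 *   safe64_write(&r->txnid, head.txnid);  // publish the candidate snapshot
 *   coherency_fetch_head(...);            // re-validate the meta page
 *   retry unless txn->txnid still equals atomic_load64(&r->txnid);
 * A reader that bails out resets the txnid again (see the bailout path of
 * txn_ro_start), so a failed start never leaves a pinned snapshot behind. */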