mdbx: rework find_oldest_reader().

1. Fix regression `assert: oldest >= lck->mti_oldest_reader.weak` after d4bf0a3332c7b05331ab0a87e3cd65b0903edc3c.
2. Add explicit check, kick and notice for stuck reader.
3. Made more e2k-frendly.
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-07-08 22:14:28 +03:00
parent d572052178
commit a4a35ce9cb

View File

@ -5952,53 +5952,66 @@ static const char *mdbx_durable_str(volatile const MDBX_meta *const meta) {
/*----------------------------------------------------------------------------*/
/* Find oldest txnid still referenced. */
static txnid_t find_oldest_reader(const MDBX_env *env) {
const txnid_t steady_edge =
constmeta_txnid(env, constmeta_prefer_steady(env));
mdbx_assert(env, steady_edge <= env->me_txn0->mt_txnid);
static txnid_t find_oldest_reader(MDBX_env *env) {
const uint32_t nothing_changed = MDBX_STRING_TETRAD("None");
const txnid_t steady = constmeta_txnid(env, constmeta_prefer_steady(env));
mdbx_assert(env, steady <= env->me_txn0->mt_txnid);
MDBX_lockinfo *const lck = env->me_lck_mmap.lck;
if (unlikely(lck == NULL /* exclusive without-lck mode */)) {
mdbx_assert(env, env->me_lck == (void *)&env->x_lckless_stub);
return env->me_lck->mti_oldest_reader.weak = steady_edge;
return env->me_lck->mti_oldest_reader.weak = steady;
}
const txnid_t last_oldest =
const txnid_t prev_oldest =
atomic_load64(&lck->mti_oldest_reader, mo_AcquireRelease);
mdbx_assert(env, steady_edge >= last_oldest);
if (likely(last_oldest == steady_edge))
return steady_edge;
mdbx_assert(env, steady >= prev_oldest);
const uint32_t nothing_changed = MDBX_STRING_TETRAD("None");
const uint32_t snap_readers_refresh_flag =
atomic_load32(&lck->mti_readers_refresh_flag, mo_AcquireRelease);
txnid_t new_oldest = prev_oldest;
while (new_oldest != steady &&
nothing_changed !=
atomic_load32(&lck->mti_readers_refresh_flag, mo_AcquireRelease)) {
lck->mti_readers_refresh_flag.weak = nothing_changed;
mdbx_jitter4testing(false);
if (snap_readers_refresh_flag == nothing_changed)
return last_oldest;
atomic_store32(&lck->mti_readers_refresh_flag, nothing_changed, mo_Relaxed);
const unsigned snap_nreaders =
atomic_load32(&lck->mti_numreaders, mo_AcquireRelease);
txnid_t oldest = steady_edge;
new_oldest = steady;
for (unsigned i = 0; i < snap_nreaders; ++i) {
if (atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease)) {
/* mdbx_jitter4testing(true); */
const txnid_t snap = safe64_read(&lck->mti_readers[i].mr_txnid);
if (oldest > snap && /* ignore pending updates */ snap <= steady_edge) {
oldest = snap;
if (oldest == last_oldest)
return oldest;
const mdbx_pid_t pid =
atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease);
if (!pid)
continue;
mdbx_jitter4testing(true);
const txnid_t rtxn = safe64_read(&lck->mti_readers[i].mr_txnid);
if (unlikely(rtxn < prev_oldest)) {
if (unlikely(nothing_changed ==
atomic_load32(&lck->mti_readers_refresh_flag,
mo_AcquireRelease)) &&
safe64_reset_compare(&lck->mti_readers[i].mr_txnid, rtxn)) {
mdbx_notice("kick stuck reader[%u of %u].pid_%u %" PRIaTXN
" < prev-oldest %" PRIaTXN ", steady-txn %" PRIaTXN,
i, snap_nreaders, pid, rtxn, prev_oldest, steady);
}
continue;
}
if (rtxn < new_oldest) {
new_oldest = rtxn;
if (!MDBX_DEBUG && !MDBX_FORCE_ASSERTIONS && new_oldest == prev_oldest)
break;
}
}
if (oldest != last_oldest) {
mdbx_verbose("update oldest %" PRIaTXN " -> %" PRIaTXN, last_oldest,
oldest);
mdbx_assert(env, oldest >= lck->mti_oldest_reader.weak);
atomic_store64(&lck->mti_oldest_reader, oldest, mo_Relaxed);
if (new_oldest != prev_oldest) {
mdbx_verbose("update oldest %" PRIaTXN " -> %" PRIaTXN, prev_oldest,
new_oldest);
mdbx_assert(env, new_oldest >= lck->mti_oldest_reader.weak);
atomic_store64(&lck->mti_oldest_reader, new_oldest, mo_Relaxed);
}
return oldest;
}
return new_oldest;
}
/* Find largest mvcc-snapshot still referenced. */