mdbx: rework/simplify kick_longlived_readers().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-07-08 23:11:16 +03:00
parent 9421bb424d
commit 9aa2aae93e
2 changed files with 68 additions and 78 deletions

11
mdbx.h
View File

@ -5070,11 +5070,12 @@ LIBMDBX_API int mdbx_thread_unregister(const MDBX_env *env);
* this value into account to evaluate the impact that
* a long-running transaction has.
* \param [in] retry A retry number starting from 0.
* If callback has returned 0 at least once, then at end
* of current handling loop the callback function will be
* called additionally with negative value to notify about
* the end of loop. The callback function can use this value
* to implement timeout logic while waiting for readers.
* If callback has returned 0 at least once, then at end of
* current handling loop the callback function will be
* called additionally with negative `retry` value to notify
* about the end of loop. The callback function can use this
* fact to implement timeout reset logic while waiting for
* a readers.
*
* \returns The RETURN CODE determines the further actions libmdbx and must
* match the action which was executed by the callback:

View File

@ -3839,8 +3839,7 @@ typedef struct page_result {
int err;
} pgr_t;
static txnid_t mdbx_kick_longlived_readers(MDBX_env *env,
const txnid_t laggard);
static txnid_t kick_longlived_readers(MDBX_env *env, const txnid_t laggard);
static pgr_t page_new(MDBX_cursor *mc, const unsigned flags);
static pgr_t page_new_large(MDBX_cursor *mc, const unsigned npages);
@ -6966,9 +6965,8 @@ __cold static pgr_t page_alloc_slowpath(MDBX_cursor *mc, const pgno_t num,
if (flags & MDBX_ALLOC_GC) {
const txnid_t laggard = find_oldest_reader(env);
if (laggard >= detent ||
(laggard < txn->mt_txnid - xMDBX_TXNID_STEP &&
mdbx_kick_longlived_readers(env, laggard) >= detent))
if (laggard >= detent || (laggard < txn->mt_txnid - xMDBX_TXNID_STEP &&
kick_longlived_readers(env, laggard) >= detent))
continue;
}
@ -21648,69 +21646,51 @@ __cold int mdbx_setup_debug(int loglevel, int flags, MDBX_debug_func *logger) {
return rc;
}
__cold static txnid_t mdbx_kick_longlived_readers(MDBX_env *env,
const txnid_t laggard) {
__cold static txnid_t kick_longlived_readers(MDBX_env *env,
const txnid_t laggard) {
mdbx_debug("DB size maxed out by reading #%" PRIaTXN, laggard);
int retry;
for (retry = 0; retry < INT_MAX; ++retry) {
txnid_t oldest = constmeta_txnid(env, constmeta_prefer_steady(env));
MDBX_hsr_func *const callback = env->me_hsr_callback;
txnid_t oldest = 0;
bool notify_eof_of_loop = false;
int retry = 0;
do {
env->me_lck->mti_readers_refresh_flag.weak = /* force refresh */ true;
oldest = find_oldest_reader(env);
mdbx_assert(env, oldest < env->me_txn0->mt_txnid);
mdbx_assert(env, oldest >= laggard);
mdbx_assert(env, oldest >= env->me_lck->mti_oldest_reader.weak);
const txnid_t steady = meta_txnid(env, meta_prefer_steady(env));
MDBX_lockinfo *const lck = env->me_lck_mmap.lck;
if (oldest == laggard || unlikely(!lck /* without-LCK mode */))
return oldest;
if (oldest == steady || oldest > laggard || /* without-LCK mode */ !lck)
break;
if (MDBX_IS_ERROR(mdbx_cleanup_dead_readers(env, false, NULL)))
break;
MDBX_reader *asleep = nullptr;
uint64_t oldest_retired = UINT64_MAX;
const unsigned snap_nreaders =
atomic_load32(&lck->mti_numreaders, mo_AcquireRelease);
for (unsigned i = 0; i < snap_nreaders; ++i) {
retry:
if (atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease)) {
/* mdbx_jitter4testing(true); */
const uint64_t snap_retired = atomic_load64(
&lck->mti_readers[i].mr_snapshot_pages_retired, mo_Relaxed);
const txnid_t snap_txnid = safe64_read(&lck->mti_readers[i].mr_txnid);
if (unlikely(snap_retired !=
atomic_load64(
&lck->mti_readers[i].mr_snapshot_pages_retired,
mo_AcquireRelease) ||
snap_txnid != safe64_read(&lck->mti_readers[i].mr_txnid)))
goto retry;
if (oldest > snap_txnid &&
laggard <= /* ignore pending updates */ snap_txnid) {
oldest = snap_txnid;
oldest_retired = snap_retired;
asleep = &lck->mti_readers[i];
}
}
}
if (laggard < oldest || !asleep) {
if (retry && env->me_hsr_callback) {
/* LY: notify end of hsr-loop */
const txnid_t gap = oldest - laggard;
env->me_hsr_callback(env, env->me_txn, 0, 0, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, 0,
-retry);
}
mdbx_notice("hsr-kick: update oldest %" PRIaTXN " -> %" PRIaTXN,
lck->mti_oldest_reader.weak, oldest);
mdbx_assert(env, lck->mti_oldest_reader.weak <= oldest);
return atomic_store64(&lck->mti_oldest_reader, oldest, mo_Relaxed);
}
if (!env->me_hsr_callback)
if (!callback)
break;
uint32_t pid = atomic_load32(&asleep->mr_pid, mo_AcquireRelease);
uint64_t tid = asleep->mr_tid.weak;
if (safe64_read(&asleep->mr_txnid) != laggard || pid <= 0)
MDBX_reader *stucked = nullptr;
uint64_t hold_retired = 0;
for (unsigned i = 0; i < lck->mti_numreaders.weak; ++i) {
const uint64_t snap_retired = atomic_load64(
&lck->mti_readers[i].mr_snapshot_pages_retired, mo_Relaxed);
const txnid_t rtxn = safe64_read(&lck->mti_readers[i].mr_txnid);
if (rtxn == laggard &&
atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease)) {
hold_retired = snap_retired;
stucked = &lck->mti_readers[i];
}
}
if (!stucked)
break;
uint32_t pid = atomic_load32(&stucked->mr_pid, mo_AcquireRelease);
uint64_t tid = atomic_load64(&stucked->mr_tid, mo_AcquireRelease);
if (safe64_read(&stucked->mr_txnid) != laggard || !pid ||
stucked->mr_snapshot_pages_retired.weak != hold_retired)
continue;
const MDBX_meta *head_meta = constmeta_prefer_last(env);
@ -21719,32 +21699,41 @@ __cold static txnid_t mdbx_kick_longlived_readers(MDBX_env *env,
const uint64_t head_retired =
unaligned_peek_u64(4, head_meta->mm_pages_retired);
const size_t space =
(head_retired > oldest_retired)
? pgno2bytes(env, (pgno_t)(head_retired - oldest_retired))
(head_retired > hold_retired)
? pgno2bytes(env, (pgno_t)(head_retired - hold_retired))
: 0;
int rc = env->me_hsr_callback(
env, env->me_txn, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry);
int rc =
callback(env, env->me_txn, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry);
if (rc < 0)
/* hsr returned error and/or agree MDBX_MAP_FULL error */
break;
if (rc > 0) {
if (rc == 1) {
safe64_reset_compare(&asleep->mr_txnid, laggard);
/* hsr reported transaction (will be) aborted asynchronous */
safe64_reset_compare(&stucked->mr_txnid, laggard);
} else {
safe64_reset(&asleep->mr_txnid, true);
atomic_store64(&asleep->mr_tid, 0, mo_Relaxed);
atomic_store32(&asleep->mr_pid, 0, mo_Relaxed);
/* hsr reported reader process was killed and slot should be cleared */
safe64_reset(&stucked->mr_txnid, true);
atomic_store64(&stucked->mr_tid, 0, mo_Relaxed);
atomic_store32(&stucked->mr_pid, 0, mo_AcquireRelease);
}
atomic_store32(&lck->mti_readers_refresh_flag, true, mo_Relaxed);
}
}
} else
notify_eof_of_loop = true;
if (retry && env->me_hsr_callback) {
/* LY: notify end of hsr-loop */
env->me_hsr_callback(env, env->me_txn, 0, 0, laggard, 0, 0, -retry);
} while (++retry < INT_MAX);
if (notify_eof_of_loop) {
/* notify end of hsr-loop */
const txnid_t turn = oldest - laggard;
if (turn)
mdbx_notice("hsr-kick: done turn %" PRIaTXN " -> %" PRIaTXN " +%" PRIaTXN,
laggard, oldest, turn);
callback(env, env->me_txn, 0, 0, laggard,
(turn < UINT_MAX) ? (unsigned)turn : UINT_MAX, 0, -retry);
}
return find_oldest_reader(env);
return oldest;
}
#ifndef LIBMDBX_NO_EXPORTS_LEGACY_API