From 9aa2aae93e07a815909f68ac5c86c7c9a69b0a72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9B=D0=B5=D0=BE=D0=BD=D0=B8=D0=B4=20=D0=AE=D1=80=D1=8C?= =?UTF-8?q?=D0=B5=D0=B2=20=28Leonid=20Yuriev=29?= Date: Fri, 8 Jul 2022 23:11:16 +0300 Subject: [PATCH] mdbx: rework/simplify `kick_longlived_readers()`. --- mdbx.h | 11 +++-- src/core.c | 135 ++++++++++++++++++++++++----------------------------- 2 files changed, 68 insertions(+), 78 deletions(-) diff --git a/mdbx.h b/mdbx.h index 9af7e74a..edc9fa02 100644 --- a/mdbx.h +++ b/mdbx.h @@ -5070,11 +5070,12 @@ LIBMDBX_API int mdbx_thread_unregister(const MDBX_env *env); * this value into account to evaluate the impact that * a long-running transaction has. * \param [in] retry A retry number starting from 0. - * If callback has returned 0 at least once, then at end - * of current handling loop the callback function will be - * called additionally with negative value to notify about - * the end of loop. The callback function can use this value - * to implement timeout logic while waiting for readers. + * If callback has returned 0 at least once, then at end of + * current handling loop the callback function will be + * called additionally with negative `retry` value to notify + * about the end of loop. The callback function can use this + * fact to implement timeout reset logic while waiting for + * a readers. * * \returns The RETURN CODE determines the further actions libmdbx and must * match the action which was executed by the callback: diff --git a/src/core.c b/src/core.c index 1632c78c..fa5102ab 100644 --- a/src/core.c +++ b/src/core.c @@ -3839,8 +3839,7 @@ typedef struct page_result { int err; } pgr_t; -static txnid_t mdbx_kick_longlived_readers(MDBX_env *env, - const txnid_t laggard); +static txnid_t kick_longlived_readers(MDBX_env *env, const txnid_t laggard); static pgr_t page_new(MDBX_cursor *mc, const unsigned flags); static pgr_t page_new_large(MDBX_cursor *mc, const unsigned npages); @@ -6966,9 +6965,8 @@ __cold static pgr_t page_alloc_slowpath(MDBX_cursor *mc, const pgno_t num, if (flags & MDBX_ALLOC_GC) { const txnid_t laggard = find_oldest_reader(env); - if (laggard >= detent || - (laggard < txn->mt_txnid - xMDBX_TXNID_STEP && - mdbx_kick_longlived_readers(env, laggard) >= detent)) + if (laggard >= detent || (laggard < txn->mt_txnid - xMDBX_TXNID_STEP && + kick_longlived_readers(env, laggard) >= detent)) continue; } @@ -21648,69 +21646,51 @@ __cold int mdbx_setup_debug(int loglevel, int flags, MDBX_debug_func *logger) { return rc; } -__cold static txnid_t mdbx_kick_longlived_readers(MDBX_env *env, - const txnid_t laggard) { +__cold static txnid_t kick_longlived_readers(MDBX_env *env, + const txnid_t laggard) { mdbx_debug("DB size maxed out by reading #%" PRIaTXN, laggard); - - int retry; - for (retry = 0; retry < INT_MAX; ++retry) { - txnid_t oldest = constmeta_txnid(env, constmeta_prefer_steady(env)); + MDBX_hsr_func *const callback = env->me_hsr_callback; + txnid_t oldest = 0; + bool notify_eof_of_loop = false; + int retry = 0; + do { + env->me_lck->mti_readers_refresh_flag.weak = /* force refresh */ true; + oldest = find_oldest_reader(env); mdbx_assert(env, oldest < env->me_txn0->mt_txnid); mdbx_assert(env, oldest >= laggard); mdbx_assert(env, oldest >= env->me_lck->mti_oldest_reader.weak); + + const txnid_t steady = meta_txnid(env, meta_prefer_steady(env)); MDBX_lockinfo *const lck = env->me_lck_mmap.lck; - if (oldest == laggard || unlikely(!lck /* without-LCK mode */)) - return oldest; + if (oldest == steady || oldest > laggard || /* without-LCK mode */ !lck) + break; if (MDBX_IS_ERROR(mdbx_cleanup_dead_readers(env, false, NULL))) break; - MDBX_reader *asleep = nullptr; - uint64_t oldest_retired = UINT64_MAX; - const unsigned snap_nreaders = - atomic_load32(&lck->mti_numreaders, mo_AcquireRelease); - for (unsigned i = 0; i < snap_nreaders; ++i) { - retry: - if (atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease)) { - /* mdbx_jitter4testing(true); */ - const uint64_t snap_retired = atomic_load64( - &lck->mti_readers[i].mr_snapshot_pages_retired, mo_Relaxed); - const txnid_t snap_txnid = safe64_read(&lck->mti_readers[i].mr_txnid); - if (unlikely(snap_retired != - atomic_load64( - &lck->mti_readers[i].mr_snapshot_pages_retired, - mo_AcquireRelease) || - snap_txnid != safe64_read(&lck->mti_readers[i].mr_txnid))) - goto retry; - if (oldest > snap_txnid && - laggard <= /* ignore pending updates */ snap_txnid) { - oldest = snap_txnid; - oldest_retired = snap_retired; - asleep = &lck->mti_readers[i]; - } - } - } - - if (laggard < oldest || !asleep) { - if (retry && env->me_hsr_callback) { - /* LY: notify end of hsr-loop */ - const txnid_t gap = oldest - laggard; - env->me_hsr_callback(env, env->me_txn, 0, 0, laggard, - (gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, 0, - -retry); - } - mdbx_notice("hsr-kick: update oldest %" PRIaTXN " -> %" PRIaTXN, - lck->mti_oldest_reader.weak, oldest); - mdbx_assert(env, lck->mti_oldest_reader.weak <= oldest); - return atomic_store64(&lck->mti_oldest_reader, oldest, mo_Relaxed); - } - - if (!env->me_hsr_callback) + if (!callback) break; - uint32_t pid = atomic_load32(&asleep->mr_pid, mo_AcquireRelease); - uint64_t tid = asleep->mr_tid.weak; - if (safe64_read(&asleep->mr_txnid) != laggard || pid <= 0) + MDBX_reader *stucked = nullptr; + uint64_t hold_retired = 0; + for (unsigned i = 0; i < lck->mti_numreaders.weak; ++i) { + const uint64_t snap_retired = atomic_load64( + &lck->mti_readers[i].mr_snapshot_pages_retired, mo_Relaxed); + const txnid_t rtxn = safe64_read(&lck->mti_readers[i].mr_txnid); + if (rtxn == laggard && + atomic_load32(&lck->mti_readers[i].mr_pid, mo_AcquireRelease)) { + hold_retired = snap_retired; + stucked = &lck->mti_readers[i]; + } + } + + if (!stucked) + break; + + uint32_t pid = atomic_load32(&stucked->mr_pid, mo_AcquireRelease); + uint64_t tid = atomic_load64(&stucked->mr_tid, mo_AcquireRelease); + if (safe64_read(&stucked->mr_txnid) != laggard || !pid || + stucked->mr_snapshot_pages_retired.weak != hold_retired) continue; const MDBX_meta *head_meta = constmeta_prefer_last(env); @@ -21719,32 +21699,41 @@ __cold static txnid_t mdbx_kick_longlived_readers(MDBX_env *env, const uint64_t head_retired = unaligned_peek_u64(4, head_meta->mm_pages_retired); const size_t space = - (head_retired > oldest_retired) - ? pgno2bytes(env, (pgno_t)(head_retired - oldest_retired)) + (head_retired > hold_retired) + ? pgno2bytes(env, (pgno_t)(head_retired - hold_retired)) : 0; - int rc = env->me_hsr_callback( - env, env->me_txn, pid, (mdbx_tid_t)tid, laggard, - (gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry); + int rc = + callback(env, env->me_txn, pid, (mdbx_tid_t)tid, laggard, + (gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry); if (rc < 0) + /* hsr returned error and/or agree MDBX_MAP_FULL error */ break; if (rc > 0) { if (rc == 1) { - safe64_reset_compare(&asleep->mr_txnid, laggard); + /* hsr reported transaction (will be) aborted asynchronous */ + safe64_reset_compare(&stucked->mr_txnid, laggard); } else { - safe64_reset(&asleep->mr_txnid, true); - atomic_store64(&asleep->mr_tid, 0, mo_Relaxed); - atomic_store32(&asleep->mr_pid, 0, mo_Relaxed); + /* hsr reported reader process was killed and slot should be cleared */ + safe64_reset(&stucked->mr_txnid, true); + atomic_store64(&stucked->mr_tid, 0, mo_Relaxed); + atomic_store32(&stucked->mr_pid, 0, mo_AcquireRelease); } - atomic_store32(&lck->mti_readers_refresh_flag, true, mo_Relaxed); - } - } + } else + notify_eof_of_loop = true; - if (retry && env->me_hsr_callback) { - /* LY: notify end of hsr-loop */ - env->me_hsr_callback(env, env->me_txn, 0, 0, laggard, 0, 0, -retry); + } while (++retry < INT_MAX); + + if (notify_eof_of_loop) { + /* notify end of hsr-loop */ + const txnid_t turn = oldest - laggard; + if (turn) + mdbx_notice("hsr-kick: done turn %" PRIaTXN " -> %" PRIaTXN " +%" PRIaTXN, + laggard, oldest, turn); + callback(env, env->me_txn, 0, 0, laggard, + (turn < UINT_MAX) ? (unsigned)turn : UINT_MAX, 0, -retry); } - return find_oldest_reader(env); + return oldest; } #ifndef LIBMDBX_NO_EXPORTS_LEGACY_API