mdbx: rework API and Docs around Handle-Slow-Readers (no algorithmic changes).

Change-Id: I5b76a8400ce6f5f241f8e4a7f53d746fe39f8e1e
This commit is contained in:
Leonid Yuriev
2020-09-29 19:24:57 +03:00
parent 6294e1710a
commit c8a0951566
12 changed files with 124 additions and 90 deletions

View File

@@ -900,7 +900,7 @@ static __always_inline void safe64_reset(mdbx_safe64_t *ptr,
static __always_inline bool safe64_reset_compare(mdbx_safe64_t *ptr,
txnid_t compare) {
mdbx_compiler_barrier();
/* LY: This function is used to reset `mr_txnid` from OOM-kick in case
/* LY: This function is used to reset `mr_txnid` from hsr-handler in case
* of an asynchronous cancellation of a read transaction. Therefore,
* there may be a collision between the cleanup performed here and
* asynchronous termination and restarting of the read transaction
@@ -3074,7 +3074,8 @@ static __must_check_result int mdbx_page_retire(MDBX_cursor *mc, MDBX_page *mp);
static __must_check_result int mdbx_page_loose(MDBX_txn *txn, MDBX_page *mp);
static int mdbx_page_alloc(MDBX_cursor *mc, const unsigned num,
MDBX_page **const mp, int flags);
static txnid_t mdbx_oomkick(MDBX_env *env, const txnid_t laggard);
static txnid_t mdbx_kick_longlived_readers(MDBX_env *env,
const txnid_t laggard);
static int mdbx_page_new(MDBX_cursor *mc, uint32_t flags, unsigned num,
MDBX_page **mp);
@@ -5071,7 +5072,7 @@ skip_cache:
txnid_t oldest = 0, last = 0;
const unsigned wanna_range = num - 1;
while (true) { /* oom-kick retry loop */
while (true) { /* hsr-kick retry loop */
/* If our dirty list is already full, we can't do anything */
if (unlikely(txn->tw.dirtyroom == 0)) {
rc = MDBX_TXN_FULL;
@@ -5374,7 +5375,7 @@ skip_cache:
/* it is reasonable to check/kick lagging reader(s) here,
* since we made a new steady point or wiped the last one. */
if (oldest < txn->mt_txnid - MDBX_TXNID_STEP &&
mdbx_oomkick(env, oldest) > oldest)
mdbx_kick_longlived_readers(env, oldest) > oldest)
continue;
} else if (unlikely(rc != MDBX_RESULT_TRUE))
goto fail;
@@ -5386,7 +5387,7 @@ skip_cache:
if ((flags & MDBX_ALLOC_NEW) && next <= txn->mt_end_pgno)
goto done;
if ((flags & MDBX_ALLOC_GC) && oldest < txn->mt_txnid - MDBX_TXNID_STEP &&
mdbx_oomkick(env, oldest) > oldest)
mdbx_kick_longlived_readers(env, oldest) > oldest)
continue;
rc = MDBX_NOTFOUND;
@@ -6005,7 +6006,7 @@ static bind_rslot_result bind_rslot(MDBX_env *env, const uintptr_t tid) {
if (likely(slot < env->me_maxreaders))
break;
result.err = mdbx_reader_check0(env, true, NULL);
result.err = mdbx_cleanup_dead_readers(env, true, NULL);
if (result.err != MDBX_RESULT_TRUE) {
mdbx_rdt_unlock(env);
result.err =
@@ -10613,7 +10614,7 @@ __cold int mdbx_env_open(MDBX_env *env, const char *pathname,
if (rc != MDBX_SUCCESS)
goto bailout;
} else {
rc = mdbx_reader_check0(env, false, NULL);
rc = mdbx_cleanup_dead_readers(env, false, NULL);
if (MDBX_IS_ERROR(rc))
goto bailout;
}
@@ -17612,15 +17613,16 @@ static bool __cold mdbx_pid_insert(uint32_t *ids, uint32_t pid) {
int __cold mdbx_reader_check(MDBX_env *env, int *dead) {
if (dead)
*dead = 0;
return mdbx_reader_check0(env, false, dead);
return mdbx_cleanup_dead_readers(env, false, dead);
}
/* Return:
* MDBX_RESULT_TRUE - done and mutex recovered
* MDBX_SUCCESS - done
* Otherwise errcode. */
MDBX_INTERNAL_FUNC int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked,
int *dead) {
MDBX_INTERNAL_FUNC int __cold mdbx_cleanup_dead_readers(MDBX_env *env,
int rdt_locked,
int *dead) {
int rc = check_env(env);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
@@ -17737,8 +17739,9 @@ int __cold mdbx_setup_debug(int loglevel, int flags, MDBX_debug_func *logger) {
return rc;
}
static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
mdbx_debug("%s", "DB size maxed out");
static txnid_t __cold mdbx_kick_longlived_readers(MDBX_env *env,
const txnid_t laggard) {
mdbx_debug("DB size maxed out by reading #%" PRIaTXN, laggard);
int retry;
for (retry = 0; retry < INT_MAX; ++retry) {
@@ -17746,10 +17749,10 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
mdbx_assert(env, oldest < env->me_txn0->mt_txnid);
mdbx_assert(env, oldest >= laggard);
mdbx_assert(env, oldest >= *env->me_oldest);
if (oldest == laggard || unlikely(env->me_lck == NULL /* exclusive mode */))
if (oldest == laggard || unlikely(!env->me_lck /* without-LCK mode */))
return oldest;
if (MDBX_IS_ERROR(mdbx_reader_check0(env, false, NULL)))
if (MDBX_IS_ERROR(mdbx_cleanup_dead_readers(env, false, NULL)))
break;
MDBX_reader *asleep = nullptr;
@@ -17778,20 +17781,20 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
}
if (laggard < oldest || !asleep) {
if (retry && env->me_oom_func) {
/* LY: notify end of oom-loop */
if (retry && env->me_hsr_callback) {
/* LY: notify end of hsr-loop */
const txnid_t gap = oldest - laggard;
env->me_oom_func(env, 0, 0, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, 0,
-retry);
env->me_hsr_callback(env, env->me_txn, 0, 0, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, 0,
-retry);
}
mdbx_notice("oom-kick: update oldest %" PRIaTXN " -> %" PRIaTXN,
mdbx_notice("hsr-kick: update oldest %" PRIaTXN " -> %" PRIaTXN,
*env->me_oldest, oldest);
mdbx_assert(env, *env->me_oldest <= oldest);
return *env->me_oldest = oldest;
}
if (!env->me_oom_func)
if (!env->me_hsr_callback)
break;
uint32_t pid = asleep->mr_pid;
@@ -17807,9 +17810,9 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
(oldest_retired > head_retired)
? pgno2bytes(env, (pgno_t)(oldest_retired - head_retired))
: 0;
int rc = env->me_oom_func(env, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX,
space, retry);
int rc = env->me_hsr_callback(
env, env->me_txn, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry);
if (rc < 0)
break;
@@ -17827,9 +17830,9 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
}
}
if (retry && env->me_oom_func) {
/* LY: notify end of oom-loop */
env->me_oom_func(env, 0, 0, laggard, 0, 0, -retry);
if (retry && env->me_hsr_callback) {
/* LY: notify end of hsr-loop */
env->me_hsr_callback(env, env->me_txn, 0, 0, laggard, 0, 0, -retry);
}
return mdbx_find_oldest(env->me_txn);
}
@@ -17874,18 +17877,18 @@ int __cold mdbx_env_set_syncperiod(MDBX_env *env, unsigned seconds_16dot16) {
return MDBX_SUCCESS;
}
int __cold mdbx_env_set_oomfunc(MDBX_env *env, MDBX_oom_func *oomfunc) {
int __cold mdbx_env_set_hsr(MDBX_env *env, MDBX_hsr_func *hsr) {
int rc = check_env(env);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
env->me_oom_func = oomfunc;
env->me_hsr_callback = hsr;
return MDBX_SUCCESS;
}
MDBX_oom_func *__cold mdbx_env_get_oomfunc(const MDBX_env *env) {
MDBX_hsr_func *__cold mdbx_env_get_hsr(const MDBX_env *env) {
return likely(env && env->me_signature == MDBX_ME_SIGNATURE)
? env->me_oom_func
? env->me_hsr_callback
: NULL;
}

View File

@@ -989,7 +989,7 @@ struct MDBX_env {
volatile pgno_t *me_autosync_threshold;
volatile pgno_t *me_discarded_tail;
volatile uint32_t *me_meta_sync_txnid;
MDBX_oom_func *me_oom_func; /* Callback for kicking laggard readers */
MDBX_hsr_func *me_hsr_callback; /* Callback for kicking laggard readers */
struct {
#if MDBX_LOCKING > 0
mdbx_ipclock_t wlock;
@@ -1203,8 +1203,8 @@ mdbx_flush_incoherent_mmap(void *addr, size_t nbytes, const intptr_t pagesize) {
/*----------------------------------------------------------------------------*/
/* Internal prototypes */
MDBX_INTERNAL_FUNC int mdbx_reader_check0(MDBX_env *env, int rlocked,
int *dead);
MDBX_INTERNAL_FUNC int mdbx_cleanup_dead_readers(MDBX_env *env, int rlocked,
int *dead);
MDBX_INTERNAL_FUNC int mdbx_rthc_alloc(mdbx_thread_key_t *key,
MDBX_reader *begin, MDBX_reader *end);
MDBX_INTERNAL_FUNC void mdbx_rthc_remove(const mdbx_thread_key_t key);

View File

@@ -702,7 +702,7 @@ static int __cold mdbx_ipclock_failed(MDBX_env *env, mdbx_ipclock_t *ipc,
mdbx_warning("%clock owner died, %s", (rlocked ? 'r' : 'w'),
(rc ? "this process' env is hosed" : "recovering"));
int check_rc = mdbx_reader_check0(env, rlocked, NULL);
int check_rc = mdbx_cleanup_dead_readers(env, rlocked, NULL);
check_rc = (check_rc == MDBX_SUCCESS) ? MDBX_RESULT_TRUE : check_rc;
#if MDBX_LOCKING == MDBX_LOCKING_SYSV