mdbx: add space argument for oom-callback.

Change-Id: I27634e02046df375fffae66de3124e8cd90cc61c
This commit is contained in:
Leonid Yuriev 2019-09-28 19:03:02 +03:00
parent 86496e4480
commit 87f8c01ac4
4 changed files with 103 additions and 39 deletions

66
mdbx.h
View File

@ -3082,22 +3082,59 @@ LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead);
* read, or negative value on failure. */
LIBMDBX_API int mdbx_txn_straggler(MDBX_txn *txn, int *percent);
/* A callback function for killing a laggard readers,
* but also could waiting ones. Called in case of MDBX_MAP_FULL error.
/* A callback function to resolve issues with laggard readers.
*
* Read transactions prevent reuse of pages freed by newer write transactions,
* thus the database can grow quickly. This callback will be called when there
* is not enough space in the database (ie. before increasing the database size
* or before MDBX_MAP_FULL error) and thus can be used to resolve issues with
* "long-lived" read transactions.
*
* Depending on the arguments and needs, your implementation may wait, terminate
* a process or thread that is performing a long read, or perform some other
* action. In doing so it is important that the returned code always corresponds
* to the performed action.
*
* [in] env An environment handle returned by mdbx_env_create().
* [in] pid pid of the reader process.
* [in] tid thread_id of the reader thread.
* [in] txn Transaction number on which stalled.
* [in] pid A pid of the reader process.
* [in] tid A thread_id of the reader thread.
* [in] txn A transaction number on which stalled.
* [in] gap A lag from the last committed txn.
* [in] retry A retry number, less that zero for notify end of OOM-loop.
* [in] space The space that actually becomes available for reuse after this
* reader finishes. The callback function can take this value into
* account to evaluate the impact that a long-running transaction
* has.
* [in] retry A retry number starting from 0. If the callback has returned 0
* at least once, then at the end of the current OOM-handler loop the
* callback will be called additionally with a negative value to notify
* about the end of the loop. The callback function can use this value to
* implement timeout logic while waiting for readers.
*
* Returns -1 on failure (reader is not killed),
* 0 should wait or retry,
* 1 drop reader txn-lock (reading-txn was aborted),
* >1 drop reader registration (reader process was killed). */
* The RETURN CODE determines the further actions of libmdbx and must match the
* action which was executed by the callback:
*
* -2 or less = An error condition and the reader was not killed.
*
* -1 = The callback was unable to solve the problem and agreed
* on MDBX_MAP_FULL error, libmdbx should increase the
* database size or return MDBX_MAP_FULL error.
*
* 0 (zero) = The callback solved the problem or just waited for
* a while, libmdbx should rescan the reader lock table and
* retry. This also includes a situation when corresponding
* transaction terminated in normal way by mdbx_txn_abort()
* or mdbx_txn_reset(), and may be restarted. I.e. the reader
* slot doesn't need to be cleaned from the transaction.
*
* 1 = The transaction was aborted asynchronously and the reader slot
* should be cleared immediately, i.e. the read transaction will not
* continue but mdbx_txn_abort() or mdbx_txn_reset() will
* be called later.
*
* 2 or greater = The reader process was terminated or killed, and libmdbx
* should entirely reset reader registration. */
typedef int(MDBX_oom_func)(MDBX_env *env, mdbx_pid_t pid, mdbx_tid_t tid,
uint64_t txn, unsigned gap, int retry);
uint64_t txn, unsigned gap, size_t space, int retry);
/* Set the OOM callback.
*
@ -3105,16 +3142,13 @@ typedef int(MDBX_oom_func)(MDBX_env *env, mdbx_pid_t pid, mdbx_tid_t tid,
* lagging reader(s) (i.e. to kill it) to resume reusing pages from the garbage
* collector.
*
* [in] env An environment handle returned by mdbx_env_create().
* [in] oomfunc A MDBX_oom_func function or NULL to disable.
* [in] env An environment handle returned by mdbx_env_create().
* [in] oom_func A MDBX_oom_func function or NULL to disable.
*
* Returns A non-zero error value on failure and 0 on success. */
LIBMDBX_API int mdbx_env_set_oomfunc(MDBX_env *env, MDBX_oom_func *oom_func);
/* Get the current oom_func callback.
*
* Callback will be called only on out-of-pages case for killing
* a laggard readers to allowing reclaiming of freeDB.
*
* [in] env An environment handle returned by mdbx_env_create().
*

View File

@ -305,6 +305,15 @@ static __inline void safe64_reset(mdbx_safe64_t *ptr) {
mdbx_jitter4testing(true);
}
/* Conditionally reset a safe64 cell: atomically replace the stored value
 * with UINT64_MAX only when it currently equals `compare`.
 *
 * [in] ptr      The cell whose `inconsistent` field is updated.
 * [in] compare  The value the cell is expected to hold.
 *
 * Returns the result of mdbx_atomic_compare_and_swap64(), i.e. presumably
 * true when the swap was performed (NOTE(review): confirm against the
 * helper's definition). */
static __inline bool safe64_reset_compare(mdbx_safe64_t *ptr, txnid_t compare) {
  /* Keep the same sequence as the other safe64 helpers:
   * compiler barrier -> update -> write-back flush -> test jitter. */
  mdbx_compiler_barrier();
  const bool swapped =
      mdbx_atomic_compare_and_swap64(&ptr->inconsistent, compare, UINT64_MAX);
  mdbx_flush_noncoherent_cpu_writeback();
  mdbx_jitter4testing(true);
  return swapped;
}
static __inline void safe64_write(mdbx_safe64_t *ptr, const uint64_t v) {
mdbx_compiler_barrier();
assert(ptr->inconsistent >= SAFE64_INVALID_THRESHOLD);
@ -13843,15 +13852,27 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
if (MDBX_IS_ERROR(mdbx_reader_check0(env, false, NULL)))
break;
MDBX_reader *const rlt = env->me_lck->mti_readers;
MDBX_reader *asleep = nullptr;
for (int i = env->me_lck->mti_numreaders; --i >= 0;) {
if (rlt[i].mr_pid) {
mdbx_jitter4testing(true);
const txnid_t snap = safe64_read(&rlt[i].mr_txnid);
if (oldest > snap && laggard <= /* ignore pending updates */ snap) {
oldest = snap;
asleep = &rlt[i];
MDBX_lockinfo *const lck = env->me_lck;
uint64_t oldest_retired = UINT64_MAX;
const unsigned snap_nreaders = lck->mti_numreaders;
for (unsigned i = 0; i < snap_nreaders; ++i) {
retry:
if (lck->mti_readers[i].mr_pid) {
/* mdbx_jitter4testing(true); */
const uint64_t snap_retired =
lck->mti_readers[i].mr_snapshot_pages_retired;
const txnid_t snap_txnid = safe64_read(&lck->mti_readers[i].mr_txnid);
mdbx_memory_barrier();
if (unlikely(snap_retired !=
lck->mti_readers[i].mr_snapshot_pages_retired ||
snap_txnid != safe64_read(&lck->mti_readers[i].mr_txnid)))
goto retry;
if (oldest > snap_txnid &&
laggard <= /* ignore pending updates */ snap_txnid) {
oldest = snap_txnid;
oldest_retired = snap_retired;
asleep = &lck->mti_readers[i];
}
}
}
@ -13861,7 +13882,8 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
/* LY: notify end of oom-loop */
const txnid_t gap = oldest - laggard;
env->me_oom_func(env, 0, 0, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, -retry);
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, 0,
-retry);
}
mdbx_notice("oom-kick: update oldest %" PRIaTXN " -> %" PRIaTXN,
*env->me_oldest, oldest);
@ -13877,28 +13899,35 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
if (safe64_read(&asleep->mr_txnid) != laggard || pid <= 0)
continue;
const txnid_t gap =
mdbx_meta_txnid_stable(env, mdbx_meta_head(env)) - laggard;
int rc =
env->me_oom_func(env, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, retry);
const MDBX_meta *head_meta = mdbx_meta_head(env);
const txnid_t gap = mdbx_meta_txnid_stable(env, head_meta) - laggard;
const uint64_t head_retired = head_meta->mm_pages_retired;
const size_t space =
(oldest_retired > head_retired)
? pgno2bytes(env, (pgno_t)(oldest_retired - head_retired))
: 0;
int rc = env->me_oom_func(env, pid, (mdbx_tid_t)tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX,
space, retry);
if (rc < 0)
break;
if (rc) {
safe64_reset(&asleep->mr_txnid);
if (rc > 1) {
if (rc > 0) {
if (rc == 1) {
safe64_reset_compare(&asleep->mr_txnid, laggard);
} else {
safe64_reset(&asleep->mr_txnid);
asleep->mr_tid = 0;
asleep->mr_pid = 0;
}
env->me_lck->mti_readers_refresh_flag = true;
lck->mti_readers_refresh_flag = true;
mdbx_flush_noncoherent_cpu_writeback();
}
}
if (retry && env->me_oom_func) {
/* LY: notify end of oom-loop */
env->me_oom_func(env, 0, 0, laggard, 0, -retry);
env->me_oom_func(env, 0, 0, laggard, 0, 0, -retry);
}
return mdbx_find_oldest(env->me_txn);
}

View File

@ -77,14 +77,15 @@ const char *keygencase2str(const keygen_case keycase) {
//-----------------------------------------------------------------------------
int testcase::oom_callback(MDBX_env *env, mdbx_pid_t pid, mdbx_tid_t tid,
uint64_t txn, unsigned gap, int retry) {
uint64_t txn, unsigned gap, size_t space,
int retry) {
testcase *self = (testcase *)mdbx_env_get_userctx(env);
if (retry == 0)
log_notice("oom_callback: waitfor pid %u, thread %" PRIuPTR
", txn #%" PRIu64 ", gap %d",
pid, (size_t)tid, txn, gap);
", txn #%" PRIu64 ", gap %d, scape %zu",
pid, (size_t)tid, txn, gap, space);
if (self->should_continue(true)) {
osal_yield();

View File

@ -99,7 +99,7 @@ protected:
} last;
static int oom_callback(MDBX_env *env, mdbx_pid_t pid, mdbx_tid_t tid,
uint64_t txn, unsigned gap, int retry);
uint64_t txn, unsigned gap, size_t space, int retry);
void db_prepare();
void db_open();