lmdb: ITS#7974 oom-handler feature.

Change-Id: I2c56e003fa1e4abe934288581e4c52c80db27c08
This commit is contained in:
Leo Yuriev 2014-12-29 23:57:26 +03:00
parent 84ce8b4cd8
commit 9eedc88441
2 changed files with 129 additions and 4 deletions

33
lmdb.h
View File

@ -1585,6 +1585,39 @@ int mdb_reader_check(MDB_env *env, int *dead);
*/
int mdb_txn_straggler(MDB_txn *txnm, int *percent);
/** @brief A callback function for killing a laggard readers,
* called in case of MDB_MAP_FULL error.
*
* @param[in] env An environment handle returned by #mdb_env_create().
* @param[in] pid pid of the reader process.
* @param[in] thread_id thread_id of the reader thread.
* @param[in] txn Transaction number on which stalled.
* @return -1 on failure (reader is not killed),
* 0 on a race condition (no such reader),
* 1 on success (reader was killed),
* >1 on success (reader was SURE killed).
*/
typedef int (MDB_oom_func)(MDB_env *env, int pid, void* thread_id, size_t txn, unsigned gap, int retry);
/** @brief Set the OOM callback.
*
* Callback will be called only on out-of-pages case for killing
* a laggard readers to allowing reclaiming of freeDB.
*
* @param[in] env An environment handle returned by #mdb_env_create().
* @param[in] oomfunc A #MDB_oom_func function or NULL to disable.
*/
void mdb_env_set_oomfunc(MDB_env *env, MDB_oom_func *oom_func);
/** @brief Get the current oom_func callback.
*
* Callback will be called only on out-of-pages case for killing
* a laggard readers to allowing reclaiming of freeDB.
*
* @param[in] env An environment handle returned by #mdb_env_create().
* @return A #MDB_oom_func function or NULL if disabled.
*/
MDB_oom_func* mdb_env_get_oomfunc(MDB_env *env);
/** @} */
#ifdef __cplusplus

94
mdb.c
View File

@ -1191,6 +1191,7 @@ struct MDB_env {
uint64_t me_sync_pending; /**< Total dirty/commited bytes since the last mdb_env_sync() */
uint64_t me_sync_threshold; /**< Treshold of above to force synchronous flush */
size_t me_sync_size; /**< Tracking me_size for FGREW/fsync() */
MDB_oom_func *me_oom_func; /**< Callback for kicking laggard readers */
};
/** Nested transaction */
@ -1947,6 +1948,80 @@ mdb_find_oldest(MDB_txn *txn)
return oldest;
}
static txnid_t
mdb_laggard_reader(MDB_env *env, int *laggard)
{
txnid_t tail = 0;
if (laggard)
*laggard = -1;
if (env->me_txns->mti_txnid > 1) {
int i;
MDB_reader *r = env->me_txns->mti_readers;
tail = env->me_txns->mti_txnid - 1;
for (i = env->me_txns->mti_numreaders; --i >= 0; ) {
if (r[i].mr_pid) {
txnid_t mr = r[i].mr_txnid;
if (tail > mr) {
tail = mr;
if (laggard)
*laggard = i;
}
}
}
}
return tail;
}
static int
mdb_oomkick_laggard(MDB_env *env)
{
int idx, retry;
txnid_t snap, tail = mdb_laggard_reader(env, &idx);
if (idx < 0)
return 0;
for(retry = 0; ; ++retry) {
MDB_reader *r;
MDB_THR_T tid;
pid_t pid;
int rc;
if (mdb_reader_check(env, NULL))
break;
snap = mdb_laggard_reader(env, NULL);
if (tail < snap)
return 1;
if (!env->me_oom_func)
break;
r = &env->me_txns->mti_readers[ idx ];
pid = r->mr_pid;
tid = r->mr_tid;
if (r->mr_txnid != tail || pid <= 0)
continue;
rc = env->me_oom_func(env, pid, (void*) tid, tail,
env->me_metas[ mdb_env_pick_meta(env) ]->mm_txnid - tail, retry);
if (rc < 0)
break;
if (rc) {
r->mr_txnid = (txnid_t)-1;
if (rc > 1) {
r->mr_tid = 0;
r->mr_pid = 0;
}
}
}
snap = mdb_laggard_reader(env, NULL);
return tail < snap;
}
/** Add a page to the txn's dirty list */
static void
mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
@ -2025,6 +2100,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
goto fail;
}
oomkick_retry:;
for (op = MDB_FIRST;; op = MDB_NEXT) {
MDB_val key, data;
MDB_node *leaf;
@ -2121,6 +2197,8 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
pgno = txn->mt_next_pgno;
if (pgno + num > env->me_maxpg) {
DPUTS("DB size maxed out");
if (mdb_oomkick_laggard(env))
goto oomkick_retry;
rc = MDB_MAP_FULL;
goto fail;
}
@ -2911,7 +2989,7 @@ mdb_txn_straggler(MDB_txn *txn, int *percent)
*percent = (meta->mm_last_pg + cent / 2 + 1) / (cent ? cent : 1);
}
lag = meta->mm_txnid - txn->mt_u.reader->mr_txnid;
return (0 > (int) lag) ? ~0u >> 1: lag;
return (0 > (long) lag) ? ~0u >> 1: lag;
}
/** Common code for #mdb_txn_reset() and #mdb_txn_abort().
@ -9719,4 +9797,18 @@ static int mdb_mutex_failed(MDB_env *env, mdb_mutex_t *mutex, int rc)
return rc;
}
#endif /* MDB_ROBUST_SUPPORTED */
void
mdb_env_set_oomfunc(MDB_env *env, MDB_oom_func *oomfunc)
{
if (env)
env->me_oom_func = oomfunc;
}
MDB_oom_func*
mdb_env_get_oomfunc(MDB_env *env)
{
return env ? env->me_oom_func : NULL;
}
/** @} */