mdbx: handling the case when all dirty pages are unspillable.

Related to https://github.com/erthink/libmdbx/issues/186

Change-Id: Iff7a123c1c171371624d8b7ae497941ec4622385
This commit is contained in:
Leonid Yuriev 2021-04-28 19:51:58 +03:00
parent aa5a39160a
commit c3b2c1fdef

View File

@ -4828,59 +4828,6 @@ static __inline int mdbx_page_retire(MDBX_cursor *mc, MDBX_page *mp) {
return mdbx_page_retire_ex(mc, mp->mp_pgno, mp, PAGETYPE(mp)); return mdbx_page_retire_ex(mc, mp->mp_pgno, mp, PAGETYPE(mp));
} }
/* Set P_KEEP in dirty, non-overflow, non-sub pages watched by txn. */
static void mdbx_cursor_keep(MDBX_txn *txn, MDBX_cursor *mc) {
if (!(mc->mc_flags & C_INITIALIZED))
return;
loop:;
const MDBX_page *mp = NULL;
for (unsigned i = 0; i < mc->mc_snum; i++) {
mp = mc->mc_pg[i];
if (IS_MODIFIABLE(txn, mp)) {
unsigned const n = mdbx_dpl_search(txn, mp->mp_pgno);
if (txn->tw.dirtylist->items[n].pgno == mp->mp_pgno)
txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru;
}
}
if (!(mp && IS_LEAF(mp)))
return;
/* Proceed to mx if it is at a sub-database */
MDBX_xcursor *mx = mc->mc_xcursor;
if (!(mx && (mx->mx_cursor.mc_flags & C_INITIALIZED)))
return;
const unsigned nkeys = page_numkeys(mp);
unsigned ki = mc->mc_ki[mc->mc_top];
mdbx_cassert(mc, nkeys > 0 &&
(ki < nkeys ||
(ki == nkeys && (mx->mx_cursor.mc_flags & C_EOF))));
ki -= ki >= nkeys;
if ((node_flags(page_node(mp, ki)) & F_SUBDATA)) {
mc = &mx->mx_cursor;
goto loop;
}
}
static void mdbx_txn_keep(MDBX_txn *txn, MDBX_cursor *m0) {
if (m0)
mdbx_cursor_keep(txn, m0);
for (unsigned i = FREE_DBI; i < txn->mt_numdbs; ++i) {
const pgno_t pgno = txn->mt_dbs[i].md_root;
if ((txn->mt_dbistate[i] & DBI_DIRTY) && pgno != P_INVALID) {
unsigned const n = mdbx_dpl_search(txn, pgno);
if (likely(txn->tw.dirtylist->items[n].pgno == pgno)) {
txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru;
for (MDBX_cursor *mc = txn->tw.cursors[i]; mc; mc = mc->mc_next)
if (mc != m0)
mdbx_cursor_keep(txn, mc);
}
}
}
}
struct mdbx_iov_ctx { struct mdbx_iov_ctx {
unsigned iov_items; unsigned iov_items;
size_t iov_bytes; size_t iov_bytes;
@ -5016,6 +4963,65 @@ static int spill_page(MDBX_txn *txn, struct mdbx_iov_ctx *ctx, MDBX_page *dp,
return err; return err;
} }
/* Set unspillable LRU-label for dirty pages watched by txn.
* Returns the number of pages marked as unspillable. */
static unsigned mdbx_cursor_keep(MDBX_txn *txn, MDBX_cursor *mc) {
unsigned keep = 0;
if (!(mc->mc_flags & C_INITIALIZED))
return keep;
loop:;
const MDBX_page *mp = NULL;
for (unsigned i = 0; i < mc->mc_snum; i++) {
mp = mc->mc_pg[i];
if (IS_MODIFIABLE(txn, mp)) {
unsigned const n = mdbx_dpl_search(txn, mp->mp_pgno);
if (txn->tw.dirtylist->items[n].pgno == mp->mp_pgno &&
txn->tw.dirtylist->items[n].lru != txn->tw.dirtylru) {
txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru;
keep++;
}
}
}
if (!(mp && IS_LEAF(mp)))
return keep;
/* Proceed to mx if it is at a sub-database */
MDBX_xcursor *mx = mc->mc_xcursor;
if (!(mx && (mx->mx_cursor.mc_flags & C_INITIALIZED)))
return keep;
const unsigned nkeys = page_numkeys(mp);
unsigned ki = mc->mc_ki[mc->mc_top];
mdbx_cassert(mc, nkeys > 0 &&
(ki < nkeys ||
(ki == nkeys && (mx->mx_cursor.mc_flags & C_EOF))));
ki -= ki >= nkeys;
if ((node_flags(page_node(mp, ki)) & F_SUBDATA)) {
mc = &mx->mx_cursor;
goto loop;
}
return keep;
}
static unsigned mdbx_txn_keep(MDBX_txn *txn, MDBX_cursor *m0) {
unsigned keep = m0 ? mdbx_cursor_keep(txn, m0) : 0;
for (unsigned i = FREE_DBI; i < txn->mt_numdbs; ++i) {
const pgno_t pgno = txn->mt_dbs[i].md_root;
if ((txn->mt_dbistate[i] & DBI_DIRTY) && pgno != P_INVALID) {
unsigned const n = mdbx_dpl_search(txn, pgno);
if (likely(txn->tw.dirtylist->items[n].pgno == pgno)) {
txn->tw.dirtylist->items[n].lru = txn->tw.dirtylru;
for (MDBX_cursor *mc = txn->tw.cursors[i]; mc; mc = mc->mc_next)
if (mc != m0)
keep += mdbx_cursor_keep(txn, mc);
}
}
}
return keep;
}
/* Returns the spilling priority (0..255) for a dirty page: /* Returns the spilling priority (0..255) for a dirty page:
* 0 = should be spilled; * 0 = should be spilled;
* ... * ...
@ -5186,7 +5192,18 @@ static int mdbx_txn_spill(MDBX_txn *const txn, MDBX_cursor *const m0,
MDBX_dpl *const dl = mdbx_dpl_sort(txn); MDBX_dpl *const dl = mdbx_dpl_sort(txn);
/* Preserve pages which may soon be dirtied again */ /* Preserve pages which may soon be dirtied again */
mdbx_txn_keep(txn, m0); const unsigned unspillable = mdbx_txn_keep(txn, m0);
if (unspillable + txn->tw.loose_count >= dl->length) {
#if MDBX_DEBUG_SPILLING == 1 /* avoid false failure in debug mode */
if (likely(txn->tw.dirtyroom + txn->tw.loose_count >= need))
return MDBX_SUCCESS;
#endif /* MDBX_DEBUG_SPILLING */
mdbx_error("all %u dirty pages are unspillable since referenced "
"by a cursor(s), use fewer cursors or increase "
"MDBX_opt_txn_dp_limit",
unspillable);
return MDBX_TXN_FULL;
}
/* Подзадача: Вытолкнуть часть страниц на диск в соответствии с LRU, /* Подзадача: Вытолкнуть часть страниц на диск в соответствии с LRU,
* но при этом учесть важные поправки: * но при этом учесть важные поправки: