mdbx: minor refactoring: use page_result.

Change-Id: I7749d9463832ce9b270d06f04f43e413d5ba26b7
Leonid Yuriev 2021-04-17 22:33:02 +03:00
parent e4db019f47
commit b3aba4691b
2 changed files with 113 additions and 112 deletions
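The diff below switches several internal helpers from the out-parameter convention (int fn(..., MDBX_page **mp)) to returning struct page_result { MDBX_page *page; int err; } by value, so the page pointer and the error code travel together. The standalone sketch here illustrates only that calling idiom; the names page_result_demo, demo_page_new and the stand-in void * payload are hypothetical and not part of libmdbx.

#include <stdio.h>

/* Stand-ins for illustration only: the real struct page_result holds an
 * MDBX_page *page and an int err (MDBX_SUCCESS or an MDBX error code). */
struct page_result_demo {
  void *page; /* plays the role of MDBX_page * */
  int err;    /* plays the role of the MDBX error code */
};

static char demo_storage[64];

/* Return the page pointer and the error code together, by value,
 * instead of writing the page through an out-parameter. */
static struct page_result_demo demo_page_new(int simulate_failure) {
  struct page_result_demo ret;
  if (simulate_failure) {
    ret.page = NULL;
    ret.err = -1; /* the real code would return e.g. MDBX_ENOMEM here */
    return ret;
  }
  ret.page = demo_storage;
  ret.err = 0; /* MDBX_SUCCESS */
  return ret;
}

int main(void) {
  struct page_result_demo r = demo_page_new(0);
  if (r.err != 0) /* callers check .err first, then use .page */
    return r.err;
  printf("demo page at %p\n", r.page);
  return 0;
}

Where existing call sites keep the old signature, the commit leaves a thin inline adapter in place (see mdbx_page_get below) that unpacks the struct back into the out-parameter, which keeps the refactoring incremental.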


@@ -1093,6 +1093,7 @@ notracking
NOWAIT
npages
npos
npr
nptl
nreaders
nrepeat
@@ -1211,6 +1212,7 @@ PGL
pglist
pgno
pgnumber
pgr
pgsize
pgstate
pgvisitor


@@ -3610,13 +3610,19 @@ uint8_t mdbx_loglevel = MDBX_LOG_FATAL;
MDBX_debug_func *mdbx_debug_logger;
static __must_check_result int mdbx_page_retire(MDBX_cursor *mc, MDBX_page *mp);
struct page_result {
MDBX_page *page;
int err;
};
static int mdbx_page_alloc(MDBX_cursor *mc, const unsigned num,
MDBX_page **const mp, int flags);
static txnid_t mdbx_kick_longlived_readers(MDBX_env *env,
const txnid_t laggard);
static int mdbx_page_new(MDBX_cursor *mc, uint32_t flags, unsigned num,
MDBX_page **mp);
static struct page_result mdbx_page_new(MDBX_cursor *mc, const unsigned flags,
const unsigned num);
static int mdbx_page_touch(MDBX_cursor *mc);
static int mdbx_cursor_touch(MDBX_cursor *mc);
static int mdbx_touch_dbi(MDBX_cursor *mc);
@@ -3643,20 +3649,15 @@ enum {
#define MDBX_END_SLOT 0x80 /* release any reader slot if MDBX_NOTLS */
static int mdbx_txn_end(MDBX_txn *txn, const unsigned mode);
struct page_get_result {
int err;
MDBX_page *mp;
};
__hot static struct page_get_result __must_check_result
__hot static struct page_result __must_check_result
mdbx_page_get_ex(MDBX_cursor *const mc, const pgno_t pgno, txnid_t front);
static __inline int __must_check_result mdbx_page_get(MDBX_cursor *mc,
pgno_t pgno,
MDBX_page **mp,
txnid_t front) {
struct page_get_result ret = mdbx_page_get_ex(mc, pgno, front);
*mp = ret.mp;
struct page_result ret = mdbx_page_get_ex(mc, pgno, front);
*mp = ret.page;
return ret.err;
}
@@ -6374,19 +6375,14 @@ __hot static void mdbx_page_copy(MDBX_page *dst, MDBX_page *src, size_t psize) {
/* Pull a page off the txn's spill list, if present.
*
* If a page being referenced was spilled to disk in this txn, bring
* it back and make it dirty/writable again.
*
* [in] txn the transaction handle.
* [in] mp the page being referenced. It must not be dirty.
* [out] ret the writable page, if any.
* ret is unchanged if mp wasn't spilled. */
static int __must_check_result mdbx_page_unspill(MDBX_txn *const txn,
MDBX_page *mp,
MDBX_page **ret) {
* it back and make it dirty/writable again. */
static struct page_result __must_check_result
mdbx_page_unspill(MDBX_txn *const txn, MDBX_page *mp) {
mdbx_tassert(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0);
mdbx_tassert(txn, IS_SPILLED(txn, mp));
const pgno_t spilled_pgno = mp->mp_pgno << 1;
const MDBX_txn *scan = txn;
struct page_result ret;
do {
mdbx_tassert(txn, (scan->mt_flags & MDBX_TXN_SPILLS) != 0);
if (!scan->tw.spill_pages)
@@ -6395,10 +6391,12 @@ static int __must_check_result mdbx_page_unspill(MDBX_txn *const txn,
if (!si)
continue;
const unsigned npages = IS_OVERFLOW(mp) ? mp->mp_pages : 1;
MDBX_page *np = mdbx_page_malloc(txn, npages);
if (unlikely(!np))
return MDBX_ENOMEM;
mdbx_page_copy(np, mp, pgno2bytes(txn->mt_env, npages));
ret.page = mdbx_page_malloc(txn, npages);
if (unlikely(!ret.page)) {
ret.err = MDBX_ENOMEM;
return ret;
}
mdbx_page_copy(ret.page, mp, pgno2bytes(txn->mt_env, npages));
mdbx_debug("unspill page %" PRIaPGNO, mp->mp_pgno);
if (scan == txn) {
/* If in current txn, this page is no longer spilled.
@@ -6408,17 +6406,18 @@ static int __must_check_result mdbx_page_unspill(MDBX_txn *const txn,
} /* otherwise, if belonging to a parent txn, the
* page remains spilled until child commits */
int rc = mdbx_page_dirty(txn, np);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_dpage_free(txn->mt_env, np, npages);
return rc;
ret.err = mdbx_page_dirty(txn, ret.page);
if (unlikely(ret.err != MDBX_SUCCESS)) {
mdbx_dpage_free(txn->mt_env, ret.page, npages);
return ret;
}
np->mp_flags |= (scan == txn) ? 0 : P_SPILLED;
*ret = np;
return MDBX_SUCCESS;
ret.page->mp_flags |= (scan == txn) ? 0 : P_SPILLED;
ret.err = MDBX_SUCCESS;
return ret;
} while ((scan = scan->mt_parent) != nullptr &&
(scan->mt_flags & MDBX_TXN_SPILLS) != 0);
return MDBX_PROBLEM;
ret.err = MDBX_PROBLEM;
return ret;
}
/* Touch a page: make it dirty and re-insert into tree with updated pgno.
@@ -6499,8 +6498,9 @@ __hot static int mdbx_page_touch(MDBX_cursor *mc) {
mdbx_tassert(txn, mdbx_dirtylist_check(txn));
} else {
np = nullptr;
rc = mdbx_page_unspill(txn, mp, &np);
struct page_result pur = mdbx_page_unspill(txn, mp);
np = pur.page;
rc = pur.err;
if (likely(rc == MDBX_SUCCESS)) {
mdbx_tassert(txn, np != nullptr);
goto done;
@@ -12644,19 +12644,19 @@ static __inline int mdbx_cursor_push(MDBX_cursor *mc, MDBX_page *mp) {
return MDBX_SUCCESS;
}
__hot static struct page_get_result
__hot static struct page_result
mdbx_page_get_ex(MDBX_cursor *const mc, const pgno_t pgno,
/* TODO: use parent-page ptr */ txnid_t front) {
struct page_get_result r;
struct page_result ret;
MDBX_txn *const txn = mc->mc_txn;
mdbx_tassert(txn, front <= txn->mt_front);
if (unlikely(pgno >= txn->mt_next_pgno)) {
mdbx_error("page #%" PRIaPGNO " beyond next-pgno", pgno);
r.mp = nullptr;
ret.page = nullptr;
corrupted:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
r.err = MDBX_PAGE_NOTFOUND;
return r;
ret.err = MDBX_PAGE_NOTFOUND;
return ret;
}
MDBX_env *const env = txn->mt_env;
@@ -12673,56 +12673,56 @@ mdbx_page_get_ex(MDBX_cursor *const mc, const pgno_t pgno,
mdbx_pnl_exist(spiller->tw.spill_pages, pgno << 1)) {
goto spilled;
}
r.mp = mdbx_dpl_find(spiller->tw.dirtylist, pgno);
if (r.mp)
ret.page = mdbx_dpl_find(spiller->tw.dirtylist, pgno);
if (ret.page)
goto dirty;
spiller = spiller->mt_parent;
} while (spiller != NULL);
}
spilled:
r.mp = pgno2page(env, pgno);
ret.page = pgno2page(env, pgno);
dirty:
if (unlikely(r.mp->mp_pgno != pgno)) {
bad_page(r.mp,
if (unlikely(ret.page->mp_pgno != pgno)) {
bad_page(ret.page,
"mismatch actual pgno (%" PRIaPGNO ") != expected (%" PRIaPGNO
")\n",
r.mp->mp_pgno, pgno);
ret.page->mp_pgno, pgno);
goto corrupted;
}
#if !MDBX_DISABLE_PAGECHECKS
if (unlikely(r.mp->mp_flags & P_ILL_BITS)) {
bad_page(r.mp, "invalid page's flags (%u)\n", r.mp->mp_flags);
if (unlikely(ret.page->mp_flags & P_ILL_BITS)) {
bad_page(ret.page, "invalid page's flags (%u)\n", ret.page->mp_flags);
goto corrupted;
}
if (unlikely(r.mp->mp_txnid > front) &&
(r.mp->mp_txnid > txn->mt_front || front < txn->mt_txnid)) {
bad_page(r.mp,
if (unlikely(ret.page->mp_txnid > front) &&
(ret.page->mp_txnid > txn->mt_front || front < txn->mt_txnid)) {
bad_page(ret.page,
"invalid page txnid (%" PRIaTXN ") for %s' txnid (%" PRIaTXN ")\n",
r.mp->mp_txnid,
ret.page->mp_txnid,
(front == txn->mt_front && front != txn->mt_txnid) ? "front-txn"
: "parent-page",
front);
goto corrupted;
}
if (unlikely((r.mp->mp_upper < r.mp->mp_lower ||
((r.mp->mp_lower | r.mp->mp_upper) & 1) ||
PAGEHDRSZ + r.mp->mp_upper > env->me_psize) &&
!IS_OVERFLOW(r.mp))) {
bad_page(r.mp, "invalid page lower(%u)/upper(%u) with limit (%u)\n",
r.mp->mp_lower, r.mp->mp_upper, page_space(env));
if (unlikely((ret.page->mp_upper < ret.page->mp_lower ||
((ret.page->mp_lower | ret.page->mp_upper) & 1) ||
PAGEHDRSZ + ret.page->mp_upper > env->me_psize) &&
!IS_OVERFLOW(ret.page))) {
bad_page(ret.page, "invalid page lower(%u)/upper(%u) with limit (%u)\n",
ret.page->mp_lower, ret.page->mp_upper, page_space(env));
goto corrupted;
}
#endif /* !MDBX_DISABLE_PAGECHECKS */
r.err = MDBX_SUCCESS;
ret.err = MDBX_SUCCESS;
if (mdbx_audit_enabled())
r.err = mdbx_page_check(mc, r.mp, C_UPDATING);
return r;
ret.err = mdbx_page_check(mc, ret.page, C_UPDATING);
return ret;
}
/* Finish mdbx_page_search() / mdbx_page_search_lowest().
@@ -14229,7 +14229,6 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
}
if (unlikely(rc == MDBX_NO_ROOT)) {
MDBX_page *np;
/* new database, write a root leaf page */
mdbx_debug("%s", "allocating new root leaf page");
if (unlikely((*mc->mc_dbistate & DBI_DIRTY) == 0)) {
@@ -14237,13 +14236,13 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
if (unlikely(rc2 != MDBX_SUCCESS))
return rc2;
}
rc2 = mdbx_page_new(mc, P_LEAF, 1, &np);
if (unlikely(rc2 != MDBX_SUCCESS))
return rc;
rc2 = mdbx_cursor_push(mc, np);
if (unlikely(rc2 != MDBX_SUCCESS))
return rc2;
mc->mc_db->md_root = np->mp_pgno;
struct page_result npr = mdbx_page_new(mc, P_LEAF, 1);
if (unlikely(npr.err != MDBX_SUCCESS))
return npr.err;
npr.err = mdbx_cursor_push(mc, npr.page);
if (unlikely(npr.err != MDBX_SUCCESS))
return npr.err;
mc->mc_db->md_root = npr.page->mp_pgno;
mc->mc_db->md_depth++;
if (mc->mc_db->md_flags & MDBX_INTEGERKEY) {
assert(key->iov_len >= mc->mc_dbx->md_klen_min &&
@@ -14261,7 +14260,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
data->iov_len);
}
if ((mc->mc_db->md_flags & (MDBX_DUPSORT | MDBX_DUPFIXED)) == MDBX_DUPFIXED)
np->mp_flags |= P_LEAF2;
npr.page->mp_flags |= P_LEAF2;
mc->mc_flags |= C_INITIALIZED;
} else {
/* make sure all cursor pages are writable */
@@ -14308,13 +14307,12 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
mc->mc_top--;
dtop++;
}
rc2 = MDBX_SUCCESS;
if (mc->mc_ki[mc->mc_top])
rc2 = mdbx_update_key(mc, key);
else
rc2 = MDBX_SUCCESS;
mdbx_cassert(mc, mc->mc_top + dtop < UINT16_MAX);
mc->mc_top += (uint16_t)dtop;
if (rc2)
if (unlikely(rc2 != MDBX_SUCCESS))
return rc2;
}
@@ -14341,29 +14339,29 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
: 0;
const pgno_t pgno = node_largedata_pgno(node);
struct page_get_result gr = mdbx_page_get_ex(
struct page_result pgr = mdbx_page_get_ex(
mc, pgno, pp_txnid4chk(mc->mc_pg[mc->mc_top], mc->mc_txn));
if (unlikely(gr.err != 0))
return gr.err;
if (unlikely(!IS_OVERFLOW(gr.mp)))
if (unlikely(pgr.err != MDBX_SUCCESS))
return pgr.err;
if (unlikely(!IS_OVERFLOW(pgr.page)))
return MDBX_CORRUPTED;
/* Is the ov page from this txn (or a parent) and big enough? */
int ovpages = gr.mp->mp_pages;
if (!IS_FROZEN(mc->mc_txn, gr.mp) &&
int ovpages = pgr.page->mp_pages;
if (!IS_FROZEN(mc->mc_txn, pgr.page) &&
(unlikely(mc->mc_flags & C_GCFREEZE)
? (ovpages >= dpages)
: (ovpages ==
/* LY: add configurable threshold to keep reserve space */
dpages))) {
/* yes, overwrite it. */
if (!IS_MODIFIABLE(mc->mc_txn, gr.mp)) {
if (IS_SPILLED(mc->mc_txn, gr.mp)) {
rc2 = /* TODO: avoid search and get txn & spill-index from
page_get_result */
mdbx_page_unspill(mc->mc_txn, gr.mp, &gr.mp);
if (unlikely(rc2))
return rc2;
if (!IS_MODIFIABLE(mc->mc_txn, pgr.page)) {
if (IS_SPILLED(mc->mc_txn, pgr.page)) {
pgr = /* TODO: avoid search and get txn & spill-index from
page_result */
mdbx_page_unspill(mc->mc_txn, pgr.page);
if (unlikely(pgr.err))
return pgr.err;
} else {
if (unlikely(!mc->mc_txn->mt_parent))
return MDBX_PROBLEM;
@@ -14381,16 +14379,16 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
goto fail;
}
memcpy(np, gr.mp, PAGEHDRSZ); /* Copy header of page */
gr.mp = np;
memcpy(np, pgr.page, PAGEHDRSZ); /* Copy header of page */
pgr.page = np;
mdbx_cassert(mc, mdbx_dirtylist_check(mc->mc_txn));
}
}
node_set_ds(node, data->iov_len);
if (F_ISSET(flags, MDBX_RESERVE))
data->iov_base = page_data(gr.mp);
data->iov_base = page_data(pgr.page);
else
memcpy(page_data(gr.mp), data->iov_base, data->iov_len);
memcpy(page_data(pgr.page), data->iov_base, data->iov_len);
if (mdbx_audit_enabled()) {
int err = mdbx_cursor_check(mc, 0);
@@ -14400,7 +14398,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
return MDBX_SUCCESS;
}
if ((rc2 = mdbx_page_retire(mc, gr.mp)) != MDBX_SUCCESS)
if ((rc2 = mdbx_page_retire(mc, pgr.page)) != MDBX_SUCCESS)
return rc2;
} else {
olddata.iov_len = node_ds(node);
@@ -14879,26 +14877,25 @@ fail:
* [out] mp Address of a page, or NULL on failure.
*
* Returns 0 on success, non-zero on failure. */
static int mdbx_page_new(MDBX_cursor *mc, unsigned flags, unsigned num,
MDBX_page **mp) {
MDBX_page *np;
int rc;
static struct page_result mdbx_page_new(MDBX_cursor *mc, const unsigned flags,
const unsigned num) {
struct page_result ret;
ret.err = mdbx_page_alloc(mc, num, &ret.page, MDBX_ALLOC_ALL);
if (unlikely(ret.err != MDBX_SUCCESS))
return ret;
if (unlikely((rc = mdbx_page_alloc(mc, num, &np, MDBX_ALLOC_ALL))))
return rc;
*mp = np;
mdbx_debug("db %u allocated new page %" PRIaPGNO ", num %u", mc->mc_dbi,
np->mp_pgno, num);
np->mp_flags = (uint16_t)flags;
np->mp_txnid = mc->mc_txn->mt_front;
ret.page->mp_pgno, num);
ret.page->mp_flags = (uint16_t)flags;
ret.page->mp_txnid = mc->mc_txn->mt_front;
mdbx_cassert(mc, *mc->mc_dbistate & DBI_DIRTY);
mdbx_cassert(mc, mc->mc_txn->mt_flags & MDBX_TXN_DIRTY);
if (likely((flags & P_OVERFLOW) == 0)) {
STATIC_ASSERT(P_BRANCH == 1);
const bool is_branch = flags & P_BRANCH;
np->mp_lower = 0;
np->mp_upper = (indx_t)(mc->mc_txn->mt_env->me_psize - PAGEHDRSZ);
ret.page->mp_lower = 0;
ret.page->mp_upper = (indx_t)(mc->mc_txn->mt_env->me_psize - PAGEHDRSZ);
mc->mc_db->md_branch_pages += is_branch;
mc->mc_db->md_leaf_pages += 1 - is_branch;
if (unlikely(mc->mc_flags & C_SUB)) {
@@ -14908,11 +14905,11 @@ static int mdbx_page_new(MDBX_cursor *mc, unsigned flags, unsigned num,
}
} else {
mc->mc_db->md_overflow_pages += num;
np->mp_pages = num;
ret.page->mp_pages = num;
mdbx_cassert(mc, !(mc->mc_flags & C_SUB));
}
return MDBX_SUCCESS;
return ret;
}
static int __must_check_result mdbx_node_add_leaf2(MDBX_cursor *mc,
@@ -15026,9 +15023,10 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
if (unlikely(flags & (F_DUPDATA | F_SUBDATA)))
return MDBX_PROBLEM;
const pgno_t ovpages = number_of_ovpages(mc->mc_txn->mt_env, data->iov_len);
int rc = mdbx_page_new(mc, P_OVERFLOW, ovpages, &largepage);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
const struct page_result npr = mdbx_page_new(mc, P_OVERFLOW, ovpages);
if (unlikely(npr.err != MDBX_SUCCESS))
return npr.err;
largepage = npr.page;
mdbx_debug("allocated %u overflow page(s) %" PRIaPGNO "for %" PRIuPTR
" data bytes",
largepage->mp_pages, largepage->mp_pgno, data->iov_len);
@@ -17135,10 +17133,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
mdbx_cassert(mc, nkeys + 1 >= minkeys * 2);
/* Create a new sibling page. */
MDBX_page *sister;
rc = mdbx_page_new(mc, mp->mp_flags, 1, &sister);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
struct page_result npr = mdbx_page_new(mc, mp->mp_flags, 1);
if (unlikely(npr.err != MDBX_SUCCESS))
return npr.err;
MDBX_page *const sister = npr.page;
sister->mp_leaf2_ksize = mp->mp_leaf2_ksize;
mdbx_debug("new sibling: page %" PRIaPGNO, sister->mp_pgno);
@@ -17147,10 +17145,11 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
* the cursor height may be greater because it walks
* up the stack while finding the branch slot to update. */
if (mc->mc_top < 1) {
MDBX_page *pp;
rc = mdbx_page_new(mc, P_BRANCH, 1, &pp);
npr = mdbx_page_new(mc, P_BRANCH, 1);
rc = npr.err;
if (unlikely(rc != MDBX_SUCCESS))
goto done;
MDBX_page *const pp = npr.page;
/* shift current top to make room for new parent */
mdbx_cassert(mc, mc->mc_snum < 2 && mc->mc_db->md_depth > 0);
#if MDBX_DEBUG