mdbx: more checking for a large/overflow nodes and pages.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-07-06 22:51:57 +03:00
parent b31b270ffd
commit fbe97a79a3

View File

@ -536,7 +536,7 @@ __cold intptr_t mdbx_limits_valsize_max(intptr_t pagesize,
/* Calculate the size of a leaf node.
*
* The size depends on the environment's page size; if a data item
* is too large it will be put onto an overflow page and the node
* is too large it will be put onto an large/overflow page and the node
* size will only include the key and not the data. Sizes are always
* rounded up to an even number of bytes, to guarantee 2-byte alignment
* of the MDBX_node headers. */
@ -544,7 +544,7 @@ MDBX_NOTHROW_PURE_FUNCTION static __always_inline size_t
leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) {
size_t node_bytes = node_size(key, data);
if (node_bytes > env->me_leaf_nodemax) {
/* put on overflow page */
/* put on large/overflow page */
node_bytes = node_size_len(key->iov_len, 0) + sizeof(pgno_t);
}
@ -554,7 +554,7 @@ leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) {
/* Calculate the size of a branch node.
*
* The size should depend on the environment's page size but since
* we currently don't support spilling large keys onto overflow
* we currently don't support spilling large keys onto large/overflow
* pages, it's simply the size of the MDBX_node header plus the
* size of the key. Sizes are always rounded up to an even number
* of bytes, to guarantee 2-byte alignment of the MDBX_node headers.
@ -569,7 +569,7 @@ branch_size(const MDBX_env *env, const MDBX_val *key) {
* This is just the node header plus the key, there is no data. */
size_t node_bytes = node_size(key, nullptr);
if (unlikely(node_bytes > env->me_leaf_nodemax)) {
/* put on overflow page */
/* put on large/overflow page */
/* not implemented */
mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__,
__LINE__);
@ -677,7 +677,7 @@ page_fill(const MDBX_env *env, const MDBX_page *mp) {
return page_used(env, mp) * 100.0 / page_space(env);
}
/* The number of overflow pages needed to store the given size. */
/* The number of large/overflow pages needed to store the given size. */
MDBX_NOTHROW_PURE_FUNCTION static __always_inline pgno_t
number_of_ovpages(const MDBX_env *env, size_t bytes) {
return bytes2pgno(env, PAGEHDRSZ - 1 + bytes) + 1;
@ -4240,7 +4240,7 @@ static const char *mdbx_leafnode_type(MDBX_node *n) {
static const char *const tp[2][2] = {{"", ": DB"},
{": sub-page", ": sub-DB"}};
return F_ISSET(node_flags(n), F_BIGDATA)
? ": overflow page"
? ": large page"
: tp[F_ISSET(node_flags(n), F_DUPDATA)]
[F_ISSET(node_flags(n), F_SUBDATA)];
}
@ -6726,7 +6726,7 @@ page_alloc_slowpath(MDBX_cursor *mc, const pgno_t num, int flags) {
txn->mt_next_pgno + (size_t)num) ||
gc_len + MDBX_PNL_SIZE(txn->tw.reclaimed_pglist) >=
MDBX_PGL_LIMIT)) {
/* Stop reclaiming to avoid overflow the page list.
/* Stop reclaiming to avoid large/overflow the page list.
* This is a rare case while search for a continuously multi-page region
* in a large database.
* todo4recovery://erased_by_github/libmdbx/issues/123 */
@ -7079,7 +7079,7 @@ __hot static struct page_result page_alloc(MDBX_cursor *mc) {
return page_alloc_slowpath(mc, 1, MDBX_ALLOC_ALL);
}
/* Copy the used portions of a non-overflow page. */
/* Copy the used portions of a non-large/overflow page. */
__hot static void mdbx_page_copy(MDBX_page *dst, const MDBX_page *src,
size_t psize) {
STATIC_ASSERT(UINT16_MAX > MAX_PAGESIZE - PAGEHDRSZ);
@ -10368,7 +10368,7 @@ static __inline void mdbx_txn_merge(MDBX_txn *const parent, MDBX_txn *const txn,
}
}
} else {
/* from begin to end with dst shrinking (a lot of new overflow pages) */
/* from begin to end with shrinking (a lot of new large/overflow pages) */
for (l = s = d = 1; s <= src->length && d <= dst->length;) {
if (unlikely(l >= d)) {
/* squash to get a gap of free space for merge */
@ -13469,7 +13469,7 @@ __cold int mdbx_env_open(MDBX_env *env, const char *pathname,
mdbx_debug("entries: %" PRIu64, db->md_entries);
mdbx_debug("branch pages: %" PRIaPGNO, db->md_branch_pages);
mdbx_debug("leaf pages: %" PRIaPGNO, db->md_leaf_pages);
mdbx_debug("overflow pages: %" PRIaPGNO, db->md_overflow_pages);
mdbx_debug("large/overflow pages: %" PRIaPGNO, db->md_overflow_pages);
mdbx_debug("root: %" PRIaPGNO, db->md_root);
mdbx_debug("schema_altered: %" PRIaTXN, db->md_mod_txnid);
}
@ -13955,15 +13955,30 @@ dirty:
goto bailout;
}
if (unlikely((ret.page->mp_upper < ret.page->mp_lower ||
if (!IS_OVERFLOW(ret.page)) {
if (unlikely(ret.page->mp_upper < ret.page->mp_lower ||
((ret.page->mp_lower | ret.page->mp_upper) & 1) ||
PAGEHDRSZ + ret.page->mp_upper > env->me_psize) &&
!IS_OVERFLOW(ret.page))) {
PAGEHDRSZ + ret.page->mp_upper > env->me_psize)) {
ret.err =
bad_page(ret.page, "invalid page lower(%u)/upper(%u) with limit (%u)\n",
bad_page(ret.page, "invalid page lower(%u)/upper(%u) with limit %u\n",
ret.page->mp_lower, ret.page->mp_upper, page_space(env));
goto bailout;
}
} else {
const pgno_t npages = ret.page->mp_pages;
if (unlikely(npages < 1 || npages >= MAX_PAGENO / 2)) {
ret.err =
bad_page(ret.page, "invalid n-pages (%u) for large-page\n", npages);
goto bailout;
}
if (unlikely(ret.page->mp_pgno + npages > mc->mc_txn->mt_next_pgno)) {
ret.err = bad_page(
ret.page,
"end of large-page beyond (%u) allocated space (%u next-pgno)\n",
ret.page->mp_pgno + npages, mc->mc_txn->mt_next_pgno);
goto bailout;
}
}
#endif /* !MDBX_DISABLE_VALIDATION */
if (unlikely(mc->mc_checking & CC_PAGECHECK) &&
@ -14257,40 +14272,40 @@ __hot static int mdbx_page_search(MDBX_cursor *mc, const MDBX_val *key,
return mdbx_page_search_root(mc, key, flags);
}
/* Read overflow node data. */
/* Read large/overflow node data. */
static __noinline int node_read_bigdata(MDBX_cursor *mc, const MDBX_node *node,
MDBX_val *data, const MDBX_page *mp) {
mdbx_cassert(mc,
node_flags(node) == F_BIGDATA && data->iov_len == node_ds(node));
struct page_result ret =
struct page_result lp =
mdbx_page_get_ex(mc, node_largedata_pgno(node), mp->mp_txnid);
if (unlikely((ret.err != MDBX_SUCCESS))) {
mdbx_debug("read overflow page %" PRIaPGNO " failed",
if (unlikely((lp.err != MDBX_SUCCESS))) {
mdbx_debug("read large/overflow page %" PRIaPGNO " failed",
node_largedata_pgno(node));
return ret.err;
return lp.err;
}
data->iov_base = page_data(ret.page);
data->iov_base = page_data(lp.page);
if (!MDBX_DISABLE_VALIDATION &&
unlikely(PAGETYPE_EXTRA(ret.page) != P_OVERFLOW))
return bad_page(ret.page, "invalid page-type 0x%x for bigdata-node",
PAGETYPE_EXTRA(ret.page));
if (!MDBX_DISABLE_VALIDATION &&
unlikely(node_size_len(node_ks(node), data->iov_len) <=
mc->mc_txn->mt_env->me_leaf_nodemax))
bad_page(mp, "too small data (%zu bytes) for bigdata-node", data->iov_len);
if (!MDBX_DISABLE_VALIDATION &&
unlikely(ret.page->mp_pages !=
number_of_ovpages(mc->mc_txn->mt_env, data->iov_len))) {
if (ret.page->mp_pages <
number_of_ovpages(mc->mc_txn->mt_env, data->iov_len))
return bad_page(ret.page,
unlikely(PAGETYPE_EXTRA(lp.page) != P_OVERFLOW))
return bad_page(lp.page, "invalid page-type 0x%x for bigdata-node",
PAGETYPE_EXTRA(lp.page));
if (!MDBX_DISABLE_VALIDATION) {
const MDBX_env *env = mc->mc_txn->mt_env;
const size_t dsize = data->iov_len;
if (unlikely(node_size_len(node_ks(node), dsize) <= env->me_leaf_nodemax))
bad_page(mp, "too small data (%zu bytes) for bigdata-node", dsize);
const unsigned npages = number_of_ovpages(env, dsize);
if (unlikely(lp.page->mp_pages != npages)) {
if (lp.page->mp_pages < npages)
return bad_page(lp.page,
"too less n-pages %u for bigdata-node (%zu bytes)",
ret.page->mp_pages, data->iov_len);
lp.page->mp_pages, dsize);
else
bad_page(ret.page, "extra n-pages %u for bigdata-node (%zu bytes)",
ret.page->mp_pages, data->iov_len);
bad_page(lp.page, "extra n-pages %u for bigdata-node (%zu bytes)",
lp.page->mp_pages, dsize);
}
}
return MDBX_SUCCESS;
}
@ -16096,7 +16111,7 @@ new_sub:;
nflags |= MDBX_SPLIT_REPLACE;
rc = mdbx_page_split(mc, key, rdata, P_INVALID, nflags);
if (rc == MDBX_SUCCESS && mdbx_audit_enabled())
rc = mdbx_cursor_check(mc);
rc = insert_key ? mdbx_cursor_check(mc) : mdbx_cursor_check_updating(mc);
} else {
/* There is room already in this leaf page. */
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
@ -16338,7 +16353,7 @@ int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
else if (unlikely((node_flags(node) ^ flags) & F_SUBDATA))
return MDBX_INCOMPATIBLE;
/* add overflow pages to free list */
/* add large/overflow pages to free list */
if (F_ISSET(node_flags(node), F_BIGDATA)) {
MDBX_page *omp;
if (unlikely((rc = mdbx_page_get(mc, node_largedata_pgno(node), &omp,
@ -16511,13 +16526,13 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
size_t node_bytes;
if (unlikely(flags & F_BIGDATA)) {
/* Data already on overflow page. */
/* Data already on large/overflow page. */
STATIC_ASSERT(sizeof(pgno_t) % 2 == 0);
node_bytes =
node_size_len(key->iov_len, 0) + sizeof(pgno_t) + sizeof(indx_t);
} else if (unlikely(node_size(key, data) >
mc->mc_txn->mt_env->me_leaf_nodemax)) {
/* Put data on overflow page. */
/* Put data on large/overflow page. */
if (unlikely(mc->mc_db->md_flags & MDBX_DUPSORT)) {
mdbx_error("Unexpected target %s flags 0x%x for large data-item",
"dupsort-db", mc->mc_db->md_flags);
@ -16533,7 +16548,7 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
if (unlikely(npr.err != MDBX_SUCCESS))
return npr.err;
largepage = npr.page;
mdbx_debug("allocated %u overflow page(s) %" PRIaPGNO "for %" PRIuPTR
mdbx_debug("allocated %u large/overflow page(s) %" PRIaPGNO "for %" PRIuPTR
" data bytes",
largepage->mp_pages, largepage->mp_pgno, data->iov_len);
flags |= F_BIGDATA;
@ -18124,30 +18139,43 @@ __cold static int mdbx_page_check(MDBX_cursor *const mc,
rc = bad_page(mp, "unknown/extra page-flags (have 0x%x, expect 0x%x)\n",
mp->mp_flags & flags_mask, flags_expected);
mdbx_cassert(mc, (mc->mc_checking & CC_LEAF2) == 0 ||
(mc->mc_flags & C_SUB) != 0);
const uint8_t type = PAGETYPE_EXTRA(mp);
switch (type) {
default:
return bad_page(mp, "invalid type (%u)\n", type);
case P_OVERFLOW:
if (unlikely((mc->mc_flags & C_SUB) || (mc->mc_checking & CC_LEAF2)))
rc =
bad_page(mp, "unexpected overflow-page for dupsort db (flags 0x%x)\n",
mc->mc_db->md_flags);
if (unlikely(mp->mp_pages < 1 && mp->mp_pages >= MAX_PAGENO / 2))
rc = bad_page(mp, "invalid overflow n-pages (%u)\n", mp->mp_pages);
if (unlikely(mp->mp_pgno + mp->mp_pages > mc->mc_txn->mt_next_pgno))
rc = bad_page(mp, "overflow page beyond (%u) next-pgno\n",
mp->mp_pgno + mp->mp_pages);
return rc;
case P_LEAF:
if (unlikely(mc->mc_flags & C_SUB))
rc = bad_page(mp, "unexpected %s-page for %s (db-flags 0x%x)\n", "large",
"nested dupsort tree", mc->mc_db->md_flags);
const pgno_t npages = mp->mp_pages;
if (unlikely(npages < 1 || npages >= MAX_PAGENO / 2))
rc = bad_page(mp, "invalid n-pages (%u) for large-page\n", npages);
if (unlikely(mp->mp_pgno + npages > mc->mc_txn->mt_next_pgno))
rc = bad_page(
mp, "end of large-page beyond (%u) allocated space (%u next-pgno)\n",
mp->mp_pgno + npages, mc->mc_txn->mt_next_pgno);
return rc; //-------------------------- end of large/overflow page handling
case P_LEAF | P_SUBP:
if (unlikely(mc->mc_db->md_depth != 1))
rc = bad_page(mp, "unexpected %s-page for %s (db-flags 0x%x)\n",
"leaf-sub", "nested dupsort db", mc->mc_db->md_flags);
/* fall through */
__fallthrough;
case P_LEAF:
if (unlikely((mc->mc_checking & CC_LEAF2) != 0))
rc = bad_page(
mp, "unexpected leaf-page for dupfixed subtree (db-lags 0x%x)\n",
mc->mc_db->md_flags);
break;
case P_LEAF | P_LEAF2:
case P_LEAF | P_LEAF2 | P_SUBP:
if (unlikely(mc->mc_db->md_depth != 1))
rc = bad_page(mp, "unexpected %s-page for %s (db-flags 0x%x)\n",
"leaf2-sub", "nested dupsort db", mc->mc_db->md_flags);
/* fall through */
__fallthrough;
case P_LEAF | P_LEAF2:
if (unlikely((mc->mc_checking & CC_LEAF2) == 0))
rc = bad_page(
mp,
@ -18158,12 +18186,24 @@ __cold static int mdbx_page_check(MDBX_cursor *const mc,
break;
}
if (unlikely(mp->mp_upper < mp->mp_lower ||
((mp->mp_lower | mp->mp_upper) & 1) ||
PAGEHDRSZ + mp->mp_upper > env->me_psize))
rc = bad_page(mp, "invalid page lower(%u)/upper(%u) with limit %u\n",
mp->mp_lower, mp->mp_upper, page_space(env));
char *const end_of_page = (char *)mp + env->me_psize;
const unsigned nkeys = page_numkeys(mp);
if ((mc->mc_checking & CC_UPDATING) == 0 || !IS_MODIFIABLE(mc->mc_txn, mp)) {
if (unlikely(nkeys < 2 && IS_BRANCH(mp)))
rc = bad_page(mp, "branch-page nkeys (%u) < 2\n", nkeys);
}
if (unlikely(nkeys <= IS_BRANCH(mp)) &&
(!(mc->mc_flags & C_SUB) || mc->mc_db->md_entries) &&
((mc->mc_checking & CC_UPDATING) == 0 || !IS_MODIFIABLE(mc->mc_txn, mp)))
rc = bad_page(mp, "%s-page nkeys (%u) < %u\n",
IS_BRANCH(mp) ? "branch" : "leaf", nkeys, 1 + IS_BRANCH(mp));
if (!IS_LEAF2(mp) && unlikely(PAGEHDRSZ + mp->mp_upper +
nkeys * sizeof(MDBX_node) + nkeys - 1 >
env->me_psize))
rc = bad_page(mp, "invalid page upper (%u) for nkeys %u with limit %u\n",
mp->mp_upper, nkeys, page_space(env));
const size_t ksize_max = keysize_max(env->me_psize, 0);
const size_t leaf2_ksize = mp->mp_leaf2_ksize;
@ -18279,29 +18319,35 @@ __cold static int mdbx_page_check(MDBX_cursor *const mc,
mp,
"big-node data size (%zu) <> min/max value-length (%zu/%zu)\n",
dsize, mc->mc_dbx->md_vlen_min, mc->mc_dbx->md_vlen_max);
if (unlikely(node_size_len(node_ks(node), dsize) <=
mc->mc_txn->mt_env->me_leaf_nodemax))
bad_page(mp, "too small data (%zu bytes) for bigdata-node", dsize);
if ((mc->mc_checking & CC_RETIRING) == 0) {
/* Disable full checking to avoid infinite recursion
* with a corrupted DB */
#if !MDBX_DISABLE_VALIDATION
const uint8_t save_checking_level = mc->mc_checking;
mc->mc_checking &= ~CC_PAGECHECK;
#endif /* MDBX_DISABLE_VALIDATION */
const struct page_result lp =
mdbx_page_get_ex(mc, node_largedata_pgno(node), mp->mp_txnid);
#if !MDBX_DISABLE_VALIDATION
mc->mc_checking = save_checking_level;
#endif /* MDBX_DISABLE_VALIDATION */
if (unlikely(lp.err != MDBX_SUCCESS))
return lp.err;
if (unlikely(!IS_OVERFLOW(lp.page))) {
rc = bad_page(mp, "big-node refs to non-overflow page (%u)\n",
rc = bad_page(mp, "big-node refs to non-large page (%u)\n",
lp.page->mp_pgno);
continue;
}
if (unlikely(number_of_ovpages(env, dsize) > lp.page->mp_pages))
rc =
bad_page(mp, "big-node size (%zu) mismatch n-pages size (%u)\n",
dsize, lp.page->mp_pages);
const unsigned npages = number_of_ovpages(env, dsize);
if (unlikely(lp.page->mp_pages != npages)) {
if (lp.page->mp_pages < npages)
rc = bad_page(lp.page,
"too less n-pages %u for bigdata-node (%zu bytes)",
lp.page->mp_pages, dsize);
else
bad_page(lp.page, "extra n-pages %u for bigdata-node (%zu bytes)",
lp.page->mp_pages, dsize);
}
}
continue;
}
@ -18345,8 +18391,6 @@ __cold static int mdbx_page_check(MDBX_cursor *const mc,
continue;
} else {
const MDBX_page *const sp = (MDBX_page *)data;
const char *const end_of_subpage = data + dsize;
const int nsubkeys = page_numkeys(sp);
switch (sp->mp_flags & /* ignore legacy P_DIRTY flag */ ~0x10) {
case P_LEAF | P_SUBP:
case P_LEAF | P_LEAF2 | P_SUBP:
@ -18357,6 +18401,13 @@ __cold static int mdbx_page_check(MDBX_cursor *const mc,
continue;
}
const char *const end_of_subpage = data + dsize;
const int nsubkeys = page_numkeys(sp);
if (unlikely(nsubkeys == 0) && !(mc->mc_checking & CC_UPDATING) &&
mc->mc_db->md_entries)
rc = bad_page(mp, "no keys on a %s-page\n",
IS_LEAF2(sp) ? "leaf2-sub" : "leaf-sub");
MDBX_val sub_here, sub_prev = {0, 0};
for (int j = 0; j < nsubkeys; j++) {
if (IS_LEAF2(sp)) {
@ -20974,7 +21025,7 @@ static int mdbx_drop_tree(MDBX_cursor *mc, const bool may_have_subDBs) {
/* DUPSORT sub-DBs have no ovpages/DBs. Omit scanning leaves.
* This also avoids any P_LEAF2 pages, which have no nodes.
* Also if the DB doesn't have sub-DBs and has no overflow
* Also if the DB doesn't have sub-DBs and has no large/overflow
* pages, omit scanning leaves. */
if (!(may_have_subDBs | mc->mc_db->md_overflow_pages))
mdbx_cursor_pop(mc);