mdbx: rework nodes/keys limitations.

Change-Id: Id0515346d762d4554102775f26a0fc33f3c0f29e
This commit is contained in:
Leonid Yuriev 2021-03-07 16:27:12 +03:00
parent b164baa1f5
commit 7dfd3f18f8
2 changed files with 136 additions and 153 deletions

View File

@ -349,44 +349,90 @@ node_largedata_pgno(const MDBX_node *const __restrict node) {
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
* Key length limitation factors: * Nodes, Keys & Values length limitation factors:
* *
* - Branch-page must contain at least two (MDBX_MINKEYS) nodes, * BRANCH_NODE_MAX
* within each a key and a child page number. But we can't split a page if * Branch-page must contain at least two nodes, within each a key and a child
* it contains less than 4 keys. Therefore, at least 3 branch-node should * page number. But a page can't be split if it contains less than 4 keys,
* fit in the single branch-page: * i.e. a page should not overflow before adding the fourth key.
* pageroom = pagesize - page_hdr_len; * Therefore, at least 3 branch-node should fit in the single branch-page:
* branch.maxnode = even_floor(pageroom / 3 - sizeof(indx_t)); * PAGEROOM = pagesize - page_hdr_len;
* branch.maxkey = branch.maxnode - node_hdr_len; * BRANCH_NODE_MAX = even_floor(PAGEROOM / 3 - sizeof(indx_t));
* KEYLEN_MAX = BRANCH_NODE_MAX - node_hdr_len;
* *
* - Leaf-node of non-dupsort database must fit into one leaf-page, * LEAF_NODE_MAX
* where a value could be placed on a large/overflow page: * Leaf-node must fit into single leaf-page, where a value could be placed on
* leaf.maxnode = even_floor(pageroom - sizeof(indx_t)); * a large/overflow page. However, it may be necessary to insert a nearly
* leaf.maxkey = leaf.maxnode - node_hdr_len - sizeof(pgno_t); * page-sized node between two large nodes that already fill up a page. In this
* case the page must be split in two if some pair of nodes fits on one page,
* or otherwise the page should be split into THREE with a single node on
* each. Such 1-into-3 page splitting is costly and complex since it
* requires TWO insertions into the parent page, which could lead to splitting it
* and so on up to the root. Therefore double-splitting is avoided here and
* the maximum node size is half of a leaf page space:
* LEAF_NODE_MAX = even_floor(PAGEROOM / 2 - sizeof(indx_t));
* DATALEN_NO_OVERFLOW = LEAF_NODE_MAX - KEYLEN_MAX;
* *
* - SubDatabase-node must fit into one leaf-page: * - SubDatabase-node must fit into one leaf-page:
* subdb.maxname = leaf.maxnode - node_hdr_len - sizeof(MDBX_db); * SUBDB_NAME_MAX = LEAF_NODE_MAX - node_hdr_len - sizeof(MDBX_db);
* *
* - Dupsort values itself are a keys in a dupsort-subdb and couldn't be * - Dupsort values itself are a keys in a dupsort-subdb and couldn't be longer
* longer than the branch.maxkey. But dupsort node must fit into one * than the KEYLEN_MAX. But a dupsort node must not be greater than LEAF_NODE_MAX,
* leaf-page, since dupsort value couldn't be placed on a large/overflow * since dupsort value couldn't be placed on a large/overflow page:
* page. * DUPSORT_DATALEN_MAX = min(KEYLEN_MAX,
* * max(DATALEN_NO_OVERFLOW, sizeof(MDBX_db)));
* - So, the simplest solution is to use half of branch.maxkey as
* a common maxkey value. Nevertheless, the actual values of maxkey are:
* nondupsort.maxkey = even_floor(pageroom / 3)
* - sizeof(indx_t) - node_hdr_len;
* dupsort.maxkey(value) = min(nondupsort.maxkey,
* leaf.maxnode - even_ceil(length(value)));
*/ */
#define PAGEROOM(pagesize) ((pagesize)-PAGEHDRSZ) #define PAGEROOM(pagesize) ((pagesize)-PAGEHDRSZ)
#define EVEN_FLOOR(n) ((n) & ~(size_t)1) #define EVEN_FLOOR(n) ((n) & ~(size_t)1)
#define BRANCH_NODEMAX(pagesize) \ #define BRANCH_NODE_MAX(pagesize) \
(EVEN_FLOOR(PAGEROOM(pagesize) / (MDBX_MINKEYS * 2 - 1)) - sizeof(indx_t)) (EVEN_FLOOR(PAGEROOM(pagesize) / 3) - sizeof(indx_t))
#define LEAF_NODEMAX(pagesize) (PAGEROOM(pagesize) - sizeof(indx_t)) #define LEAF_NODE_MAX(pagesize) \
(EVEN_FLOOR(PAGEROOM(pagesize) / 2) - sizeof(indx_t))
#define MAX_GC1OVPAGE(pagesize) (PAGEROOM(pagesize) / sizeof(pgno_t) - 1) #define MAX_GC1OVPAGE(pagesize) (PAGEROOM(pagesize) / sizeof(pgno_t) - 1)
/* Returns the maximum key length, in bytes, for a database with the given
 * page size and flags.
 *
 *  - MDBX_INTEGERKEY: always 8, i.e. sizeof(uint64_t).
 *  - Any of the dupsort-family flags: the smaller of the branch-node key
 *    limit and the leaf-node limit reduced by sizeof(MDBX_db).
 *    NOTE(review): presumably the MDBX_db reservation leaves room for a
 *    nested-tree record stored next to the key -- confirm.
 *  - Otherwise: the branch-node key limit, BRANCH_NODE_MAX - NODESIZE.
 *
 * The caller must pass a power-of-two page size within
 * [MIN_PAGESIZE, MAX_PAGESIZE]; this is only checked by assert(). */
static __inline unsigned keysize_max(size_t pagesize, MDBX_db_flags_t flags) {
  assert(pagesize >= MIN_PAGESIZE && pagesize <= MAX_PAGESIZE &&
         is_powerof2(pagesize));

  /* Compile-time sanity: even the smallest page must be able to carry
   * an 8-byte key in a branch node, and an 8-byte key together with more
   * than sizeof(MDBX_db) bytes in a leaf node. */
  STATIC_ASSERT(BRANCH_NODE_MAX(MIN_PAGESIZE) - NODESIZE >= 8);
  STATIC_ASSERT(LEAF_NODE_MAX(MIN_PAGESIZE) - NODESIZE -
                    /* sizeof(uint64) as a key */ 8 >
                sizeof(MDBX_db));

  if (flags & MDBX_INTEGERKEY)
    return 8 /* sizeof(uint64_t) */;

  const intptr_t branch_key_limit = BRANCH_NODE_MAX(pagesize) - NODESIZE;

  /* Non-dupsort tables are bounded by the branch-node limit only. */
  if (!(flags &
        (MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_REVERSEDUP | MDBX_INTEGERDUP)))
    return (unsigned)branch_key_limit;

  /* Dupsort tables: take the tighter of the two limits. */
  const intptr_t dupsort_leaf_key_limit =
      LEAF_NODE_MAX(pagesize) - NODESIZE - sizeof(MDBX_db);
  if (branch_key_limit < dupsort_leaf_key_limit)
    return (unsigned)branch_key_limit;
  return (unsigned)dupsort_leaf_key_limit;
}
/* Returns the maximum value (data) length, in bytes, for a database with the
 * given page size and flags.
 *
 *  - MDBX_INTEGERDUP: always 8, i.e. sizeof(uint64_t).
 *  - Other dupsort-family flags: the same as the key limit of a plain
 *    (flag-less) table, i.e. keysize_max(pagesize, 0).
 *  - Otherwise: the smallest of the hard byte cap 0x7FF00000, a quarter of
 *    MDBX_PGL_LIMIT pages expressed in bytes, and half of MAX_MAPSIZE.
 *
 * The caller must pass a power-of-two page size within
 * [MIN_PAGESIZE, MAX_PAGESIZE]; this is only checked by assert(). */
static __inline size_t valsize_max(size_t pagesize, MDBX_db_flags_t flags) {
  assert(pagesize >= MIN_PAGESIZE && pagesize <= MAX_PAGESIZE &&
         is_powerof2(pagesize));
  STATIC_ASSERT(MDBX_PGL_LIMIT <= MAX_PAGENO);

  if (flags & MDBX_INTEGERDUP)
    return 8 /* sizeof(uint64_t) */;

  if (flags & (MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_REVERSEDUP))
    return keysize_max(pagesize, 0);

  const unsigned pagesize_shift = log2n(pagesize);
  const size_t byte_cap = 0x7FF00000ul;
  const size_t page_cap = MDBX_PGL_LIMIT / 4;

  /* Use the byte cap unless it spans at least page_cap pages, in which
   * case the page-count cap (converted to bytes) applies instead. */
  size_t limit = byte_cap;
  if (!((byte_cap >> pagesize_shift) < page_cap))
    limit = page_cap << pagesize_shift;

  /* Finally clamp to half of the maximum map size. */
  if (limit < MAX_MAPSIZE / 2)
    return limit;
  return MAX_MAPSIZE / 2;
}
__cold int mdbx_env_get_maxkeysize(const MDBX_env *env) { __cold int mdbx_env_get_maxkeysize(const MDBX_env *env) {
return mdbx_env_get_maxkeysize_ex(env, MDBX_DUPSORT); return mdbx_env_get_maxkeysize_ex(env, MDBX_DUPSORT);
} }
@ -416,21 +462,7 @@ __cold intptr_t mdbx_limits_keysize_max(intptr_t pagesize,
!is_powerof2((size_t)pagesize))) !is_powerof2((size_t)pagesize)))
return -1; return -1;
STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8); return keysize_max(pagesize, flags);
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8);
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE >= sizeof(MDBX_db));
if (flags & MDBX_INTEGERKEY)
return 8 /* sizeof(uint64_t) */;
STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(pgno_t) <
LEAF_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(MDBX_db));
STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) <
LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(MDBX_db));
if (flags &
(MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_INTEGERDUP | MDBX_REVERSEDUP))
return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(MDBX_db);
return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(pgno_t);
} }
__cold int mdbx_env_get_maxvalsize_ex(const MDBX_env *env, __cold int mdbx_env_get_maxvalsize_ex(const MDBX_env *env,
@ -450,20 +482,7 @@ __cold intptr_t mdbx_limits_valsize_max(intptr_t pagesize,
!is_powerof2((size_t)pagesize))) !is_powerof2((size_t)pagesize)))
return -1; return -1;
if (flags & MDBX_INTEGERDUP) return valsize_max(pagesize, flags);
return 8 /* sizeof(uint64_t) */;
if (flags & (MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_REVERSEDUP))
return BRANCH_NODEMAX(pagesize) - NODESIZE;
const unsigned page_ln2 = log2n(pagesize);
const size_t hard = 0x7FF00000ul;
const size_t hard_pages = hard >> page_ln2;
STATIC_ASSERT(MDBX_PGL_LIMIT <= MAX_PAGENO);
const size_t pages_limit = MDBX_PGL_LIMIT / 4;
const size_t limit =
(hard_pages < pages_limit) ? hard : (pages_limit << page_ln2);
return (limit < MAX_MAPSIZE / 2) ? limit : MAX_MAPSIZE / 2;
} }
/* Calculate the size of a leaf node. /* Calculate the size of a leaf node.
@ -476,30 +495,9 @@ __cold intptr_t mdbx_limits_valsize_max(intptr_t pagesize,
MDBX_NOTHROW_PURE_FUNCTION static __always_inline size_t MDBX_NOTHROW_PURE_FUNCTION static __always_inline size_t
leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) { leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) {
size_t node_bytes = node_size(key, data); size_t node_bytes = node_size(key, data);
/* NOTE: The actual limit is LEAF_NODEMAX(env->me_psize), but it reasonable to if (node_bytes > env->me_leaf_nodemax) {
* use env->me_branch_nodemax (which is 3 times less) as the threshold
* because:
* - Large threshold implies that any insertion/update could result split
* a single leaf page to THREE, which requires TWO insertion into parent
* branch page, then could leads to split parent page and so on up to
* the root. Such double-splitting is complex, ie costly (in case simple
* clear implementation) either dangerous (in case high-optimized
* implementation).
* - This does not affect capabilities, i.e. it does not limit the maximum
* key size.
* - At a lower threshold, on average, the density of keys on leaf pages
* increases and the height of the tree decreases. Thus, this lead the
* less number of pages participating in the search, and the search
* speed increases.
* - On the other hand, there is no universal gold ratio here and with a
* smaller threshold, we will create more overflows/large pages,
* i.e. the database size will be larger as will the IOPS volume.
*
* So, the lower threshold is not a silver bullet, but it allow implementation
* to be much simple and robust, without adding a flaws. */
if (node_bytes > env->me_branch_nodemax) {
/* put on overflow page */ /* put on overflow page */
node_bytes = node_size(key, nullptr) + sizeof(pgno_t); node_bytes = node_size_len(key->iov_len, 0) + sizeof(pgno_t);
} }
return node_bytes + sizeof(indx_t); return node_bytes + sizeof(indx_t);
@ -522,7 +520,7 @@ branch_size(const MDBX_env *env, const MDBX_val *key) {
/* Size of a node in a branch page with a given key. /* Size of a node in a branch page with a given key.
* This is just the node header plus the key, there is no data. */ * This is just the node header plus the key, there is no data. */
size_t node_bytes = node_size(key, nullptr); size_t node_bytes = node_size(key, nullptr);
if (unlikely(node_bytes > env->me_branch_nodemax)) { if (unlikely(node_bytes > env->me_leaf_nodemax)) {
/* put on overflow page */ /* put on overflow page */
/* not implemented */ /* not implemented */
mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__, mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__,
@ -10393,15 +10391,20 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) {
maxgc_ov1page < (intptr_t)MDBX_PGL_LIMIT / 4); maxgc_ov1page < (intptr_t)MDBX_PGL_LIMIT / 4);
env->me_maxgc_ov1page = (unsigned)maxgc_ov1page; env->me_maxgc_ov1page = (unsigned)maxgc_ov1page;
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > sizeof(MDBX_db) + NODESIZE + 42); STATIC_ASSERT(LEAF_NODE_MAX(MIN_PAGESIZE) > sizeof(MDBX_db) + NODESIZE + 42);
STATIC_ASSERT(LEAF_NODEMAX(MAX_PAGESIZE) < UINT16_MAX); STATIC_ASSERT(LEAF_NODE_MAX(MAX_PAGESIZE) < UINT16_MAX);
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > BRANCH_NODEMAX(MIN_PAGESIZE)); STATIC_ASSERT(LEAF_NODE_MAX(MIN_PAGESIZE) > BRANCH_NODE_MAX(MIN_PAGESIZE));
STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) > NODESIZE + 42); STATIC_ASSERT(BRANCH_NODE_MAX(MAX_PAGESIZE) > NODESIZE + 42);
STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) < UINT16_MAX); STATIC_ASSERT(BRANCH_NODE_MAX(MAX_PAGESIZE) < UINT16_MAX);
const intptr_t branch_nodemax = BRANCH_NODEMAX(pagesize); const intptr_t branch_nodemax = BRANCH_NODE_MAX(pagesize);
mdbx_ensure(env, branch_nodemax > 42 && branch_nodemax < (int)UINT16_MAX && const intptr_t leaf_nodemax = LEAF_NODE_MAX(pagesize);
branch_nodemax % 2 == 0); mdbx_ensure(env,
env->me_branch_nodemax = (unsigned)branch_nodemax; branch_nodemax > (intptr_t)(NODESIZE + 42) &&
branch_nodemax % 2 == 0 &&
leaf_nodemax > (intptr_t)(sizeof(MDBX_db) + NODESIZE + 42) &&
leaf_nodemax > branch_nodemax &&
leaf_nodemax < (int)UINT16_MAX && leaf_nodemax % 2 == 0);
env->me_leaf_nodemax = (unsigned)leaf_nodemax;
env->me_psize2log = (uint8_t)log2n(pagesize); env->me_psize2log = (uint8_t)log2n(pagesize);
mdbx_assert(env, pgno2bytes(env, 1) == pagesize); mdbx_assert(env, pgno2bytes(env, 1) == pagesize);
mdbx_assert(env, bytes2pgno(env, pagesize + pagesize) == 2); mdbx_assert(env, bytes2pgno(env, pagesize + pagesize) == 2);
@ -12758,13 +12761,13 @@ static int mdbx_setup_dbx(MDBX_dbx *const dbx, const MDBX_db *const db,
dbx->md_klen_min = dbx->md_klen_min =
(db->md_flags & MDBX_INTEGERKEY) ? 4 /* sizeof(uint32_t) */ : 0; (db->md_flags & MDBX_INTEGERKEY) ? 4 /* sizeof(uint32_t) */ : 0;
dbx->md_klen_max = mdbx_limits_keysize_max(pagesize, db->md_flags); dbx->md_klen_max = keysize_max(pagesize, db->md_flags);
assert(dbx->md_klen_max != (unsigned)-1); assert(dbx->md_klen_max != (unsigned)-1);
dbx->md_vlen_min = (db->md_flags & MDBX_INTEGERDUP) dbx->md_vlen_min = (db->md_flags & MDBX_INTEGERDUP)
? 4 /* sizeof(uint32_t) */ ? 4 /* sizeof(uint32_t) */
: ((db->md_flags & MDBX_DUPFIXED) ? 1 : 0); : ((db->md_flags & MDBX_DUPFIXED) ? 1 : 0);
dbx->md_vlen_max = mdbx_limits_valsize_max(pagesize, db->md_flags); dbx->md_vlen_max = valsize_max(pagesize, db->md_flags);
assert(dbx->md_vlen_max != (unsigned)-1); assert(dbx->md_vlen_max != (unsigned)-1);
if ((db->md_flags & (MDBX_DUPFIXED | MDBX_INTEGERDUP)) != 0 && db->md_xsize) { if ((db->md_flags & (MDBX_DUPFIXED | MDBX_INTEGERDUP)) != 0 && db->md_xsize) {
@ -13925,8 +13928,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
return MDBX_BAD_VALSIZE; return MDBX_BAD_VALSIZE;
if (unlikely(mc->mc_db->md_xsize != data->iov_len) && mc->mc_db->md_xsize) if (unlikely(mc->mc_db->md_xsize != data->iov_len) && mc->mc_db->md_xsize)
return MDBX_BAD_VALSIZE; return MDBX_BAD_VALSIZE;
if (unlikely(dcount > if (unlikely(dcount > MAX_MAPSIZE / 2 /
MAX_MAPSIZE / 2 / (BRANCH_NODEMAX(MAX_PAGESIZE) - NODESIZE))) { (BRANCH_NODE_MAX(MAX_PAGESIZE) - NODESIZE))) {
/* checking for multiplication overflow */ /* checking for multiplication overflow */
if (unlikely(dcount > MAX_MAPSIZE / 2 / data->iov_len)) if (unlikely(dcount > MAX_MAPSIZE / 2 / data->iov_len))
return MDBX_TOO_LARGE; return MDBX_TOO_LARGE;
@ -14056,9 +14059,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
flags -= MDBX_CURRENT; flags -= MDBX_CURRENT;
goto skip_check_samedata; goto skip_check_samedata;
} }
} else if (unlikely(node_size(key, data) > } else if (unlikely(node_size(key, data) > env->me_leaf_nodemax)) {
/* See note inside leaf_size() */
env->me_branch_nodemax)) {
rc = mdbx_cursor_del(mc, 0); rc = mdbx_cursor_del(mc, 0);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
return rc; return rc;
@ -14203,8 +14204,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
/* The key does not exist */ /* The key does not exist */
mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]); mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]);
if ((mc->mc_db->md_flags & MDBX_DUPSORT) && if ((mc->mc_db->md_flags & MDBX_DUPSORT) &&
node_size(key, data) > node_size(key, data) > env->me_leaf_nodemax) {
/* See note inside leaf_size() */ env->me_branch_nodemax) {
/* Too big for a node, insert in sub-DB. Set up an empty /* Too big for a node, insert in sub-DB. Set up an empty
* "old sub-page" for prep_subDB to expand to a full page. */ * "old sub-page" for prep_subDB to expand to a full page. */
fp->mp_leaf2_ksize = fp->mp_leaf2_ksize =
@ -14262,8 +14262,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
/* overflow page overwrites need special handling */ /* overflow page overwrites need special handling */
if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) { if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) {
int level, ovpages, int level, ovpages,
dpages = (node_size(key, data) > dpages = (node_size(key, data) > env->me_leaf_nodemax)
/* See note inside leaf_size() */ env->me_branch_nodemax)
? number_of_ovpages(env, data->iov_len) ? number_of_ovpages(env, data->iov_len)
: 0; : 0;
@ -14376,19 +14375,13 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
* considers them equal. So continue update since called without. * considers them equal. So continue update since called without.
* Continue to update since was called without MDBX_NODUPDATA. */ * Continue to update since was called without MDBX_NODUPDATA. */
} }
mdbx_cassert( mdbx_cassert(mc, node_size(key, data) <= env->me_leaf_nodemax);
mc,
node_size(key, data) <=
/* See note inside leaf_size() */ env->me_branch_nodemax);
goto current; goto current;
} }
/* Just overwrite the current item */ /* Just overwrite the current item */
if (flags & MDBX_CURRENT) { if (flags & MDBX_CURRENT) {
mdbx_cassert( mdbx_cassert(mc, node_size(key, data) <= env->me_leaf_nodemax);
mc,
node_size(key, data) <=
/* See note inside leaf_size() */ env->me_branch_nodemax);
goto current; goto current;
} }
@ -14446,8 +14439,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
} }
fp_flags = fp->mp_flags; fp_flags = fp->mp_flags;
if (NODESIZE + node_ks(node) + xdata.iov_len > if (node_size_len(node_ks(node), xdata.iov_len) >
/* See note inside leaf_size() */ env->me_branch_nodemax) { env->me_leaf_nodemax) {
/* Too big for a sub-page, convert to sub-DB */ /* Too big for a sub-page, convert to sub-DB */
fp_flags &= ~P_SUBP; fp_flags &= ~P_SUBP;
prep_subDB: prep_subDB:
@ -14949,16 +14942,17 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
data ? data->iov_len : 0, key ? key->iov_len : 0, DKEY(key)); data ? data->iov_len : 0, key ? key->iov_len : 0, DKEY(key));
mdbx_cassert(mc, key != NULL && data != NULL); mdbx_cassert(mc, key != NULL && data != NULL);
mdbx_cassert(mc, PAGETYPE(mp) == P_LEAF); mdbx_cassert(mc, PAGETYPE(mp) == P_LEAF);
mdbx_cassert(mc, page_room(mp) >= leaf_size(mc->mc_txn->mt_env, key, data));
MDBX_page *largepage = NULL; MDBX_page *largepage = NULL;
size_t leaf_bytes; size_t node_bytes;
if (unlikely(flags & F_BIGDATA)) { if (unlikely(flags & F_BIGDATA)) {
/* Data already on overflow page. */ /* Data already on overflow page. */
STATIC_ASSERT(sizeof(pgno_t) % 2 == 0); STATIC_ASSERT(sizeof(pgno_t) % 2 == 0);
leaf_bytes = node_size(key, nullptr) + sizeof(pgno_t) + sizeof(indx_t); node_bytes =
node_size_len(key->iov_len, 0) + sizeof(pgno_t) + sizeof(indx_t);
} else if (unlikely(node_size(key, data) > } else if (unlikely(node_size(key, data) >
/* See note inside leaf_size() */ mc->mc_txn->mt_env->me_leaf_nodemax)) {
mc->mc_txn->mt_env->me_branch_nodemax)) {
/* Put data on overflow page. */ /* Put data on overflow page. */
mdbx_ensure(mc->mc_txn->mt_env, mdbx_ensure(mc->mc_txn->mt_env,
!F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)); !F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT));
@ -14972,11 +14966,12 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
" data bytes", " data bytes",
largepage->mp_pages, largepage->mp_pgno, data->iov_len); largepage->mp_pages, largepage->mp_pgno, data->iov_len);
flags |= F_BIGDATA; flags |= F_BIGDATA;
leaf_bytes = node_size(key, nullptr) + sizeof(pgno_t) + sizeof(indx_t); node_bytes =
node_size_len(key->iov_len, 0) + sizeof(pgno_t) + sizeof(indx_t);
} else { } else {
leaf_bytes = node_size(key, data) + sizeof(indx_t); node_bytes = node_size(key, data) + sizeof(indx_t);
} }
mdbx_cassert(mc, leaf_bytes == leaf_size(mc->mc_txn->mt_env, key, data)); mdbx_cassert(mc, node_bytes == leaf_size(mc->mc_txn->mt_env, key, data));
/* Move higher pointers up one slot. */ /* Move higher pointers up one slot. */
const unsigned nkeys = page_numkeys(mp); const unsigned nkeys = page_numkeys(mp);
@ -14986,7 +14981,7 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
/* Adjust free space offsets. */ /* Adjust free space offsets. */
const intptr_t lower = mp->mp_lower + sizeof(indx_t); const intptr_t lower = mp->mp_lower + sizeof(indx_t);
const intptr_t upper = mp->mp_upper - (leaf_bytes - sizeof(indx_t)); const intptr_t upper = mp->mp_upper - (node_bytes - sizeof(indx_t));
if (unlikely(lower > upper)) { if (unlikely(lower > upper)) {
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR; mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PAGE_FULL; return MDBX_PAGE_FULL;
@ -15635,7 +15630,7 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) {
ptr = mp->mp_ptrs[indx]; ptr = mp->mp_ptrs[indx];
if (MDBX_DEBUG) { if (MDBX_DEBUG) {
MDBX_val k2; MDBX_val k2;
char kbuf2[DKBUF_MAXKEYSIZE * 2 + 1]; char kbuf2[DKBUF_MAX * 2 + 1];
k2.iov_base = node_key(node); k2.iov_base = node_key(node);
k2.iov_len = node_ks(node); k2.iov_len = node_ks(node);
mdbx_debug("update key %u (offset %u) [%s] to [%s] on page %" PRIaPGNO, mdbx_debug("update key %u (offset %u) [%s] to [%s] on page %" PRIaPGNO,
@ -16023,8 +16018,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
/* Move all nodes from src to dst */ /* Move all nodes from src to dst */
const unsigned dst_nkeys = page_numkeys(pdst); const unsigned dst_nkeys = page_numkeys(pdst);
const unsigned src_nkeys = page_numkeys(psrc); const unsigned src_nkeys = page_numkeys(psrc);
mdbx_cassert(cdst, dst_nkeys + src_nkeys >= mdbx_cassert(cdst, dst_nkeys + src_nkeys >= (IS_LEAF(psrc) ? 1u : 2u));
(unsigned)(IS_LEAF(psrc) ? 1 : MDBX_MINKEYS));
if (likely(src_nkeys)) { if (likely(src_nkeys)) {
unsigned j = dst_nkeys; unsigned j = dst_nkeys;
if (unlikely(pagetype & P_LEAF2)) { if (unlikely(pagetype & P_LEAF2)) {
@ -17061,8 +17055,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
return rc; return rc;
} }
mdbx_cassert(mc, mdbx_cassert(mc, nkeys + 1 >= (unsigned)(IS_BRANCH(mp) ? 4 : 2));
nkeys >= (unsigned)(IS_BRANCH(mp) ? MDBX_MINKEYS * 2 - 1 : 1));
mdbx_debug("-----> splitting %s page %" PRIaPGNO mdbx_debug("-----> splitting %s page %" PRIaPGNO
" and adding [%s] at index %i/%i", " and adding [%s] at index %i/%i",
IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, DKEY(newkey), IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, DKEY(newkey),
@ -18726,6 +18719,11 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
return rc; return rc;
} }
MDBX_env *env = txn->mt_env;
size_t len = strlen(table_name);
if (len > env->me_leaf_nodemax - NODESIZE - sizeof(MDBX_db))
return MDBX_EINVAL;
if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) { if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) {
txn->mt_dbxs[MAIN_DBI].md_cmp = txn->mt_dbxs[MAIN_DBI].md_cmp =
get_default_keycmp(txn->mt_dbs[MAIN_DBI].md_flags); get_default_keycmp(txn->mt_dbs[MAIN_DBI].md_flags);
@ -18734,7 +18732,6 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
} }
/* Is the DB already open? */ /* Is the DB already open? */
size_t len = strlen(table_name);
MDBX_dbi scan, slot; MDBX_dbi scan, slot;
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) { for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.iov_len) { if (!txn->mt_dbxs[scan].md_name.iov_len) {
@ -18753,7 +18750,6 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
} }
/* Fail, if no free slot and max hit */ /* Fail, if no free slot and max hit */
MDBX_env *env = txn->mt_env;
if (unlikely(slot >= env->me_maxdbs)) { if (unlikely(slot >= env->me_maxdbs)) {
rc = MDBX_DBS_FULL; rc = MDBX_DBS_FULL;
goto early_bailout; goto early_bailout;
@ -20687,7 +20683,7 @@ __cold intptr_t mdbx_limits_dbsize_max(intptr_t pagesize) {
return -1; return -1;
STATIC_ASSERT(MAX_MAPSIZE < INTPTR_MAX); STATIC_ASSERT(MAX_MAPSIZE < INTPTR_MAX);
const uint64_t limit = MAX_PAGENO * (uint64_t)pagesize; const uint64_t limit = (1 + (uint64_t)MAX_PAGENO) * pagesize;
return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit
: (intptr_t)MAX_MAPSIZE; : (intptr_t)MAX_MAPSIZE;
} }
@ -20701,9 +20697,10 @@ __cold intptr_t mdbx_limits_txnsize_max(intptr_t pagesize) {
return -1; return -1;
STATIC_ASSERT(MAX_MAPSIZE < INTPTR_MAX); STATIC_ASSERT(MAX_MAPSIZE < INTPTR_MAX);
const uint64_t limit = pagesize * (uint64_t)(MDBX_PGL_LIMIT - 1); const uint64_t pgl_limit =
return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit pagesize * (uint64_t)(MDBX_PGL_LIMIT / 1.6180339887498948482);
: (intptr_t)MAX_MAPSIZE; const uint64_t map_limit = MAX_MAPSIZE / 1.6180339887498948482;
return (pgl_limit < map_limit) ? (intptr_t)pgl_limit : (intptr_t)map_limit;
} }
/*** Key-making functions to avoid custom comparators *************************/ /*** Key-making functions to avoid custom comparators *************************/

View File

@ -211,19 +211,6 @@ typedef union {
#endif #endif
} MDBX_atomic_uint64_t; } MDBX_atomic_uint64_t;
/* The minimum number of keys required in a database page.
* Setting this to a larger value will place a smaller bound on the
* maximum size of a data item. Data items larger than this size will
* be pushed into overflow pages instead of being stored directly in
* the B-tree node. This value used to default to 4. With a page size
* of 4096 bytes that meant that any item larger than 1024 bytes would
* go into an overflow page. That also meant that on average 2-3KB of
* each overflow page was wasted space. The value cannot be lower than
* 2 because then there would no longer be a tree structure. With this
* value, items larger than 2KB will go into overflow pages, and on
* average only 1KB will be wasted. */
#define MDBX_MINKEYS 2
/* A stamp that identifies a file as an MDBX file. /* A stamp that identifies a file as an MDBX file.
* There's nothing special about this value other than that it is easily * There's nothing special about this value other than that it is easily
* recognizable, and it will reflect any byte order mismatches. */ * recognizable, and it will reflect any byte order mismatches. */
@ -939,8 +926,9 @@ struct MDBX_env {
#define me_lfd me_lck_mmap.fd #define me_lfd me_lck_mmap.fd
#define me_lck me_lck_mmap.lck #define me_lck me_lck_mmap.lck
unsigned me_psize; /* DB page size, initialized from me_os_psize */ unsigned me_psize; /* DB page size, initialized from me_os_psize */
uint8_t me_psize2log; /* log2 of DB page size */ unsigned me_leaf_nodemax; /* max size of a leaf-node */
uint8_t me_psize2log; /* log2 of DB page size */
int8_t me_stuck_meta; /* recovery-only: target meta page or less that zero */ int8_t me_stuck_meta; /* recovery-only: target meta page or less that zero */
unsigned me_os_psize; /* OS page size, from mdbx_syspagesize() */ unsigned me_os_psize; /* OS page size, from mdbx_syspagesize() */
unsigned me_maxreaders; /* size of the reader table */ unsigned me_maxreaders; /* size of the reader table */
@ -975,9 +963,8 @@ struct MDBX_env {
MDBX_PNL me_retired_pages; MDBX_PNL me_retired_pages;
/* Number of freelist items that can fit in a single overflow page */ /* Number of freelist items that can fit in a single overflow page */
unsigned me_maxgc_ov1page; unsigned me_maxgc_ov1page;
unsigned me_branch_nodemax; /* max size of a branch-node */ uint32_t me_live_reader; /* have liveness lock in reader table */
uint32_t me_live_reader; /* have liveness lock in reader table */ void *me_userctx; /* User-settable context */
void *me_userctx; /* User-settable context */
MDBX_atomic_uint64_t *me_sync_timestamp; MDBX_atomic_uint64_t *me_sync_timestamp;
MDBX_atomic_uint64_t *me_autosync_period; MDBX_atomic_uint64_t *me_autosync_period;
atomic_pgno_t *me_unsynced_pages; atomic_pgno_t *me_unsynced_pages;
@ -1240,14 +1227,13 @@ static __maybe_unused __inline void mdbx_jitter4testing(bool tiny) {
#define DDBI(mc) \ #define DDBI(mc) \
(((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi) (((mc)->mc_flags & C_SUB) ? -(int)(mc)->mc_dbi : (int)(mc)->mc_dbi)
/* Key size which fits in a DKBUF. */ /* Key size which fits in a DKBUF (debug key buffer). */
#define DKBUF_MAXKEYSIZE 511 /* FIXME */ #define DKBUF_MAX 511
#if MDBX_DEBUG #if MDBX_DEBUG
#define DKBUF char _kbuf[DKBUF_MAXKEYSIZE * 4 + 2] #define DKBUF char _kbuf[DKBUF_MAX * 4 + 2]
#define DKEY(x) mdbx_dump_val(x, _kbuf, DKBUF_MAXKEYSIZE * 2 + 1) #define DKEY(x) mdbx_dump_val(x, _kbuf, DKBUF_MAX * 2 + 1)
#define DVAL(x) \ #define DVAL(x) mdbx_dump_val(x, _kbuf + DKBUF_MAX * 2 + 1, DKBUF_MAX * 2 + 1)
mdbx_dump_val(x, _kbuf + DKBUF_MAXKEYSIZE * 2 + 1, DKBUF_MAXKEYSIZE * 2 + 1)
#else #else
#define DKBUF ((void)(0)) #define DKBUF ((void)(0))
#define DKEY(x) ("-") #define DKEY(x) ("-")