mdbx: rework max key-length and limit API.

Change-Id: I3d783f69d4ea438d8a8a0505fa9163715fbdcf9c
This commit is contained in:
Leonid Yuriev 2019-11-24 19:04:21 +03:00
parent 9bd88d80d0
commit d80654fa07
8 changed files with 462 additions and 244 deletions

25
mdbx.h
View File

@ -2084,19 +2084,21 @@ __inline intptr_t mdbx_limits_pgsize_min(void) { return MDBX_MIN_PAGESIZE; }
__inline intptr_t mdbx_limits_pgsize_max(void) { return MDBX_MAX_PAGESIZE; } __inline intptr_t mdbx_limits_pgsize_max(void) { return MDBX_MAX_PAGESIZE; }
/* Returns minimal database size in bytes for given page size, /* Returns minimal database size in bytes for given page size,
* or the negative error code. */ * or -1 if pagesize is invalid. */
LIBMDBX_API intptr_t mdbx_limits_dbsize_min(intptr_t pagesize); LIBMDBX_API intptr_t mdbx_limits_dbsize_min(intptr_t pagesize);
/* Returns maximal database size in bytes for given page size, /* Returns maximal database size in bytes for given page size,
* or the negative error code. */ * or -1 if pagesize is invalid. */
LIBMDBX_API intptr_t mdbx_limits_dbsize_max(intptr_t pagesize); LIBMDBX_API intptr_t mdbx_limits_dbsize_max(intptr_t pagesize);
/* Returns maximal key size in bytes for given page size, /* Returns maximal key and data size in bytes for given page size
* or the negative error code. */ * and database flags (see mdbx_dbi_open_ex() description),
LIBMDBX_API intptr_t mdbx_limits_keysize_max(intptr_t pagesize); * or -1 if pagesize is invalid. */
LIBMDBX_API intptr_t mdbx_limits_keysize_max(intptr_t pagesize, unsigned flags);
LIBMDBX_API intptr_t mdbx_limits_valsize_max(intptr_t pagesize, unsigned flags);
/* Returns maximal write transaction size (i.e. limit for summary volume of /* Returns maximal write transaction size (i.e. limit for summary volume of
* dirty pages) in bytes for given page size, or the negative error code. */ * dirty pages) in bytes for given page size, or -1 if pagesize is invalid. */
LIBMDBX_API intptr_t mdbx_limits_txnsize_max(intptr_t pagesize); LIBMDBX_API intptr_t mdbx_limits_txnsize_max(intptr_t pagesize);
/* Set the maximum number of threads/reader slots for the environment. /* Set the maximum number of threads/reader slots for the environment.
@ -2150,11 +2152,16 @@ LIBMDBX_API int mdbx_env_get_maxreaders(MDBX_env *env, unsigned *readers);
* - MDBX_EPERM = the environment is already open. */ * - MDBX_EPERM = the environment is already open. */
LIBMDBX_API int mdbx_env_set_maxdbs(MDBX_env *env, MDBX_dbi dbs); LIBMDBX_API int mdbx_env_set_maxdbs(MDBX_env *env, MDBX_dbi dbs);
/* Get the maximum size of keys and MDBX_DUPSORT data we can write. /* Get the maximum size of keys and data we can write.
* *
* [in] env An environment handle returned by mdbx_env_create(). * [in] env An environment handle returned by mdbx_env_create().
* [in] flags Database options (MDBX_DUPSORT, MDBX_INTEGERKEY ans so on),
* see mdbx_dbi_open_ex() description.
* *
* Returns The maximum size of a key we can write. */ * Returns The maximum size of a key we can write,
* or -1 if something is wrong. */
LIBMDBX_API int mdbx_env_get_maxkeysize_ex(MDBX_env *env, unsigned flags);
LIBMDBX_API int mdbx_env_get_maxvalsize_ex(MDBX_env *env, unsigned flags);
LIBMDBX_API int mdbx_env_get_maxkeysize(MDBX_env *env); LIBMDBX_API int mdbx_env_get_maxkeysize(MDBX_env *env);
/* Set application information associated with the MDBX_env. /* Set application information associated with the MDBX_env.

View File

@ -250,10 +250,13 @@ static __pure_function __inline void *node_data(const MDBX_node *node) {
/* Size of a node in a leaf page with a given key and data. /* Size of a node in a leaf page with a given key and data.
* This is node header plus key plus data size. */ * This is node header plus key plus data size. */
static __pure_function __inline size_t node_size_len(const size_t key_len,
const size_t value_len) {
return NODESIZE + EVEN(key_len + value_len);
}
static __pure_function __inline size_t node_size(const MDBX_val *key, static __pure_function __inline size_t node_size(const MDBX_val *key,
const MDBX_val *value) { const MDBX_val *value) {
return NODESIZE + return node_size_len(key ? key->iov_len : 0, value ? value->iov_len : 0);
EVEN((key ? key->iov_len : 0) + (value ? value->iov_len : 0));
} }
static __pure_function __inline pgno_t peek_pgno(const void *ptr) { static __pure_function __inline pgno_t peek_pgno(const void *ptr) {
@ -283,23 +286,143 @@ node_largedata_pgno(const MDBX_node *node) {
return peek_pgno(node_data(node)); return peek_pgno(node_data(node));
} }
/*------------------------------------------------------------------------------
* Key length limitation factors:
*
* - Branch-page must contain at least two (MDBX_MINKEYS) nodes,
* within each a key and a child page number. But we can't split a page if
* it contains less that 4 keys. Therefore, at least 3 branch-node should
* fit in the single branch-page:
* pageroom = pagesize - page_hdr_len;
* branch.maxnode = even_floor(pageroom / 3 - sizeof(indx_t));
* branch.maxkey = branch.maxnode - node_hdr_len;
*
* - Leaf-node of non-dupsort database must fit into one leaf-page,
* where a value could be placed on a large/overflow page:
* leaf.maxnode = even_floor(pageroom - sizeof(indx_t));
* leaf.maxkey = leaf.maxnode - node_hdr_len - sizeof(pgno_t);
*
* - SubDatabase-node must fit into one leaf-page:
* subdb.maxname = leaf.maxnode - node_hdr_len - sizeof(MDBX_db);
*
* - Dupsort values itself are a keys in a dupsort-subdb and couldn't be
* longer than the branch.maxkey. But dupsort node must fit into one
* leaf-page, since dupsort value couldn't be placed on a large/overflow
* page.
*
* - So, the simpliest solution is to use half of branch.maxkey as
* a common maxkey value. Nevertheless, the actual values of maxkey are:
* nondupsort.maxkey = even_floor(pageroom / 3)
* - sizeof(indx_t) - node_hdr_len;
* dupsort.maxkey(value) = min(nondupsort.maxkey,
* leaf.maxnode - even_ceil(length(value)));
*/
#define PAGEROOM(pagesize) ((pagesize)-PAGEHDRSZ)
#define EVEN_FLOOR(n) ((n) & ~1ul)
#define BRANCH_NODEMAX(pagesize) \
(EVEN_FLOOR(PAGEROOM(pagesize) / (MDBX_MINKEYS * 2 - 1)) - sizeof(indx_t))
#define LEAF_NODEMAX(pagesize) (PAGEROOM(pagesize) - sizeof(indx_t))
#define MAX_GC1OVPAGE(pagesize) (PAGEROOM(pagesize) / sizeof(pgno_t) - 1)
__cold int mdbx_env_get_maxkeysize(MDBX_env *env) {
return mdbx_env_get_maxkeysize_ex(env, MDBX_DUPSORT);
}
__cold int mdbx_env_get_maxkeysize_ex(MDBX_env *env, unsigned flags) {
if (unlikely(!env || env->me_signature != MDBX_ME_SIGNATURE))
return -1;
return (int)mdbx_limits_keysize_max((intptr_t)env->me_psize, flags);
}
__cold intptr_t mdbx_limits_keysize_max(intptr_t pagesize, unsigned flags) {
if (pagesize < 1)
pagesize = (intptr_t)mdbx_syspagesize();
if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize)))
return -1;
STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8);
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8);
STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE >= sizeof(MDBX_db));
if (flags & MDBX_INTEGERKEY)
return 8 /* sizeof(uint64_t) */;
STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(pgno_t) <
LEAF_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(MDBX_db));
STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) <
LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(MDBX_db));
if (flags &
(MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_INTEGERDUP | MDBX_REVERSEDUP))
return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(MDBX_db);
return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(pgno_t);
}
__cold int mdbx_env_get_maxvalsize_ex(MDBX_env *env, unsigned flags) {
if (unlikely(!env || env->me_signature != MDBX_ME_SIGNATURE))
return -1;
return (int)mdbx_limits_valsize_max((intptr_t)env->me_psize, flags);
}
__cold intptr_t mdbx_limits_valsize_max(intptr_t pagesize, unsigned flags) {
if (pagesize < 1)
pagesize = (intptr_t)mdbx_syspagesize();
if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize)))
return -1;
if (flags & MDBX_INTEGERDUP)
return 8 /* sizeof(uint64_t) */;
if (flags &
(MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_INTEGERDUP | MDBX_REVERSEDUP))
return BRANCH_NODEMAX(pagesize) - NODESIZE;
const unsigned page_ln2 = log2n(pagesize);
const size_t hard = 0x7FF00000ul;
const size_t hard_pages = hard >> page_ln2;
const size_t limit = (hard_pages < MDBX_DPL_TXNFULL)
? hard
: ((size_t)MDBX_DPL_TXNFULL << page_ln2);
return (limit < MAX_MAPSIZE) ? limit / 2 : MAX_MAPSIZE / 2;
}
/* Calculate the size of a leaf node. /* Calculate the size of a leaf node.
* *
* The size depends on the environment's page size; if a data item * The size depends on the environment's page size; if a data item
* is too large it will be put onto an overflow page and the node * is too large it will be put onto an overflow page and the node
* size will only include the key and not the data. Sizes are always * size will only include the key and not the data. Sizes are always
* rounded up to an even number of bytes, to guarantee 2-byte alignment * rounded up to an even number of bytes, to guarantee 2-byte alignment
* of the MDBX_node headers. * of the MDBX_node headers. */
*
* [in] env The environment handle.
* [in] key The key for the node.
* [in] data The data for the node.
*
* Returns The number of bytes needed to store the node. */
static __pure_function __inline size_t static __pure_function __inline size_t
leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) { leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) {
size_t node_bytes = node_size(key, data); size_t node_bytes = node_size(key, data);
if (node_bytes > env->me_nodemax) { /* NOTE: The actual limit is LEAF_NODEMAX(env->me_psize), but it reasonable to
* use env->me_branch_nodemax (which is 3 times less) as the treshold because:
* - Large threshold implies that any insertion/update could result split
* a single leaf page to THREE, which requires TWO insertion into parent
* branch page, then could leads to split parent page and so on up to
* the root. Such double-splitting is complex, ie costly (in case simple
* clear implementation) either dangerous (in case high-optimized
* implementation).
* - This does not affect capabilities, i.e. it does not limit the maximum
* key size.
* - At a lower threshold, on average, the density of keys on leaf pages
* increases and the height of the tree decreases. Thus, this lead the
* less number of pages participating in the search, and the search
* speed increases.
* - On the other hand, there is no universal gold ratio here and with a
* smaller threshold, we will create more overflows/large pages,
* i.e. the database size will be larger as will the IOPS volume.
*
* So, the lower threshold is not a silver bullet, but it allow implementation
* to be much simple and robust, without adding a flaws. */
if (node_bytes > env->me_branch_nodemax) {
/* put on overflow page */ /* put on overflow page */
node_bytes = node_size(key, nullptr) + sizeof(pgno_t); node_bytes = node_size(key, nullptr) + sizeof(pgno_t);
} }
@ -324,7 +447,7 @@ static __pure_function __inline size_t branch_size(const MDBX_env *env,
/* Size of a node in a branch page with a given key. /* Size of a node in a branch page with a given key.
* This is just the node header plus the key, there is no data. */ * This is just the node header plus the key, there is no data. */
size_t node_bytes = node_size(key, nullptr); size_t node_bytes = node_size(key, nullptr);
if (unlikely(node_bytes > env->me_nodemax)) { if (unlikely(node_bytes > env->me_branch_nodemax)) {
/* put on overflow page */ /* put on overflow page */
/* not implemented */ /* not implemented */
mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__, mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__,
@ -5524,7 +5647,8 @@ static int mdbx_prep_backlog(MDBX_txn *txn, MDBX_cursor *mc) {
/* LY: extra page(s) for b-tree rebalancing */ /* LY: extra page(s) for b-tree rebalancing */
const int extra = const int extra =
mdbx_backlog_extragap(txn->mt_env) + mdbx_backlog_extragap(txn->mt_env) +
MDBX_PNL_SIZEOF(txn->tw.retired_pages) / txn->mt_env->me_maxkey_limit; MDBX_PNL_SIZEOF(txn->tw.retired_pages) / txn->mt_env->me_maxgc_ov1page +
1;
if (mdbx_backlog_size(txn) < mc->mc_db->md_depth + extra) { if (mdbx_backlog_size(txn) < mc->mc_db->md_depth + extra) {
mc->mc_flags &= ~C_RECLAIMING; mc->mc_flags &= ~C_RECLAIMING;
@ -7503,20 +7627,6 @@ fail:
return rc; return rc;
} }
int __cold mdbx_env_get_maxkeysize(MDBX_env *env) {
if (!env || env->me_signature != MDBX_ME_SIGNATURE || !env->me_maxkey_limit)
return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL;
return env->me_maxkey_limit;
}
#define mdbx_nodemax(pagesize) \
(((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & ~(uintptr_t)1) - sizeof(indx_t))
#define mdbx_maxkey(nodemax) ((nodemax)-NODESIZE - sizeof(MDBX_db))
#define mdbx_maxgc_ov1page(pagesize) \
(((pagesize)-PAGEHDRSZ) / sizeof(pgno_t) - 1)
static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) {
STATIC_ASSERT(PTRDIFF_MAX > MAX_MAPSIZE); STATIC_ASSERT(PTRDIFF_MAX > MAX_MAPSIZE);
STATIC_ASSERT(MIN_PAGESIZE > sizeof(MDBX_page) + sizeof(MDBX_meta)); STATIC_ASSERT(MIN_PAGESIZE > sizeof(MDBX_page) + sizeof(MDBX_meta));
@ -7525,28 +7635,32 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) {
mdbx_ensure(env, pagesize <= MAX_PAGESIZE); mdbx_ensure(env, pagesize <= MAX_PAGESIZE);
env->me_psize = (unsigned)pagesize; env->me_psize = (unsigned)pagesize;
STATIC_ASSERT(mdbx_maxgc_ov1page(MIN_PAGESIZE) > 4); STATIC_ASSERT(MAX_GC1OVPAGE(MIN_PAGESIZE) > 4);
STATIC_ASSERT(mdbx_maxgc_ov1page(MAX_PAGESIZE) < MDBX_DPL_TXNFULL); STATIC_ASSERT(MAX_GC1OVPAGE(MAX_PAGESIZE) < MDBX_DPL_TXNFULL);
const intptr_t maxgc_ov1page = (pagesize - PAGEHDRSZ) / sizeof(pgno_t) - 1; const intptr_t maxgc_ov1page = (pagesize - PAGEHDRSZ) / sizeof(pgno_t) - 1;
mdbx_ensure(env, mdbx_ensure(env,
maxgc_ov1page > 42 && maxgc_ov1page < (intptr_t)MDBX_DPL_TXNFULL); maxgc_ov1page > 42 && maxgc_ov1page < (intptr_t)MDBX_DPL_TXNFULL);
env->me_maxgc_ov1page = (unsigned)maxgc_ov1page; env->me_maxgc_ov1page = (unsigned)maxgc_ov1page;
STATIC_ASSERT(mdbx_nodemax(MIN_PAGESIZE) > 42); STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > sizeof(MDBX_db) + NODESIZE + 42);
STATIC_ASSERT(mdbx_nodemax(MAX_PAGESIZE) < UINT16_MAX); STATIC_ASSERT(LEAF_NODEMAX(MAX_PAGESIZE) < UINT16_MAX);
const intptr_t nodemax = mdbx_nodemax(pagesize); STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > BRANCH_NODEMAX(MIN_PAGESIZE));
mdbx_ensure(env, STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) > NODESIZE + 42);
nodemax > 42 && nodemax < (int)UINT16_MAX && nodemax % 2 == 0); STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) < UINT16_MAX);
env->me_nodemax = (unsigned)nodemax; const intptr_t branch_nodemax = BRANCH_NODEMAX(pagesize);
mdbx_ensure(env, branch_nodemax > 42 && branch_nodemax < (int)UINT16_MAX &&
STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) > 42); branch_nodemax % 2 == 0);
STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) < MIN_PAGESIZE); env->me_branch_nodemax = (unsigned)branch_nodemax;
STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) > 42); env->me_maxkey_nd = (uint16_t)mdbx_limits_keysize_max(env->me_psize, 0);
STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) < MAX_PAGESIZE); env->me_maxkey_ds =
const intptr_t maxkey_limit = mdbx_maxkey(env->me_nodemax); (uint16_t)mdbx_limits_keysize_max(env->me_psize, MDBX_DUPSORT);
mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize && env->me_maxval_nd = (unsigned)mdbx_limits_valsize_max(env->me_psize, 0);
maxkey_limit % 2 == 0); env->me_maxval_ds =
env->me_maxkey_limit = (unsigned)maxkey_limit; (unsigned)mdbx_limits_valsize_max(env->me_psize, MDBX_DUPSORT);
mdbx_ensure(env, env->me_maxkey_nd ==
env->me_branch_nodemax - NODESIZE - sizeof(pgno_t));
mdbx_ensure(env, env->me_maxkey_ds ==
env->me_branch_nodemax - NODESIZE - sizeof(MDBX_db));
env->me_psize2log = log2n(pagesize); env->me_psize2log = log2n(pagesize);
mdbx_assert(env, pgno2bytes(env, 1) == pagesize); mdbx_assert(env, pgno2bytes(env, 1) == pagesize);
@ -10480,7 +10594,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
unsigned nflags; unsigned nflags;
DKBUF; DKBUF;
if (unlikely(mc == NULL || key == NULL)) if (unlikely(mc == NULL || key == NULL || data == NULL))
return MDBX_EINVAL; return MDBX_EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
@ -10514,31 +10628,34 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
if (unlikely(mc->mc_txn->mt_flags & (MDBX_RDONLY | MDBX_TXN_BLOCKED))) if (unlikely(mc->mc_txn->mt_flags & (MDBX_RDONLY | MDBX_TXN_BLOCKED)))
return (mc->mc_txn->mt_flags & MDBX_RDONLY) ? MDBX_EACCESS : MDBX_BAD_TXN; return (mc->mc_txn->mt_flags & MDBX_RDONLY) ? MDBX_EACCESS : MDBX_BAD_TXN;
if (unlikely(key->iov_len > env->me_maxkey_limit)) if ((mc->mc_flags & C_SUB) == 0) {
return MDBX_BAD_VALSIZE; if (unlikely(key->iov_len > (size_t)((mc->mc_db->md_flags & MDBX_DUPSORT)
? env->me_maxkey_ds
: env->me_maxkey_nd) ||
data->iov_len > ((mc->mc_db->md_flags & MDBX_DUPSORT)
? env->me_maxval_ds
: env->me_maxval_nd))) {
return MDBX_BAD_VALSIZE;
}
if (unlikely(data->iov_len > ((mc->mc_db->md_flags & MDBX_DUPSORT) if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) &&
? env->me_maxkey_limit unlikely(key->iov_len != sizeof(uint32_t) &&
: MDBX_MAXDATASIZE))) key->iov_len != sizeof(uint64_t))) {
return MDBX_BAD_VALSIZE; mdbx_cassert(mc, !"key-size is invalid for MDBX_INTEGERKEY");
return MDBX_BAD_VALSIZE;
}
if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) && if ((mc->mc_db->md_flags & MDBX_INTEGERDUP) &&
unlikely(key->iov_len != sizeof(uint32_t) && unlikely(data->iov_len != sizeof(uint32_t) &&
key->iov_len != sizeof(uint64_t))) { data->iov_len != sizeof(uint64_t))) {
mdbx_cassert(mc, !"key-size is invalid for MDBX_INTEGERKEY"); mdbx_cassert(mc, !"data-size is invalid MDBX_INTEGERDUP");
return MDBX_BAD_VALSIZE; return MDBX_BAD_VALSIZE;
} }
if ((mc->mc_db->md_flags & MDBX_INTEGERDUP) &&
unlikely(data->iov_len != sizeof(uint32_t) &&
data->iov_len != sizeof(uint64_t))) {
mdbx_cassert(mc, !"data-size is invalid MDBX_INTEGERDUP");
return MDBX_BAD_VALSIZE;
} }
mdbx_debug("==> put db %d key [%s], size %" PRIuPTR mdbx_debug("==> put db %d key [%s], size %" PRIuPTR
", data [%s] size %" PRIuPTR, ", data [%s] size %" PRIuPTR,
DDBI(mc), DKEY(key), key ? key->iov_len : 0, DDBI(mc), DKEY(key), key->iov_len,
DVAL((flags & MDBX_RESERVE) ? nullptr : data), data->iov_len); DVAL((flags & MDBX_RESERVE) ? nullptr : data), data->iov_len);
int dupdata_flag = 0; int dupdata_flag = 0;
@ -10571,7 +10688,9 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
return rc; return rc;
flags -= MDBX_CURRENT; flags -= MDBX_CURRENT;
} }
} else if (unlikely(node_size(key, data) > env->me_nodemax)) { } else if (unlikely(node_size(key, data) >
/* See note inside leaf_size() */
env->me_branch_nodemax)) {
rc = mdbx_cursor_del(mc, 0); rc = mdbx_cursor_del(mc, 0);
if (rc != MDBX_SUCCESS) if (rc != MDBX_SUCCESS)
return rc; return rc;
@ -10658,12 +10777,13 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
/* The key does not exist */ /* The key does not exist */
mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]); mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]);
if ((mc->mc_db->md_flags & MDBX_DUPSORT) && if ((mc->mc_db->md_flags & MDBX_DUPSORT) &&
node_size(key, data) > env->me_nodemax) { node_size(key, data) >
/* See note inside leaf_size() */ env->me_branch_nodemax) {
/* Too big for a node, insert in sub-DB. Set up an empty /* Too big for a node, insert in sub-DB. Set up an empty
* "old sub-page" for prep_subDB to expand to a full page. */ * "old sub-page" for prep_subDB to expand to a full page. */
fp_flags = P_LEAF | P_DIRTY; fp_flags = P_LEAF | P_DIRTY;
fp = env->me_pbuf; fp = env->me_pbuf;
fp->mp_leaf2_ksize = (uint16_t)data->iov_len; /* used if MDBX_DUPFIXED */ fp->mp_leaf2_ksize = (uint16_t)data->iov_len /* used if MDBX_DUPFIXED */;
fp->mp_lower = fp->mp_upper = 0; fp->mp_lower = fp->mp_upper = 0;
olddata.iov_len = PAGEHDRSZ; olddata.iov_len = PAGEHDRSZ;
goto prep_subDB; goto prep_subDB;
@ -10717,7 +10837,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
/* overflow page overwrites need special handling */ /* overflow page overwrites need special handling */
if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) { if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) {
int level, ovpages, int level, ovpages,
dpages = (node_size(key, data) > env->me_nodemax) dpages = (node_size(key, data) >
/* See note inside leaf_size() */ env->me_branch_nodemax)
? number_of_ovpages(env, data->iov_len) ? number_of_ovpages(env, data->iov_len)
: 0; : 0;
@ -10809,7 +10930,10 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
/* Just overwrite the current item */ /* Just overwrite the current item */
if (flags & MDBX_CURRENT) { if (flags & MDBX_CURRENT) {
mdbx_cassert(mc, node_size(key, data) <= env->me_nodemax); mdbx_cassert(
mc,
node_size(key, data) <=
/* See note inside leaf_size() */ env->me_branch_nodemax);
goto current; goto current;
} }
@ -10818,7 +10942,10 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP))) if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP)))
return MDBX_KEYEXIST; return MDBX_KEYEXIST;
/* overwrite it */ /* overwrite it */
mdbx_cassert(mc, node_size(key, data) <= env->me_nodemax); mdbx_cassert(
mc,
node_size(key, data) <=
/* See note inside leaf_size() */ env->me_branch_nodemax);
goto current; goto current;
} }
@ -10875,7 +11002,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
} }
fp_flags = fp->mp_flags; fp_flags = fp->mp_flags;
if (NODESIZE + node_ks(node) + xdata.iov_len > env->me_nodemax) { if (NODESIZE + node_ks(node) + xdata.iov_len >
/* See note inside leaf_size() */ env->me_branch_nodemax) {
/* Too big for a sub-page, convert to sub-DB */ /* Too big for a sub-page, convert to sub-DB */
fp_flags &= ~P_SUBP; fp_flags &= ~P_SUBP;
prep_subDB: prep_subDB:
@ -10884,9 +11012,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
if (mc->mc_db->md_flags & MDBX_DUPFIXED) { if (mc->mc_db->md_flags & MDBX_DUPFIXED) {
fp_flags |= P_LEAF2; fp_flags |= P_LEAF2;
nested_dupdb.md_xsize = fp->mp_leaf2_ksize; nested_dupdb.md_xsize = fp->mp_leaf2_ksize;
nested_dupdb.md_flags = MDBX_DUPFIXED;
if (mc->mc_db->md_flags & MDBX_INTEGERDUP) if (mc->mc_db->md_flags & MDBX_INTEGERDUP)
nested_dupdb.md_flags |= MDBX_INTEGERKEY; nested_dupdb.md_flags = MDBX_INTEGERKEY;
} }
nested_dupdb.md_depth = 1; nested_dupdb.md_depth = 1;
nested_dupdb.md_branch_pages = 0; nested_dupdb.md_branch_pages = 0;
@ -11373,7 +11500,9 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
/* Data already on overflow page. */ /* Data already on overflow page. */
STATIC_ASSERT(sizeof(pgno_t) % 2 == 0); STATIC_ASSERT(sizeof(pgno_t) % 2 == 0);
leaf_bytes = node_size(key, nullptr) + sizeof(pgno_t) + sizeof(indx_t); leaf_bytes = node_size(key, nullptr) + sizeof(pgno_t) + sizeof(indx_t);
} else if (unlikely(node_size(key, data) > mc->mc_txn->mt_env->me_nodemax)) { } else if (unlikely(node_size(key, data) >
/* See note inside leaf_size() */
mc->mc_txn->mt_env->me_branch_nodemax)) {
/* Put data on overflow page. */ /* Put data on overflow page. */
mdbx_cassert(mc, !F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)); mdbx_cassert(mc, !F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT));
const pgno_t ovpages = number_of_ovpages(mc->mc_txn->mt_env, data->iov_len); const pgno_t ovpages = number_of_ovpages(mc->mc_txn->mt_env, data->iov_len);
@ -11612,10 +11741,9 @@ static int mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) {
mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_pg[0] = fp;
mx->mx_cursor.mc_ki[0] = 0; mx->mx_cursor.mc_ki[0] = 0;
if (mc->mc_db->md_flags & MDBX_DUPFIXED) { if (mc->mc_db->md_flags & MDBX_DUPFIXED) {
mx->mx_db.md_flags = MDBX_DUPFIXED;
mx->mx_db.md_xsize = fp->mp_leaf2_ksize; mx->mx_db.md_xsize = fp->mp_leaf2_ksize;
if (mc->mc_db->md_flags & MDBX_INTEGERDUP) if (mc->mc_db->md_flags & MDBX_INTEGERDUP)
mx->mx_db.md_flags |= MDBX_INTEGERKEY; mx->mx_db.md_flags = MDBX_INTEGERKEY;
} }
} }
mdbx_debug("Sub-db -%u root page %" PRIaPGNO, mx->mx_cursor.mc_dbi, mdbx_debug("Sub-db -%u root page %" PRIaPGNO, mx->mx_cursor.mc_dbi,
@ -12250,6 +12378,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
/* Move all nodes from src to dst */ /* Move all nodes from src to dst */
const unsigned dst_nkeys = page_numkeys(pdst); const unsigned dst_nkeys = page_numkeys(pdst);
const unsigned src_nkeys = page_numkeys(psrc); const unsigned src_nkeys = page_numkeys(psrc);
mdbx_cassert(cdst, dst_nkeys + src_nkeys >=
(unsigned)(IS_LEAF(psrc) ? 1 : MDBX_MINKEYS));
if (likely(src_nkeys)) { if (likely(src_nkeys)) {
unsigned j = dst_nkeys; unsigned j = dst_nkeys;
if (unlikely(pagetype & P_LEAF2)) { if (unlikely(pagetype & P_LEAF2)) {
@ -12278,20 +12408,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
rc = mdbx_page_search_lowest(&mn); rc = mdbx_page_search_lowest(&mn);
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
if (IS_LEAF2(mn.mc_pg[mn.mc_top])) { MDBX_node *lowest = page_node(mn.mc_pg[mn.mc_top], 0);
key.iov_len = mn.mc_db->md_xsize; key.iov_len = node_ks(lowest);
key.iov_base = page_leaf2key(mn.mc_pg[mn.mc_top], 0, key.iov_len); key.iov_base = node_key(lowest);
} else {
MDBX_node *lowest = page_node(mn.mc_pg[mn.mc_top], 0);
key.iov_len = node_ks(lowest);
key.iov_base = node_key(lowest);
const size_t dst_room = page_room(pdst); const size_t dst_room = page_room(pdst);
const size_t src_used = page_used(cdst->mc_txn->mt_env, psrc); const size_t src_used = page_used(cdst->mc_txn->mt_env, psrc);
const size_t space_needed = src_used - node_ks(srcnode) + key.iov_len; const size_t space_needed = src_used - node_ks(srcnode) + key.iov_len;
if (unlikely(space_needed > dst_room)) if (unlikely(space_needed > dst_room))
return MDBX_RESULT_TRUE; return MDBX_RESULT_TRUE;
}
} }
/* Mark dst as dirty. */ /* Mark dst as dirty. */
@ -12399,6 +12524,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
return MDBX_SUCCESS; return MDBX_SUCCESS;
} }
mdbx_cassert(cdst, page_numkeys(top_page) == dst_nkeys + src_nkeys);
if (pagetype != PAGETYPE(top_page)) { if (pagetype != PAGETYPE(top_page)) {
/* LY: LEAF-page becomes BRANCH, unable restore cursor's stack */ /* LY: LEAF-page becomes BRANCH, unable restore cursor's stack */
goto bailout; goto bailout;
@ -12461,8 +12588,6 @@ bailout:
* [in] csrc The cursor to copy from. * [in] csrc The cursor to copy from.
* [out] cdst The cursor to copy to. */ * [out] cdst The cursor to copy to. */
static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) { static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) {
unsigned i;
mdbx_cassert(csrc, mdbx_cassert(csrc,
csrc->mc_txn->mt_txnid >= *csrc->mc_txn->mt_env->me_oldest); csrc->mc_txn->mt_txnid >= *csrc->mc_txn->mt_env->me_oldest);
cdst->mc_txn = csrc->mc_txn; cdst->mc_txn = csrc->mc_txn;
@ -12473,7 +12598,7 @@ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) {
cdst->mc_top = csrc->mc_top; cdst->mc_top = csrc->mc_top;
cdst->mc_flags = csrc->mc_flags; cdst->mc_flags = csrc->mc_flags;
for (i = 0; i < csrc->mc_snum; i++) { for (unsigned i = 0; i < csrc->mc_snum; i++) {
cdst->mc_pg[i] = csrc->mc_pg[i]; cdst->mc_pg[i] = csrc->mc_pg[i];
cdst->mc_ki[i] = csrc->mc_ki[i]; cdst->mc_ki[i] = csrc->mc_ki[i];
} }
@ -12634,68 +12759,106 @@ static int mdbx_rebalance(MDBX_cursor *mc) {
mdbx_cassert(mc, PAGETYPE(right) == PAGETYPE(mc->mc_pg[mc->mc_top])); mdbx_cassert(mc, PAGETYPE(right) == PAGETYPE(mc->mc_pg[mc->mc_top]));
} }
int ki = mc->mc_ki[mc->mc_top]; const indx_t ki_top = mc->mc_ki[mc->mc_top];
bool fromleft; const indx_t ki_pre_top = mn.mc_ki[pre_top];
if (!left || (right && page_room(left) < page_room(right))) { const indx_t nkeys = (indx_t)page_numkeys(mn.mc_pg[mn.mc_top]);
mdbx_debug("merging %s neighbor", "right"); if (left && page_room(left) > spaceleft_threshold &&
mn.mc_pg[mn.mc_top] = right; (!right || page_room(right) < page_room(left))) {
mn.mc_ki[pre_top] += 1; /* try merge with left */
mn.mc_ki[mn.mc_top] = 0; mdbx_cassert(mc, page_numkeys(left) >= minkeys);
mc->mc_ki[mc->mc_top] = (indx_t)page_numkeys(mc->mc_pg[mc->mc_top]);
fromleft = false;
} else {
mdbx_debug("merging %s neighbor", "left");
mn.mc_pg[mn.mc_top] = left; mn.mc_pg[mn.mc_top] = left;
mn.mc_ki[pre_top] -= 1; mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1;
mn.mc_ki[mn.mc_top] = (indx_t)page_numkeys(mn.mc_pg[mn.mc_top]) - 1; mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1);
mc->mc_ki[mc->mc_top] = 0; mc->mc_ki[mc->mc_top] = 0;
fromleft = true; const indx_t new_ki = (indx_t)(ki_top + page_numkeys(left));
} mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
/* We want mdbx_rebalance to find mn when doing fixups */
mdbx_debug("found neighbor page %" PRIaPGNO " (%u keys, %.1f%% full)", WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn));
mn.mc_pg[mn.mc_top]->mp_pgno, page_numkeys(mn.mc_pg[mn.mc_top]), if (likely(rc != MDBX_RESULT_TRUE)) {
page_fill(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top])); mdbx_cursor_copy(&mn, mc);
mc->mc_ki[mc->mc_top] = new_ki;
/* If the neighbor page is above threshold and has enough keys, mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
* move one key from it. Otherwise we should try to merge them.
* (A branch page must never have less than 2 keys.) */
if (page_fill_enough(mn.mc_pg[mn.mc_top], spaceleft_threshold, minkeys + 1)) {
rc = mdbx_node_move(&mn, mc, fromleft);
if (likely(rc == MDBX_SUCCESS))
ki += fromleft /* if we inserted on left, bump position up */;
else if (unlikely(rc != MDBX_RESULT_TRUE))
return rc; return rc;
mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) ||
PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype);
mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth ||
IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1]));
} else {
if (!fromleft) {
rc = mdbx_page_merge(&mn, mc);
if (unlikely(MDBX_IS_ERROR(rc)))
return rc;
mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) ||
PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype);
mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth ||
IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1]));
} else {
int new_ki = ki + page_numkeys(mn.mc_pg[mn.mc_top]);
mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
/* We want mdbx_rebalance to find mn when doing fixups */
WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn));
if (likely(rc == MDBX_SUCCESS)) {
ki = new_ki;
mdbx_cursor_copy(&mn, mc);
} else if (unlikely(rc != MDBX_RESULT_TRUE))
return rc;
mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) ||
PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype);
mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth ||
IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1]));
} }
} }
mc->mc_ki[mc->mc_top] = (indx_t)ki; if (right && page_room(right) > spaceleft_threshold) {
return MDBX_SUCCESS; /* try merge with right */
mdbx_cassert(mc, page_numkeys(right) >= minkeys);
mn.mc_pg[mn.mc_top] = right;
mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1;
mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = nkeys;
rc = mdbx_page_merge(&mn, mc);
if (likely(rc != MDBX_RESULT_TRUE)) {
mc->mc_ki[mc->mc_top] = ki_top;
mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
return rc;
}
}
if (left && page_numkeys(left) > minkeys &&
(!right || page_numkeys(right) <= minkeys ||
page_room(right) > page_room(left))) {
/* try move from left */
mn.mc_pg[mn.mc_top] = left;
mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1;
mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1);
mc->mc_ki[mc->mc_top] = 0;
rc = mdbx_node_move(&mn, mc, true);
if (likely(rc != MDBX_RESULT_TRUE)) {
mc->mc_ki[mc->mc_top] = ki_top + 1;
mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
return rc;
}
}
if (right && page_numkeys(right) > minkeys) {
/* try move from right */
mn.mc_pg[mn.mc_top] = right;
mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1;
mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = nkeys;
rc = mdbx_node_move(&mn, mc, false);
if (likely(rc != MDBX_RESULT_TRUE)) {
mc->mc_ki[mc->mc_top] = ki_top;
mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
return rc;
}
}
if (nkeys >= minkeys)
return MDBX_SUCCESS;
if (left && (!right || page_room(left) > page_room(right))) {
/* try merge with left */
mdbx_cassert(mc, page_numkeys(left) >= minkeys);
mn.mc_pg[mn.mc_top] = left;
mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1;
mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1);
mc->mc_ki[mc->mc_top] = 0;
const indx_t new_ki = (indx_t)(ki_top + page_numkeys(left));
mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
/* We want mdbx_rebalance to find mn when doing fixups */
WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn));
if (likely(rc != MDBX_RESULT_TRUE)) {
mdbx_cursor_copy(&mn, mc);
mc->mc_ki[mc->mc_top] = new_ki;
mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
return rc;
}
} else {
/* try merge with right */
mdbx_cassert(mc, page_numkeys(right) >= minkeys);
mn.mc_pg[mn.mc_top] = right;
mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1;
mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = nkeys;
rc = mdbx_page_merge(&mn, mc);
if (likely(rc != MDBX_RESULT_TRUE)) {
mc->mc_ki[mc->mc_top] = ki_top;
mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys);
return rc;
}
}
return MDBX_PROBLEM;
} }
static __cold int mdbx_page_check(MDBX_env *env, const MDBX_page *const mp, static __cold int mdbx_page_check(MDBX_env *env, const MDBX_page *const mp,
@ -13104,7 +13267,6 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
pgno_t pgno = 0; pgno_t pgno = 0;
unsigned i, ptop; unsigned i, ptop;
MDBX_env *env = mc->mc_txn->mt_env; MDBX_env *env = mc->mc_txn->mt_env;
MDBX_node *node;
MDBX_val sepkey, rkey, xdata; MDBX_val sepkey, rkey, xdata;
MDBX_page *copy = NULL; MDBX_page *copy = NULL;
MDBX_page *rp, *pp; MDBX_page *rp, *pp;
@ -13120,6 +13282,8 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
return rc; return rc;
} }
mdbx_cassert(mc,
nkeys >= (unsigned)(IS_BRANCH(mp) ? MDBX_MINKEYS * 2 - 1 : 1));
mdbx_debug("-----> splitting %s page %" PRIaPGNO mdbx_debug("-----> splitting %s page %" PRIaPGNO
" and adding [%s] at index %i/%i", " and adding [%s] at index %i/%i",
IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, DKEY(newkey), IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, DKEY(newkey),
@ -13184,10 +13348,9 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
split_indx = (nkeys + 1) / 2; split_indx = (nkeys + 1) / 2;
if (IS_LEAF2(rp)) { if (IS_LEAF2(rp)) {
char *split, *ins; char *split, *ins;
int x;
unsigned lsize, rsize, ksize; unsigned lsize, rsize, ksize;
/* Move half of the keys to the right sibling */ /* Move half of the keys to the right sibling */
x = mc->mc_ki[mc->mc_top] - split_indx; const int x = mc->mc_ki[mc->mc_top] - split_indx;
ksize = mc->mc_db->md_xsize; ksize = mc->mc_db->md_xsize;
split = page_leaf2key(mp, split_indx, ksize); split = page_leaf2key(mp, split_indx, ksize);
rsize = (nkeys - split_indx) * ksize; rsize = (nkeys - split_indx) * ksize;
@ -13231,11 +13394,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
mc->mc_ki[mc->mc_top] = (indx_t)x; mc->mc_ki[mc->mc_top] = (indx_t)x;
} }
} else { } else {
size_t psize, nsize, k;
/* Maximum free space in an empty page */ /* Maximum free space in an empty page */
const unsigned pmax = page_space(env); const unsigned pmax = page_space(env);
nsize = IS_LEAF(mp) ? leaf_size(env, newkey, newdata) const size_t nsize = IS_LEAF(mp) ? leaf_size(env, newkey, newdata)
: branch_size(env, newkey); : branch_size(env, newkey);
/* grab a page to hold a temporary copy */ /* grab a page to hold a temporary copy */
copy = mdbx_page_malloc(mc->mc_txn, 1); copy = mdbx_page_malloc(mc->mc_txn, 1);
@ -13268,47 +13430,48 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
* As a final tweak, if the new item goes on the last * As a final tweak, if the new item goes on the last
* spot on the page (and thus, onto the new page), bias * spot on the page (and thus, onto the new page), bias
* the split so the new page is emptier than the old page. * the split so the new page is emptier than the old page.
* This yields better packing during sequential inserts. * This yields better packing during sequential inserts. */
*/
int dir;
if (nkeys < 32 || nsize > pmax / 16 || newindx >= nkeys) { if (nkeys < 32 || nsize > pmax / 16 || newindx >= nkeys) {
/* Find split point */ /* Find split point */
psize = 0; int dir;
size_t psize = 0;
unsigned k;
if (newindx <= split_indx || newindx >= nkeys) { if (newindx <= split_indx || newindx >= nkeys) {
i = 0; i = 0;
dir = 1; dir = 1;
k = (newindx >= nkeys) ? nkeys : split_indx + 1 + IS_LEAF(mp); k = (newindx >= nkeys) ? nkeys : split_indx + 1 + IS_LEAF(mp);
split_indx = k - 1;
} else { } else {
i = nkeys; i = nkeys;
dir = -1; dir = -1;
k = split_indx - 1; k = split_indx - 1;
split_indx += 1;
} }
for (; i != k; i += dir) { do {
if (i == newindx) { if (i == newindx) {
psize += nsize; psize += nsize;
node = NULL;
} else { } else {
node = (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); MDBX_node *node =
(MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ);
psize += NODESIZE + node_ks(node) + sizeof(indx_t); psize += NODESIZE + node_ks(node) + sizeof(indx_t);
if (IS_LEAF(mp)) { if (IS_LEAF(mp))
if (F_ISSET(node_flags(node), F_BIGDATA)) psize += F_ISSET(node_flags(node), F_BIGDATA) ? sizeof(pgno_t)
psize += sizeof(pgno_t); : node_ds(node);
else
psize += node_ds(node);
}
psize = EVEN(psize); psize = EVEN(psize);
} }
if (psize > pmax || i == k - dir) { if (psize > pmax) {
split_indx = i + (dir < 0); split_indx = i + (dir < 0);
break; break;
} }
} i += dir;
} while (i != k);
} }
if (split_indx == newindx) { if (split_indx == newindx) {
sepkey.iov_len = newkey->iov_len; sepkey.iov_len = newkey->iov_len;
sepkey.iov_base = newkey->iov_base; sepkey.iov_base = newkey->iov_base;
} else { } else {
node = MDBX_node *node =
(MDBX_node *)((char *)mp + copy->mp_ptrs[split_indx] + PAGEHDRSZ); (MDBX_node *)((char *)mp + copy->mp_ptrs[split_indx] + PAGEHDRSZ);
sepkey.iov_len = node_ks(node); sepkey.iov_len = node_ks(node);
sepkey.iov_base = node_key(node); sepkey.iov_base = node_key(node);
@ -13421,7 +13584,8 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
/* Update index for the new key. */ /* Update index for the new key. */
mc->mc_ki[mc->mc_top] = n; mc->mc_ki[mc->mc_top] = n;
} else { } else {
node = (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); MDBX_node *node =
(MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ);
rkey.iov_base = node_key(node); rkey.iov_base = node_key(node);
rkey.iov_len = node_ks(node); rkey.iov_len = node_ks(node);
if (IS_LEAF(mp)) { if (IS_LEAF(mp)) {
@ -13492,7 +13656,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
} }
} }
if (nflags & MDBX_RESERVE) { if (nflags & MDBX_RESERVE) {
node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!(node_flags(node) & F_BIGDATA)) if (!(node_flags(node) & F_BIGDATA))
newdata->iov_base = node_data(node); newdata->iov_base = node_data(node);
} }
@ -16510,24 +16674,13 @@ int mdbx_dbi_sequence(MDBX_txn *txn, MDBX_dbi dbi, uint64_t *result,
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
__cold intptr_t mdbx_limits_keysize_max(intptr_t pagesize) {
if (pagesize < 1)
pagesize = (intptr_t)mdbx_syspagesize();
else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize)))
return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL;
return mdbx_maxkey(mdbx_nodemax(pagesize));
}
__cold intptr_t mdbx_limits_dbsize_min(intptr_t pagesize) { __cold intptr_t mdbx_limits_dbsize_min(intptr_t pagesize) {
if (pagesize < 1) if (pagesize < 1)
pagesize = (intptr_t)mdbx_syspagesize(); pagesize = (intptr_t)mdbx_syspagesize();
else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize))) !is_powerof2((size_t)pagesize)))
return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; return -1;
return MIN_PAGENO * pagesize; return MIN_PAGENO * pagesize;
} }
@ -16538,7 +16691,7 @@ __cold intptr_t mdbx_limits_dbsize_max(intptr_t pagesize) {
else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize))) !is_powerof2((size_t)pagesize)))
return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; return -1;
const uint64_t limit = MAX_PAGENO * (uint64_t)pagesize; const uint64_t limit = MAX_PAGENO * (uint64_t)pagesize;
return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit
@ -16551,9 +16704,11 @@ __cold intptr_t mdbx_limits_txnsize_max(intptr_t pagesize) {
else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE ||
pagesize > (intptr_t)MAX_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE ||
!is_powerof2((size_t)pagesize))) !is_powerof2((size_t)pagesize)))
return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; return -1;
return pagesize * (MDBX_DPL_TXNFULL - 1); const uint64_t limit = pagesize * (uint64_t)(MDBX_DPL_TXNFULL - 1);
return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit
: (intptr_t)MAX_MAPSIZE;
} }
/*** Attribute support functions for Nexenta **********************************/ /*** Attribute support functions for Nexenta **********************************/

View File

@ -644,7 +644,7 @@ typedef union MDBX_DP {
* elements are in the array. */ * elements are in the array. */
typedef MDBX_DP *MDBX_DPL; typedef MDBX_DP *MDBX_DPL;
/* PNL sizes - likely should be even bigger */ /* PNL sizes */
#define MDBX_PNL_GRANULATE 1024 #define MDBX_PNL_GRANULATE 1024
#define MDBX_PNL_INITIAL \ #define MDBX_PNL_INITIAL \
(MDBX_PNL_GRANULATE - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(pgno_t)) (MDBX_PNL_GRANULATE - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(pgno_t))
@ -917,11 +917,11 @@ struct MDBX_env {
MDBX_DPL me_dirtylist; MDBX_DPL me_dirtylist;
/* Number of freelist items that can fit in a single overflow page */ /* Number of freelist items that can fit in a single overflow page */
unsigned me_maxgc_ov1page; unsigned me_maxgc_ov1page;
/* Max size of a node on a page */ unsigned me_branch_nodemax; /* max size of a branch-node */
unsigned me_nodemax; uint16_t me_maxkey_nd, me_maxkey_ds;
unsigned me_maxkey_limit; /* max size of a key */ unsigned me_maxval_nd, me_maxval_ds;
uint32_t me_live_reader; /* have liveness lock in reader table */ uint32_t me_live_reader; /* have liveness lock in reader table */
void *me_userctx; /* User-settable context */ void *me_userctx; /* User-settable context */
volatile uint64_t *me_sync_timestamp; volatile uint64_t *me_sync_timestamp;
volatile uint64_t *me_autosync_period; volatile uint64_t *me_autosync_period;
volatile pgno_t *me_unsynced_pages; volatile pgno_t *me_unsynced_pages;

View File

@ -53,7 +53,7 @@ bool testcase_append::run() {
} }
log_trace("append: append-a %" PRIu64, serial); log_trace("append: append-a %" PRIu64, serial);
generate_pair(serial, key, data); generate_pair(serial);
int cmp = inserted_number ? mdbx_cmp(txn_guard.get(), dbi, &key->value, int cmp = inserted_number ? mdbx_cmp(txn_guard.get(), dbi, &key->value,
&last_key->value) &last_key->value)
: 1; : 1;

View File

@ -589,10 +589,7 @@ unsigned actor_params::mdbx_keylen_min() const {
} }
unsigned actor_params::mdbx_keylen_max() const { unsigned actor_params::mdbx_keylen_max() const {
return (table_flags & MDBX_INTEGERKEY) return (unsigned)mdbx_limits_keysize_max(pagesize, table_flags);
? 8
: std::min((unsigned)mdbx_limits_keysize_max(pagesize),
(unsigned)UINT16_MAX);
} }
unsigned actor_params::mdbx_datalen_min() const { unsigned actor_params::mdbx_datalen_min() const {
@ -600,10 +597,6 @@ unsigned actor_params::mdbx_datalen_min() const {
} }
unsigned actor_params::mdbx_datalen_max() const { unsigned actor_params::mdbx_datalen_max() const {
return (table_flags & MDBX_INTEGERDUP) return std::min((unsigned)UINT16_MAX,
? 8 (unsigned)mdbx_limits_valsize_max(pagesize, table_flags));
: std::min((table_flags & MDBX_DUPSORT)
? (unsigned)mdbx_limits_keysize_max(pagesize)
: (unsigned)MDBX_MAXDATASIZE,
(unsigned)UINT16_MAX);
} }

View File

@ -72,7 +72,7 @@ serial_t injective(const serial_t serial,
} }
void __hot maker::pair(serial_t serial, const buffer &key, buffer &value, void __hot maker::pair(serial_t serial, const buffer &key, buffer &value,
serial_t value_age) { serial_t value_age, const bool keylen_changeable) {
assert(mapping.width >= serial_minwith && mapping.width <= serial_maxwith); assert(mapping.width >= serial_minwith && mapping.width <= serial_maxwith);
assert(mapping.split <= mapping.width); assert(mapping.split <= mapping.width);
assert(mapping.mesh <= mapping.width); assert(mapping.mesh <= mapping.width);
@ -131,11 +131,62 @@ void __hot maker::pair(serial_t serial, const buffer &key, buffer &value,
log_trace("keygen-pair: key %" PRIu64 ", value %" PRIu64, key_serial, log_trace("keygen-pair: key %" PRIu64 ", value %" PRIu64, key_serial,
value_serial); value_serial);
mk(key_serial, key_essentials, *key); mk_begin(key_serial, key_essentials, *key);
mk(value_serial, value_essentials, *value); mk_begin(value_serial, value_essentials, *value);
#if 0 /* unused for now */
if (key->value.iov_len + value->value.iov_len > pair_maxlen) {
unsigned extra = key->value.iov_len + value->value.iov_len - pair_maxlen;
if (keylen_changeable &&
key->value.iov_len > std::max(8u, (unsigned)key_essentials.minlen)) {
#if defined(__GNUC__) || defined(__clang__)
const bool coin = __builtin_parityll(serial) != 0;
#else
const bool coin = INT64_C(0xF2CEECA9989BD96A) * int64_t(serial) < 0;
#endif
if (coin) {
const unsigned gap =
key->value.iov_len - std::max(8u, (unsigned)key_essentials.minlen);
const unsigned chop = std::min(gap, extra);
log_trace("keygen-pair: chop %u key-len %u -> %u", chop,
(unsigned)key->value.iov_len,
(unsigned)key->value.iov_len - chop);
key->value.iov_len -= chop;
extra -= chop;
}
}
if (extra && value->value.iov_len >
std::max(8u, (unsigned)value_essentials.minlen)) {
const unsigned gap = value->value.iov_len -
std::max(8u, (unsigned)value_essentials.minlen);
const unsigned chop = std::min(gap, extra);
log_trace("keygen-pair: chop %u value-len %u -> %u", chop,
(unsigned)value->value.iov_len,
(unsigned)value->value.iov_len - chop);
value->value.iov_len -= chop;
extra -= chop;
}
if (keylen_changeable && extra &&
key->value.iov_len > std::max(8u, (unsigned)key_essentials.minlen)) {
const unsigned gap =
key->value.iov_len - std::max(8u, (unsigned)key_essentials.minlen);
const unsigned chop = std::min(gap, extra);
log_trace("keygen-pair: chop %u key-len %u -> %u", chop,
(unsigned)key->value.iov_len,
(unsigned)key->value.iov_len - chop);
key->value.iov_len -= chop;
extra -= chop;
}
}
#else
(void)keylen_changeable;
#endif /* unused for now */
mk_continue(key_serial, key_essentials, *key);
mk_continue(value_serial, value_essentials, *value);
if (log_enabled(logging::trace)) { if (log_enabled(logging::trace)) {
char dump_key[128], dump_value[128]; char dump_key[4096], dump_value[4096];
log_trace("keygen-pair: key %s, value %s", log_trace("keygen-pair: key %s, value %s",
mdbx_dump_val(&key->value, dump_key, sizeof(dump_key)), mdbx_dump_val(&key->value, dump_key, sizeof(dump_key)),
mdbx_dump_val(&value->value, dump_value, sizeof(dump_value))); mdbx_dump_val(&value->value, dump_value, sizeof(dump_value)));
@ -146,19 +197,22 @@ void maker::setup(const config::actor_params_pod &actor, unsigned actor_id,
unsigned thread_number) { unsigned thread_number) {
key_essentials.flags = key_essentials.flags =
actor.table_flags & (MDBX_INTEGERKEY | MDBX_REVERSEKEY | MDBX_DUPSORT); actor.table_flags & (MDBX_INTEGERKEY | MDBX_REVERSEKEY | MDBX_DUPSORT);
assert(actor.keylen_min <= UINT8_MAX); assert(actor.keylen_min <= UINT16_MAX);
key_essentials.minlen = (uint8_t)actor.keylen_min; key_essentials.minlen = (uint16_t)actor.keylen_min;
assert(actor.keylen_max <= UINT16_MAX); assert(actor.keylen_max <= UINT32_MAX);
key_essentials.maxlen = (uint16_t)actor.keylen_max; key_essentials.maxlen = std::min(
(uint32_t)actor.keylen_max,
(uint32_t)mdbx_limits_keysize_max(actor.pagesize, key_essentials.flags));
value_essentials.flags = value_essentials.flags =
actor.table_flags & (MDBX_INTEGERDUP | MDBX_REVERSEDUP); actor.table_flags & (MDBX_INTEGERDUP | MDBX_REVERSEDUP);
assert(actor.datalen_min <= UINT8_MAX); assert(actor.datalen_min <= UINT16_MAX);
value_essentials.minlen = (uint8_t)actor.datalen_min; value_essentials.minlen = (uint16_t)actor.datalen_min;
assert(actor.datalen_max <= UINT16_MAX); assert(actor.datalen_max <= UINT32_MAX);
value_essentials.maxlen = (uint16_t)actor.datalen_max; value_essentials.maxlen = std::min(
(uint32_t)actor.datalen_max,
(uint32_t)mdbx_limits_valsize_max(actor.pagesize, key_essentials.flags));
assert(thread_number < 2);
(void)thread_number; (void)thread_number;
mapping = actor.keygen; mapping = actor.keygen;
salt = (actor.keygen.seed + actor_id) * UINT64_C(14653293970879851569); salt = (actor.keygen.seed + actor_id) * UINT64_C(14653293970879851569);
@ -226,18 +280,25 @@ buffer alloc(size_t limit) {
return buffer(ptr); return buffer(ptr);
} }
void __hot maker::mk(const serial_t serial, const essentials &params, void __hot maker::mk_begin(const serial_t serial, const essentials &params,
result &out) { result &out) {
assert(out.limit >= params.maxlen); assert(out.limit >= params.maxlen);
assert(params.maxlen >= params.minlen); assert(params.maxlen >= params.minlen);
assert(params.maxlen >= length(serial)); assert(params.maxlen >= length(serial));
out.value.iov_base = out.bytes;
out.value.iov_len = out.value.iov_len =
(params.maxlen > params.minlen) (params.maxlen > params.minlen)
? params.minlen + serial % (params.maxlen - params.minlen) ? params.minlen + serial % (params.maxlen - params.minlen)
: params.minlen; : params.minlen;
if ((params.flags & (MDBX_INTEGERKEY | MDBX_INTEGERDUP)) == 0 &&
out.value.iov_len < 8)
out.value.iov_len = std::max(length(serial), out.value.iov_len);
}
void __hot maker::mk_continue(const serial_t serial, const essentials &params,
result &out) {
out.value.iov_base = out.bytes;
if (params.flags & (MDBX_INTEGERKEY | MDBX_INTEGERDUP)) { if (params.flags & (MDBX_INTEGERKEY | MDBX_INTEGERDUP)) {
assert(params.maxlen == params.minlen); assert(params.maxlen == params.minlen);
assert(params.minlen == 4 || params.minlen == 8); assert(params.minlen == 4 || params.minlen == 8);
@ -251,17 +312,13 @@ void __hot maker::mk(const serial_t serial, const essentials &params,
unaligned::store(out.bytes + out.value.iov_len - 8, htobe64(serial)); unaligned::store(out.bytes + out.value.iov_len - 8, htobe64(serial));
} else { } else {
out.u64 = htobe64(serial); out.u64 = htobe64(serial);
if (out.value.iov_len < 8) { if (out.value.iov_len < 8)
out.value.iov_len = std::max(length(serial), out.value.iov_len);
out.value.iov_base = out.bytes + 8 - out.value.iov_len; out.value.iov_base = out.bytes + 8 - out.value.iov_len;
}
} }
} else { } else {
out.u64 = htole64(serial); out.u64 = htole64(serial);
if (out.value.iov_len > 8) if (out.value.iov_len > 8)
memset(out.bytes + 8, '\0', out.value.iov_len - 8); memset(out.bytes + 8, '\0', out.value.iov_len - 8);
else
out.value.iov_len = std::max(length(serial), out.value.iov_len);
} }
assert(out.value.iov_len >= params.minlen); assert(out.value.iov_len >= params.minlen);

View File

@ -107,18 +107,25 @@ class maker {
serial_t salt; serial_t salt;
struct essentials { struct essentials {
uint8_t minlen; uint16_t minlen;
uint8_t flags; uint16_t flags;
uint16_t maxlen; uint32_t maxlen;
} key_essentials, value_essentials; } key_essentials, value_essentials;
static void mk(const serial_t serial, const essentials &params, result &out); static void mk_begin(const serial_t serial, const essentials &params,
result &out);
static void mk_continue(const serial_t serial, const essentials &params,
result &out);
static void mk(const serial_t serial, const essentials &params, result &out) {
mk_begin(serial, params, out);
mk_continue(serial, params, out);
}
public: public:
maker() { memset(this, 0, sizeof(*this)); } maker() { memset(this, 0, sizeof(*this)); }
void pair(serial_t serial, const buffer &key, buffer &value, void pair(serial_t serial, const buffer &key, buffer &value,
serial_t value_age); serial_t value_age, const bool keylen_changeable);
void setup(const config::actor_params_pod &actor, unsigned actor_id, void setup(const config::actor_params_pod &actor, unsigned actor_id,
unsigned thread_number); unsigned thread_number);
void make_ordered(); void make_ordered();

View File

@ -197,13 +197,12 @@ protected:
bool should_continue(bool check_timeout_only = false) const; bool should_continue(bool check_timeout_only = false) const;
void generate_pair(const keygen::serial_t serial, keygen::buffer &out_key, void generate_pair(const keygen::serial_t serial, keygen::buffer &out_key,
keygen::buffer &out_value, keygen::serial_t data_age = 0) { keygen::buffer &out_value, keygen::serial_t data_age) {
keyvalue_maker.pair(serial, out_key, out_value, data_age); keyvalue_maker.pair(serial, out_key, out_value, data_age, false);
} }
void generate_pair(const keygen::serial_t serial, void generate_pair(const keygen::serial_t serial) {
keygen::serial_t data_age = 0) { keyvalue_maker.pair(serial, key, data, 0, true);
generate_pair(serial, key, data, data_age);
} }
bool mode_readonly() const { bool mode_readonly() const {