From d80654fa07f2d80d77cb7238e1c74f873e227ef3 Mon Sep 17 00:00:00 2001 From: Leonid Yuriev Date: Sun, 24 Nov 2019 19:04:21 +0300 Subject: [PATCH] mdbx: rework max key-length and limit API. Change-Id: I3d783f69d4ea438d8a8a0505fa9163715fbdcf9c --- mdbx.h | 25 +- src/elements/core.c | 529 +++++++++++++++++++++++++-------------- src/elements/internals.h | 12 +- test/append.cc | 2 +- test/config.cc | 13 +- test/keygen.cc | 99 ++++++-- test/keygen.h | 17 +- test/test.h | 9 +- 8 files changed, 462 insertions(+), 244 deletions(-) diff --git a/mdbx.h b/mdbx.h index 6a06dfb1..84dc9d66 100644 --- a/mdbx.h +++ b/mdbx.h @@ -2084,19 +2084,21 @@ __inline intptr_t mdbx_limits_pgsize_min(void) { return MDBX_MIN_PAGESIZE; } __inline intptr_t mdbx_limits_pgsize_max(void) { return MDBX_MAX_PAGESIZE; } /* Returns minimal database size in bytes for given page size, - * or the negative error code. */ + * or -1 if pagesize is invalid. */ LIBMDBX_API intptr_t mdbx_limits_dbsize_min(intptr_t pagesize); /* Returns maximal database size in bytes for given page size, - * or the negative error code. */ + * or -1 if pagesize is invalid. */ LIBMDBX_API intptr_t mdbx_limits_dbsize_max(intptr_t pagesize); -/* Returns maximal key size in bytes for given page size, - * or the negative error code. */ -LIBMDBX_API intptr_t mdbx_limits_keysize_max(intptr_t pagesize); +/* Returns maximal key and data size in bytes for given page size + * and database flags (see mdbx_dbi_open_ex() description), + * or -1 if pagesize is invalid. */ +LIBMDBX_API intptr_t mdbx_limits_keysize_max(intptr_t pagesize, unsigned flags); +LIBMDBX_API intptr_t mdbx_limits_valsize_max(intptr_t pagesize, unsigned flags); /* Returns maximal write transaction size (i.e. limit for summary volume of - * dirty pages) in bytes for given page size, or the negative error code. */ + * dirty pages) in bytes for given page size, or -1 if pagesize is invalid. */ LIBMDBX_API intptr_t mdbx_limits_txnsize_max(intptr_t pagesize); /* Set the maximum number of threads/reader slots for the environment. @@ -2150,11 +2152,16 @@ LIBMDBX_API int mdbx_env_get_maxreaders(MDBX_env *env, unsigned *readers); * - MDBX_EPERM = the environment is already open. */ LIBMDBX_API int mdbx_env_set_maxdbs(MDBX_env *env, MDBX_dbi dbs); -/* Get the maximum size of keys and MDBX_DUPSORT data we can write. +/* Get the maximum size of keys and data we can write. * - * [in] env An environment handle returned by mdbx_env_create(). + * [in] env An environment handle returned by mdbx_env_create(). + * [in] flags Database options (MDBX_DUPSORT, MDBX_INTEGERKEY ans so on), + * see mdbx_dbi_open_ex() description. * - * Returns The maximum size of a key we can write. */ + * Returns The maximum size of a key we can write, + * or -1 if something is wrong. */ +LIBMDBX_API int mdbx_env_get_maxkeysize_ex(MDBX_env *env, unsigned flags); +LIBMDBX_API int mdbx_env_get_maxvalsize_ex(MDBX_env *env, unsigned flags); LIBMDBX_API int mdbx_env_get_maxkeysize(MDBX_env *env); /* Set application information associated with the MDBX_env. diff --git a/src/elements/core.c b/src/elements/core.c index 118e4b9e..0f807f0a 100644 --- a/src/elements/core.c +++ b/src/elements/core.c @@ -250,10 +250,13 @@ static __pure_function __inline void *node_data(const MDBX_node *node) { /* Size of a node in a leaf page with a given key and data. * This is node header plus key plus data size. */ +static __pure_function __inline size_t node_size_len(const size_t key_len, + const size_t value_len) { + return NODESIZE + EVEN(key_len + value_len); +} static __pure_function __inline size_t node_size(const MDBX_val *key, const MDBX_val *value) { - return NODESIZE + - EVEN((key ? key->iov_len : 0) + (value ? value->iov_len : 0)); + return node_size_len(key ? key->iov_len : 0, value ? value->iov_len : 0); } static __pure_function __inline pgno_t peek_pgno(const void *ptr) { @@ -283,23 +286,143 @@ node_largedata_pgno(const MDBX_node *node) { return peek_pgno(node_data(node)); } +/*------------------------------------------------------------------------------ + * Key length limitation factors: + * + * - Branch-page must contain at least two (MDBX_MINKEYS) nodes, + * within each a key and a child page number. But we can't split a page if + * it contains less that 4 keys. Therefore, at least 3 branch-node should + * fit in the single branch-page: + * pageroom = pagesize - page_hdr_len; + * branch.maxnode = even_floor(pageroom / 3 - sizeof(indx_t)); + * branch.maxkey = branch.maxnode - node_hdr_len; + * + * - Leaf-node of non-dupsort database must fit into one leaf-page, + * where a value could be placed on a large/overflow page: + * leaf.maxnode = even_floor(pageroom - sizeof(indx_t)); + * leaf.maxkey = leaf.maxnode - node_hdr_len - sizeof(pgno_t); + * + * - SubDatabase-node must fit into one leaf-page: + * subdb.maxname = leaf.maxnode - node_hdr_len - sizeof(MDBX_db); + * + * - Dupsort values itself are a keys in a dupsort-subdb and couldn't be + * longer than the branch.maxkey. But dupsort node must fit into one + * leaf-page, since dupsort value couldn't be placed on a large/overflow + * page. + * + * - So, the simpliest solution is to use half of branch.maxkey as + * a common maxkey value. Nevertheless, the actual values of maxkey are: + * nondupsort.maxkey = even_floor(pageroom / 3) + * - sizeof(indx_t) - node_hdr_len; + * dupsort.maxkey(value) = min(nondupsort.maxkey, + * leaf.maxnode - even_ceil(length(value))); + */ + +#define PAGEROOM(pagesize) ((pagesize)-PAGEHDRSZ) +#define EVEN_FLOOR(n) ((n) & ~1ul) +#define BRANCH_NODEMAX(pagesize) \ + (EVEN_FLOOR(PAGEROOM(pagesize) / (MDBX_MINKEYS * 2 - 1)) - sizeof(indx_t)) +#define LEAF_NODEMAX(pagesize) (PAGEROOM(pagesize) - sizeof(indx_t)) +#define MAX_GC1OVPAGE(pagesize) (PAGEROOM(pagesize) / sizeof(pgno_t) - 1) + +__cold int mdbx_env_get_maxkeysize(MDBX_env *env) { + return mdbx_env_get_maxkeysize_ex(env, MDBX_DUPSORT); +} + +__cold int mdbx_env_get_maxkeysize_ex(MDBX_env *env, unsigned flags) { + if (unlikely(!env || env->me_signature != MDBX_ME_SIGNATURE)) + return -1; + + return (int)mdbx_limits_keysize_max((intptr_t)env->me_psize, flags); +} + +__cold intptr_t mdbx_limits_keysize_max(intptr_t pagesize, unsigned flags) { + if (pagesize < 1) + pagesize = (intptr_t)mdbx_syspagesize(); + if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || + pagesize > (intptr_t)MAX_PAGESIZE || + !is_powerof2((size_t)pagesize))) + return -1; + + STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8); + STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) >= 8); + STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE >= sizeof(MDBX_db)); + if (flags & MDBX_INTEGERKEY) + return 8 /* sizeof(uint64_t) */; + + STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(pgno_t) < + LEAF_NODEMAX(MAX_PAGESIZE) - NODESIZE - sizeof(MDBX_db)); + STATIC_ASSERT(BRANCH_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(pgno_t) < + LEAF_NODEMAX(MIN_PAGESIZE) - NODESIZE - sizeof(MDBX_db)); + if (flags & + (MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_INTEGERDUP | MDBX_REVERSEDUP)) + return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(MDBX_db); + + return BRANCH_NODEMAX(pagesize) - NODESIZE - sizeof(pgno_t); +} + +__cold int mdbx_env_get_maxvalsize_ex(MDBX_env *env, unsigned flags) { + if (unlikely(!env || env->me_signature != MDBX_ME_SIGNATURE)) + return -1; + + return (int)mdbx_limits_valsize_max((intptr_t)env->me_psize, flags); +} + +__cold intptr_t mdbx_limits_valsize_max(intptr_t pagesize, unsigned flags) { + if (pagesize < 1) + pagesize = (intptr_t)mdbx_syspagesize(); + if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || + pagesize > (intptr_t)MAX_PAGESIZE || + !is_powerof2((size_t)pagesize))) + return -1; + + if (flags & MDBX_INTEGERDUP) + return 8 /* sizeof(uint64_t) */; + + if (flags & + (MDBX_DUPSORT | MDBX_DUPFIXED | MDBX_INTEGERDUP | MDBX_REVERSEDUP)) + return BRANCH_NODEMAX(pagesize) - NODESIZE; + + const unsigned page_ln2 = log2n(pagesize); + const size_t hard = 0x7FF00000ul; + const size_t hard_pages = hard >> page_ln2; + const size_t limit = (hard_pages < MDBX_DPL_TXNFULL) + ? hard + : ((size_t)MDBX_DPL_TXNFULL << page_ln2); + return (limit < MAX_MAPSIZE) ? limit / 2 : MAX_MAPSIZE / 2; +} + /* Calculate the size of a leaf node. * * The size depends on the environment's page size; if a data item * is too large it will be put onto an overflow page and the node * size will only include the key and not the data. Sizes are always * rounded up to an even number of bytes, to guarantee 2-byte alignment - * of the MDBX_node headers. - * - * [in] env The environment handle. - * [in] key The key for the node. - * [in] data The data for the node. - * - * Returns The number of bytes needed to store the node. */ + * of the MDBX_node headers. */ static __pure_function __inline size_t leaf_size(const MDBX_env *env, const MDBX_val *key, const MDBX_val *data) { size_t node_bytes = node_size(key, data); - if (node_bytes > env->me_nodemax) { + /* NOTE: The actual limit is LEAF_NODEMAX(env->me_psize), but it reasonable to + * use env->me_branch_nodemax (which is 3 times less) as the treshold because: + * - Large threshold implies that any insertion/update could result split + * a single leaf page to THREE, which requires TWO insertion into parent + * branch page, then could leads to split parent page and so on up to + * the root. Such double-splitting is complex, ie costly (in case simple + * clear implementation) either dangerous (in case high-optimized + * implementation). + * - This does not affect capabilities, i.e. it does not limit the maximum + * key size. + * - At a lower threshold, on average, the density of keys on leaf pages + * increases and the height of the tree decreases. Thus, this lead the + * less number of pages participating in the search, and the search + * speed increases. + * - On the other hand, there is no universal gold ratio here and with a + * smaller threshold, we will create more overflows/large pages, + * i.e. the database size will be larger as will the IOPS volume. + * + * So, the lower threshold is not a silver bullet, but it allow implementation + * to be much simple and robust, without adding a flaws. */ + if (node_bytes > env->me_branch_nodemax) { /* put on overflow page */ node_bytes = node_size(key, nullptr) + sizeof(pgno_t); } @@ -324,7 +447,7 @@ static __pure_function __inline size_t branch_size(const MDBX_env *env, /* Size of a node in a branch page with a given key. * This is just the node header plus the key, there is no data. */ size_t node_bytes = node_size(key, nullptr); - if (unlikely(node_bytes > env->me_nodemax)) { + if (unlikely(node_bytes > env->me_branch_nodemax)) { /* put on overflow page */ /* not implemented */ mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __func__, @@ -5524,7 +5647,8 @@ static int mdbx_prep_backlog(MDBX_txn *txn, MDBX_cursor *mc) { /* LY: extra page(s) for b-tree rebalancing */ const int extra = mdbx_backlog_extragap(txn->mt_env) + - MDBX_PNL_SIZEOF(txn->tw.retired_pages) / txn->mt_env->me_maxkey_limit; + MDBX_PNL_SIZEOF(txn->tw.retired_pages) / txn->mt_env->me_maxgc_ov1page + + 1; if (mdbx_backlog_size(txn) < mc->mc_db->md_depth + extra) { mc->mc_flags &= ~C_RECLAIMING; @@ -7503,20 +7627,6 @@ fail: return rc; } -int __cold mdbx_env_get_maxkeysize(MDBX_env *env) { - if (!env || env->me_signature != MDBX_ME_SIGNATURE || !env->me_maxkey_limit) - return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; - return env->me_maxkey_limit; -} - -#define mdbx_nodemax(pagesize) \ - (((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & ~(uintptr_t)1) - sizeof(indx_t)) - -#define mdbx_maxkey(nodemax) ((nodemax)-NODESIZE - sizeof(MDBX_db)) - -#define mdbx_maxgc_ov1page(pagesize) \ - (((pagesize)-PAGEHDRSZ) / sizeof(pgno_t) - 1) - static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { STATIC_ASSERT(PTRDIFF_MAX > MAX_MAPSIZE); STATIC_ASSERT(MIN_PAGESIZE > sizeof(MDBX_page) + sizeof(MDBX_meta)); @@ -7525,28 +7635,32 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { mdbx_ensure(env, pagesize <= MAX_PAGESIZE); env->me_psize = (unsigned)pagesize; - STATIC_ASSERT(mdbx_maxgc_ov1page(MIN_PAGESIZE) > 4); - STATIC_ASSERT(mdbx_maxgc_ov1page(MAX_PAGESIZE) < MDBX_DPL_TXNFULL); + STATIC_ASSERT(MAX_GC1OVPAGE(MIN_PAGESIZE) > 4); + STATIC_ASSERT(MAX_GC1OVPAGE(MAX_PAGESIZE) < MDBX_DPL_TXNFULL); const intptr_t maxgc_ov1page = (pagesize - PAGEHDRSZ) / sizeof(pgno_t) - 1; mdbx_ensure(env, maxgc_ov1page > 42 && maxgc_ov1page < (intptr_t)MDBX_DPL_TXNFULL); env->me_maxgc_ov1page = (unsigned)maxgc_ov1page; - STATIC_ASSERT(mdbx_nodemax(MIN_PAGESIZE) > 42); - STATIC_ASSERT(mdbx_nodemax(MAX_PAGESIZE) < UINT16_MAX); - const intptr_t nodemax = mdbx_nodemax(pagesize); - mdbx_ensure(env, - nodemax > 42 && nodemax < (int)UINT16_MAX && nodemax % 2 == 0); - env->me_nodemax = (unsigned)nodemax; - - STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) > 42); - STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) < MIN_PAGESIZE); - STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) > 42); - STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) < MAX_PAGESIZE); - const intptr_t maxkey_limit = mdbx_maxkey(env->me_nodemax); - mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize && - maxkey_limit % 2 == 0); - env->me_maxkey_limit = (unsigned)maxkey_limit; + STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > sizeof(MDBX_db) + NODESIZE + 42); + STATIC_ASSERT(LEAF_NODEMAX(MAX_PAGESIZE) < UINT16_MAX); + STATIC_ASSERT(LEAF_NODEMAX(MIN_PAGESIZE) > BRANCH_NODEMAX(MIN_PAGESIZE)); + STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) > NODESIZE + 42); + STATIC_ASSERT(BRANCH_NODEMAX(MAX_PAGESIZE) < UINT16_MAX); + const intptr_t branch_nodemax = BRANCH_NODEMAX(pagesize); + mdbx_ensure(env, branch_nodemax > 42 && branch_nodemax < (int)UINT16_MAX && + branch_nodemax % 2 == 0); + env->me_branch_nodemax = (unsigned)branch_nodemax; + env->me_maxkey_nd = (uint16_t)mdbx_limits_keysize_max(env->me_psize, 0); + env->me_maxkey_ds = + (uint16_t)mdbx_limits_keysize_max(env->me_psize, MDBX_DUPSORT); + env->me_maxval_nd = (unsigned)mdbx_limits_valsize_max(env->me_psize, 0); + env->me_maxval_ds = + (unsigned)mdbx_limits_valsize_max(env->me_psize, MDBX_DUPSORT); + mdbx_ensure(env, env->me_maxkey_nd == + env->me_branch_nodemax - NODESIZE - sizeof(pgno_t)); + mdbx_ensure(env, env->me_maxkey_ds == + env->me_branch_nodemax - NODESIZE - sizeof(MDBX_db)); env->me_psize2log = log2n(pagesize); mdbx_assert(env, pgno2bytes(env, 1) == pagesize); @@ -10480,7 +10594,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, unsigned nflags; DKBUF; - if (unlikely(mc == NULL || key == NULL)) + if (unlikely(mc == NULL || key == NULL || data == NULL)) return MDBX_EINVAL; if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) @@ -10514,31 +10628,34 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, if (unlikely(mc->mc_txn->mt_flags & (MDBX_RDONLY | MDBX_TXN_BLOCKED))) return (mc->mc_txn->mt_flags & MDBX_RDONLY) ? MDBX_EACCESS : MDBX_BAD_TXN; - if (unlikely(key->iov_len > env->me_maxkey_limit)) - return MDBX_BAD_VALSIZE; + if ((mc->mc_flags & C_SUB) == 0) { + if (unlikely(key->iov_len > (size_t)((mc->mc_db->md_flags & MDBX_DUPSORT) + ? env->me_maxkey_ds + : env->me_maxkey_nd) || + data->iov_len > ((mc->mc_db->md_flags & MDBX_DUPSORT) + ? env->me_maxval_ds + : env->me_maxval_nd))) { + return MDBX_BAD_VALSIZE; + } - if (unlikely(data->iov_len > ((mc->mc_db->md_flags & MDBX_DUPSORT) - ? env->me_maxkey_limit - : MDBX_MAXDATASIZE))) - return MDBX_BAD_VALSIZE; + if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) && + unlikely(key->iov_len != sizeof(uint32_t) && + key->iov_len != sizeof(uint64_t))) { + mdbx_cassert(mc, !"key-size is invalid for MDBX_INTEGERKEY"); + return MDBX_BAD_VALSIZE; + } - if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) && - unlikely(key->iov_len != sizeof(uint32_t) && - key->iov_len != sizeof(uint64_t))) { - mdbx_cassert(mc, !"key-size is invalid for MDBX_INTEGERKEY"); - return MDBX_BAD_VALSIZE; - } - - if ((mc->mc_db->md_flags & MDBX_INTEGERDUP) && - unlikely(data->iov_len != sizeof(uint32_t) && - data->iov_len != sizeof(uint64_t))) { - mdbx_cassert(mc, !"data-size is invalid MDBX_INTEGERDUP"); - return MDBX_BAD_VALSIZE; + if ((mc->mc_db->md_flags & MDBX_INTEGERDUP) && + unlikely(data->iov_len != sizeof(uint32_t) && + data->iov_len != sizeof(uint64_t))) { + mdbx_cassert(mc, !"data-size is invalid MDBX_INTEGERDUP"); + return MDBX_BAD_VALSIZE; + } } mdbx_debug("==> put db %d key [%s], size %" PRIuPTR ", data [%s] size %" PRIuPTR, - DDBI(mc), DKEY(key), key ? key->iov_len : 0, + DDBI(mc), DKEY(key), key->iov_len, DVAL((flags & MDBX_RESERVE) ? nullptr : data), data->iov_len); int dupdata_flag = 0; @@ -10571,7 +10688,9 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, return rc; flags -= MDBX_CURRENT; } - } else if (unlikely(node_size(key, data) > env->me_nodemax)) { + } else if (unlikely(node_size(key, data) > + /* See note inside leaf_size() */ + env->me_branch_nodemax)) { rc = mdbx_cursor_del(mc, 0); if (rc != MDBX_SUCCESS) return rc; @@ -10658,12 +10777,13 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, /* The key does not exist */ mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]); if ((mc->mc_db->md_flags & MDBX_DUPSORT) && - node_size(key, data) > env->me_nodemax) { + node_size(key, data) > + /* See note inside leaf_size() */ env->me_branch_nodemax) { /* Too big for a node, insert in sub-DB. Set up an empty * "old sub-page" for prep_subDB to expand to a full page. */ fp_flags = P_LEAF | P_DIRTY; fp = env->me_pbuf; - fp->mp_leaf2_ksize = (uint16_t)data->iov_len; /* used if MDBX_DUPFIXED */ + fp->mp_leaf2_ksize = (uint16_t)data->iov_len /* used if MDBX_DUPFIXED */; fp->mp_lower = fp->mp_upper = 0; olddata.iov_len = PAGEHDRSZ; goto prep_subDB; @@ -10717,7 +10837,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, /* overflow page overwrites need special handling */ if (unlikely(F_ISSET(node_flags(node), F_BIGDATA))) { int level, ovpages, - dpages = (node_size(key, data) > env->me_nodemax) + dpages = (node_size(key, data) > + /* See note inside leaf_size() */ env->me_branch_nodemax) ? number_of_ovpages(env, data->iov_len) : 0; @@ -10809,7 +10930,10 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, /* Just overwrite the current item */ if (flags & MDBX_CURRENT) { - mdbx_cassert(mc, node_size(key, data) <= env->me_nodemax); + mdbx_cassert( + mc, + node_size(key, data) <= + /* See note inside leaf_size() */ env->me_branch_nodemax); goto current; } @@ -10818,7 +10942,10 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP))) return MDBX_KEYEXIST; /* overwrite it */ - mdbx_cassert(mc, node_size(key, data) <= env->me_nodemax); + mdbx_cassert( + mc, + node_size(key, data) <= + /* See note inside leaf_size() */ env->me_branch_nodemax); goto current; } @@ -10875,7 +11002,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, } fp_flags = fp->mp_flags; - if (NODESIZE + node_ks(node) + xdata.iov_len > env->me_nodemax) { + if (NODESIZE + node_ks(node) + xdata.iov_len > + /* See note inside leaf_size() */ env->me_branch_nodemax) { /* Too big for a sub-page, convert to sub-DB */ fp_flags &= ~P_SUBP; prep_subDB: @@ -10884,9 +11012,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, if (mc->mc_db->md_flags & MDBX_DUPFIXED) { fp_flags |= P_LEAF2; nested_dupdb.md_xsize = fp->mp_leaf2_ksize; - nested_dupdb.md_flags = MDBX_DUPFIXED; if (mc->mc_db->md_flags & MDBX_INTEGERDUP) - nested_dupdb.md_flags |= MDBX_INTEGERKEY; + nested_dupdb.md_flags = MDBX_INTEGERKEY; } nested_dupdb.md_depth = 1; nested_dupdb.md_branch_pages = 0; @@ -11373,7 +11500,9 @@ static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc, /* Data already on overflow page. */ STATIC_ASSERT(sizeof(pgno_t) % 2 == 0); leaf_bytes = node_size(key, nullptr) + sizeof(pgno_t) + sizeof(indx_t); - } else if (unlikely(node_size(key, data) > mc->mc_txn->mt_env->me_nodemax)) { + } else if (unlikely(node_size(key, data) > + /* See note inside leaf_size() */ + mc->mc_txn->mt_env->me_branch_nodemax)) { /* Put data on overflow page. */ mdbx_cassert(mc, !F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)); const pgno_t ovpages = number_of_ovpages(mc->mc_txn->mt_env, data->iov_len); @@ -11612,10 +11741,9 @@ static int mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) { mx->mx_cursor.mc_pg[0] = fp; mx->mx_cursor.mc_ki[0] = 0; if (mc->mc_db->md_flags & MDBX_DUPFIXED) { - mx->mx_db.md_flags = MDBX_DUPFIXED; mx->mx_db.md_xsize = fp->mp_leaf2_ksize; if (mc->mc_db->md_flags & MDBX_INTEGERDUP) - mx->mx_db.md_flags |= MDBX_INTEGERKEY; + mx->mx_db.md_flags = MDBX_INTEGERKEY; } } mdbx_debug("Sub-db -%u root page %" PRIaPGNO, mx->mx_cursor.mc_dbi, @@ -12250,6 +12378,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { /* Move all nodes from src to dst */ const unsigned dst_nkeys = page_numkeys(pdst); const unsigned src_nkeys = page_numkeys(psrc); + mdbx_cassert(cdst, dst_nkeys + src_nkeys >= + (unsigned)(IS_LEAF(psrc) ? 1 : MDBX_MINKEYS)); if (likely(src_nkeys)) { unsigned j = dst_nkeys; if (unlikely(pagetype & P_LEAF2)) { @@ -12278,20 +12408,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { rc = mdbx_page_search_lowest(&mn); if (unlikely(rc)) return rc; - if (IS_LEAF2(mn.mc_pg[mn.mc_top])) { - key.iov_len = mn.mc_db->md_xsize; - key.iov_base = page_leaf2key(mn.mc_pg[mn.mc_top], 0, key.iov_len); - } else { - MDBX_node *lowest = page_node(mn.mc_pg[mn.mc_top], 0); - key.iov_len = node_ks(lowest); - key.iov_base = node_key(lowest); + MDBX_node *lowest = page_node(mn.mc_pg[mn.mc_top], 0); + key.iov_len = node_ks(lowest); + key.iov_base = node_key(lowest); - const size_t dst_room = page_room(pdst); - const size_t src_used = page_used(cdst->mc_txn->mt_env, psrc); - const size_t space_needed = src_used - node_ks(srcnode) + key.iov_len; - if (unlikely(space_needed > dst_room)) - return MDBX_RESULT_TRUE; - } + const size_t dst_room = page_room(pdst); + const size_t src_used = page_used(cdst->mc_txn->mt_env, psrc); + const size_t space_needed = src_used - node_ks(srcnode) + key.iov_len; + if (unlikely(space_needed > dst_room)) + return MDBX_RESULT_TRUE; } /* Mark dst as dirty. */ @@ -12399,6 +12524,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { return MDBX_SUCCESS; } + mdbx_cassert(cdst, page_numkeys(top_page) == dst_nkeys + src_nkeys); + if (pagetype != PAGETYPE(top_page)) { /* LY: LEAF-page becomes BRANCH, unable restore cursor's stack */ goto bailout; @@ -12461,8 +12588,6 @@ bailout: * [in] csrc The cursor to copy from. * [out] cdst The cursor to copy to. */ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) { - unsigned i; - mdbx_cassert(csrc, csrc->mc_txn->mt_txnid >= *csrc->mc_txn->mt_env->me_oldest); cdst->mc_txn = csrc->mc_txn; @@ -12473,7 +12598,7 @@ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) { cdst->mc_top = csrc->mc_top; cdst->mc_flags = csrc->mc_flags; - for (i = 0; i < csrc->mc_snum; i++) { + for (unsigned i = 0; i < csrc->mc_snum; i++) { cdst->mc_pg[i] = csrc->mc_pg[i]; cdst->mc_ki[i] = csrc->mc_ki[i]; } @@ -12634,68 +12759,106 @@ static int mdbx_rebalance(MDBX_cursor *mc) { mdbx_cassert(mc, PAGETYPE(right) == PAGETYPE(mc->mc_pg[mc->mc_top])); } - int ki = mc->mc_ki[mc->mc_top]; - bool fromleft; - if (!left || (right && page_room(left) < page_room(right))) { - mdbx_debug("merging %s neighbor", "right"); - mn.mc_pg[mn.mc_top] = right; - mn.mc_ki[pre_top] += 1; - mn.mc_ki[mn.mc_top] = 0; - mc->mc_ki[mc->mc_top] = (indx_t)page_numkeys(mc->mc_pg[mc->mc_top]); - fromleft = false; - } else { - mdbx_debug("merging %s neighbor", "left"); + const indx_t ki_top = mc->mc_ki[mc->mc_top]; + const indx_t ki_pre_top = mn.mc_ki[pre_top]; + const indx_t nkeys = (indx_t)page_numkeys(mn.mc_pg[mn.mc_top]); + if (left && page_room(left) > spaceleft_threshold && + (!right || page_room(right) < page_room(left))) { + /* try merge with left */ + mdbx_cassert(mc, page_numkeys(left) >= minkeys); mn.mc_pg[mn.mc_top] = left; - mn.mc_ki[pre_top] -= 1; - mn.mc_ki[mn.mc_top] = (indx_t)page_numkeys(mn.mc_pg[mn.mc_top]) - 1; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1; + mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1); mc->mc_ki[mc->mc_top] = 0; - fromleft = true; - } - - mdbx_debug("found neighbor page %" PRIaPGNO " (%u keys, %.1f%% full)", - mn.mc_pg[mn.mc_top]->mp_pgno, page_numkeys(mn.mc_pg[mn.mc_top]), - page_fill(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top])); - - /* If the neighbor page is above threshold and has enough keys, - * move one key from it. Otherwise we should try to merge them. - * (A branch page must never have less than 2 keys.) */ - if (page_fill_enough(mn.mc_pg[mn.mc_top], spaceleft_threshold, minkeys + 1)) { - rc = mdbx_node_move(&mn, mc, fromleft); - if (likely(rc == MDBX_SUCCESS)) - ki += fromleft /* if we inserted on left, bump position up */; - else if (unlikely(rc != MDBX_RESULT_TRUE)) + const indx_t new_ki = (indx_t)(ki_top + page_numkeys(left)); + mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; + /* We want mdbx_rebalance to find mn when doing fixups */ + WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn)); + if (likely(rc != MDBX_RESULT_TRUE)) { + mdbx_cursor_copy(&mn, mc); + mc->mc_ki[mc->mc_top] = new_ki; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); return rc; - mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) || - PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype); - mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth || - IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1])); - } else { - if (!fromleft) { - rc = mdbx_page_merge(&mn, mc); - if (unlikely(MDBX_IS_ERROR(rc))) - return rc; - mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) || - PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype); - mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth || - IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1])); - } else { - int new_ki = ki + page_numkeys(mn.mc_pg[mn.mc_top]); - mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; - /* We want mdbx_rebalance to find mn when doing fixups */ - WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn)); - if (likely(rc == MDBX_SUCCESS)) { - ki = new_ki; - mdbx_cursor_copy(&mn, mc); - } else if (unlikely(rc != MDBX_RESULT_TRUE)) - return rc; - mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) || - PAGETYPE(mc->mc_pg[mc->mc_top]) == pagetype); - mdbx_cassert(mc, mc->mc_snum < mc->mc_db->md_depth || - IS_LEAF(mc->mc_pg[mc->mc_db->md_depth - 1])); } } - mc->mc_ki[mc->mc_top] = (indx_t)ki; - return MDBX_SUCCESS; + if (right && page_room(right) > spaceleft_threshold) { + /* try merge with right */ + mdbx_cassert(mc, page_numkeys(right) >= minkeys); + mn.mc_pg[mn.mc_top] = right; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1; + mn.mc_ki[mn.mc_top] = 0; + mc->mc_ki[mc->mc_top] = nkeys; + rc = mdbx_page_merge(&mn, mc); + if (likely(rc != MDBX_RESULT_TRUE)) { + mc->mc_ki[mc->mc_top] = ki_top; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); + return rc; + } + } + if (left && page_numkeys(left) > minkeys && + (!right || page_numkeys(right) <= minkeys || + page_room(right) > page_room(left))) { + /* try move from left */ + mn.mc_pg[mn.mc_top] = left; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1; + mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1); + mc->mc_ki[mc->mc_top] = 0; + rc = mdbx_node_move(&mn, mc, true); + if (likely(rc != MDBX_RESULT_TRUE)) { + mc->mc_ki[mc->mc_top] = ki_top + 1; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); + return rc; + } + } + if (right && page_numkeys(right) > minkeys) { + /* try move from right */ + mn.mc_pg[mn.mc_top] = right; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1; + mn.mc_ki[mn.mc_top] = 0; + mc->mc_ki[mc->mc_top] = nkeys; + rc = mdbx_node_move(&mn, mc, false); + if (likely(rc != MDBX_RESULT_TRUE)) { + mc->mc_ki[mc->mc_top] = ki_top; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); + return rc; + } + } + + if (nkeys >= minkeys) + return MDBX_SUCCESS; + + if (left && (!right || page_room(left) > page_room(right))) { + /* try merge with left */ + mdbx_cassert(mc, page_numkeys(left) >= minkeys); + mn.mc_pg[mn.mc_top] = left; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top - 1; + mn.mc_ki[mn.mc_top] = (indx_t)(page_numkeys(left) - 1); + mc->mc_ki[mc->mc_top] = 0; + const indx_t new_ki = (indx_t)(ki_top + page_numkeys(left)); + mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1; + /* We want mdbx_rebalance to find mn when doing fixups */ + WITH_CURSOR_TRACKING(mn, rc = mdbx_page_merge(mc, &mn)); + if (likely(rc != MDBX_RESULT_TRUE)) { + mdbx_cursor_copy(&mn, mc); + mc->mc_ki[mc->mc_top] = new_ki; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); + return rc; + } + } else { + /* try merge with right */ + mdbx_cassert(mc, page_numkeys(right) >= minkeys); + mn.mc_pg[mn.mc_top] = right; + mn.mc_ki[mn.mc_top - 1] = ki_pre_top + 1; + mn.mc_ki[mn.mc_top] = 0; + mc->mc_ki[mc->mc_top] = nkeys; + rc = mdbx_page_merge(&mn, mc); + if (likely(rc != MDBX_RESULT_TRUE)) { + mc->mc_ki[mc->mc_top] = ki_top; + mdbx_cassert(mc, rc || page_numkeys(mc->mc_pg[mc->mc_top]) >= minkeys); + return rc; + } + } + return MDBX_PROBLEM; } static __cold int mdbx_page_check(MDBX_env *env, const MDBX_page *const mp, @@ -13104,7 +13267,6 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, pgno_t pgno = 0; unsigned i, ptop; MDBX_env *env = mc->mc_txn->mt_env; - MDBX_node *node; MDBX_val sepkey, rkey, xdata; MDBX_page *copy = NULL; MDBX_page *rp, *pp; @@ -13120,6 +13282,8 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, return rc; } + mdbx_cassert(mc, + nkeys >= (unsigned)(IS_BRANCH(mp) ? MDBX_MINKEYS * 2 - 1 : 1)); mdbx_debug("-----> splitting %s page %" PRIaPGNO " and adding [%s] at index %i/%i", IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, DKEY(newkey), @@ -13184,10 +13348,9 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, split_indx = (nkeys + 1) / 2; if (IS_LEAF2(rp)) { char *split, *ins; - int x; unsigned lsize, rsize, ksize; /* Move half of the keys to the right sibling */ - x = mc->mc_ki[mc->mc_top] - split_indx; + const int x = mc->mc_ki[mc->mc_top] - split_indx; ksize = mc->mc_db->md_xsize; split = page_leaf2key(mp, split_indx, ksize); rsize = (nkeys - split_indx) * ksize; @@ -13231,11 +13394,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, mc->mc_ki[mc->mc_top] = (indx_t)x; } } else { - size_t psize, nsize, k; /* Maximum free space in an empty page */ const unsigned pmax = page_space(env); - nsize = IS_LEAF(mp) ? leaf_size(env, newkey, newdata) - : branch_size(env, newkey); + const size_t nsize = IS_LEAF(mp) ? leaf_size(env, newkey, newdata) + : branch_size(env, newkey); /* grab a page to hold a temporary copy */ copy = mdbx_page_malloc(mc->mc_txn, 1); @@ -13268,47 +13430,48 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, * As a final tweak, if the new item goes on the last * spot on the page (and thus, onto the new page), bias * the split so the new page is emptier than the old page. - * This yields better packing during sequential inserts. - */ - int dir; + * This yields better packing during sequential inserts. */ if (nkeys < 32 || nsize > pmax / 16 || newindx >= nkeys) { /* Find split point */ - psize = 0; + int dir; + size_t psize = 0; + unsigned k; if (newindx <= split_indx || newindx >= nkeys) { i = 0; dir = 1; k = (newindx >= nkeys) ? nkeys : split_indx + 1 + IS_LEAF(mp); + split_indx = k - 1; } else { i = nkeys; dir = -1; k = split_indx - 1; + split_indx += 1; } - for (; i != k; i += dir) { + do { if (i == newindx) { psize += nsize; - node = NULL; } else { - node = (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); + MDBX_node *node = + (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); psize += NODESIZE + node_ks(node) + sizeof(indx_t); - if (IS_LEAF(mp)) { - if (F_ISSET(node_flags(node), F_BIGDATA)) - psize += sizeof(pgno_t); - else - psize += node_ds(node); - } + if (IS_LEAF(mp)) + psize += F_ISSET(node_flags(node), F_BIGDATA) ? sizeof(pgno_t) + : node_ds(node); psize = EVEN(psize); } - if (psize > pmax || i == k - dir) { + if (psize > pmax) { split_indx = i + (dir < 0); break; } - } + i += dir; + } while (i != k); } + if (split_indx == newindx) { sepkey.iov_len = newkey->iov_len; sepkey.iov_base = newkey->iov_base; } else { - node = + MDBX_node *node = (MDBX_node *)((char *)mp + copy->mp_ptrs[split_indx] + PAGEHDRSZ); sepkey.iov_len = node_ks(node); sepkey.iov_base = node_key(node); @@ -13421,7 +13584,8 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, /* Update index for the new key. */ mc->mc_ki[mc->mc_top] = n; } else { - node = (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); + MDBX_node *node = + (MDBX_node *)((char *)mp + copy->mp_ptrs[i] + PAGEHDRSZ); rkey.iov_base = node_key(node); rkey.iov_len = node_ks(node); if (IS_LEAF(mp)) { @@ -13492,7 +13656,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, } } if (nflags & MDBX_RESERVE) { - node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!(node_flags(node) & F_BIGDATA)) newdata->iov_base = node_data(node); } @@ -16510,24 +16674,13 @@ int mdbx_dbi_sequence(MDBX_txn *txn, MDBX_dbi dbi, uint64_t *result, /*----------------------------------------------------------------------------*/ -__cold intptr_t mdbx_limits_keysize_max(intptr_t pagesize) { - if (pagesize < 1) - pagesize = (intptr_t)mdbx_syspagesize(); - else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || - pagesize > (intptr_t)MAX_PAGESIZE || - !is_powerof2((size_t)pagesize))) - return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; - - return mdbx_maxkey(mdbx_nodemax(pagesize)); -} - __cold intptr_t mdbx_limits_dbsize_min(intptr_t pagesize) { if (pagesize < 1) pagesize = (intptr_t)mdbx_syspagesize(); else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE || !is_powerof2((size_t)pagesize))) - return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; + return -1; return MIN_PAGENO * pagesize; } @@ -16538,7 +16691,7 @@ __cold intptr_t mdbx_limits_dbsize_max(intptr_t pagesize) { else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE || !is_powerof2((size_t)pagesize))) - return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; + return -1; const uint64_t limit = MAX_PAGENO * (uint64_t)pagesize; return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit @@ -16551,9 +16704,11 @@ __cold intptr_t mdbx_limits_txnsize_max(intptr_t pagesize) { else if (unlikely(pagesize < (intptr_t)MIN_PAGESIZE || pagesize > (intptr_t)MAX_PAGESIZE || !is_powerof2((size_t)pagesize))) - return (MDBX_EINVAL > 0) ? -MDBX_EINVAL : MDBX_EINVAL; + return -1; - return pagesize * (MDBX_DPL_TXNFULL - 1); + const uint64_t limit = pagesize * (uint64_t)(MDBX_DPL_TXNFULL - 1); + return (limit < (intptr_t)MAX_MAPSIZE) ? (intptr_t)limit + : (intptr_t)MAX_MAPSIZE; } /*** Attribute support functions for Nexenta **********************************/ diff --git a/src/elements/internals.h b/src/elements/internals.h index fc68845f..83a2fc70 100644 --- a/src/elements/internals.h +++ b/src/elements/internals.h @@ -644,7 +644,7 @@ typedef union MDBX_DP { * elements are in the array. */ typedef MDBX_DP *MDBX_DPL; -/* PNL sizes - likely should be even bigger */ +/* PNL sizes */ #define MDBX_PNL_GRANULATE 1024 #define MDBX_PNL_INITIAL \ (MDBX_PNL_GRANULATE - 2 - MDBX_ASSUME_MALLOC_OVERHEAD / sizeof(pgno_t)) @@ -917,11 +917,11 @@ struct MDBX_env { MDBX_DPL me_dirtylist; /* Number of freelist items that can fit in a single overflow page */ unsigned me_maxgc_ov1page; - /* Max size of a node on a page */ - unsigned me_nodemax; - unsigned me_maxkey_limit; /* max size of a key */ - uint32_t me_live_reader; /* have liveness lock in reader table */ - void *me_userctx; /* User-settable context */ + unsigned me_branch_nodemax; /* max size of a branch-node */ + uint16_t me_maxkey_nd, me_maxkey_ds; + unsigned me_maxval_nd, me_maxval_ds; + uint32_t me_live_reader; /* have liveness lock in reader table */ + void *me_userctx; /* User-settable context */ volatile uint64_t *me_sync_timestamp; volatile uint64_t *me_autosync_period; volatile pgno_t *me_unsynced_pages; diff --git a/test/append.cc b/test/append.cc index 9f567c7e..a30351dc 100644 --- a/test/append.cc +++ b/test/append.cc @@ -53,7 +53,7 @@ bool testcase_append::run() { } log_trace("append: append-a %" PRIu64, serial); - generate_pair(serial, key, data); + generate_pair(serial); int cmp = inserted_number ? mdbx_cmp(txn_guard.get(), dbi, &key->value, &last_key->value) : 1; diff --git a/test/config.cc b/test/config.cc index add1630d..de3b657c 100644 --- a/test/config.cc +++ b/test/config.cc @@ -589,10 +589,7 @@ unsigned actor_params::mdbx_keylen_min() const { } unsigned actor_params::mdbx_keylen_max() const { - return (table_flags & MDBX_INTEGERKEY) - ? 8 - : std::min((unsigned)mdbx_limits_keysize_max(pagesize), - (unsigned)UINT16_MAX); + return (unsigned)mdbx_limits_keysize_max(pagesize, table_flags); } unsigned actor_params::mdbx_datalen_min() const { @@ -600,10 +597,6 @@ unsigned actor_params::mdbx_datalen_min() const { } unsigned actor_params::mdbx_datalen_max() const { - return (table_flags & MDBX_INTEGERDUP) - ? 8 - : std::min((table_flags & MDBX_DUPSORT) - ? (unsigned)mdbx_limits_keysize_max(pagesize) - : (unsigned)MDBX_MAXDATASIZE, - (unsigned)UINT16_MAX); + return std::min((unsigned)UINT16_MAX, + (unsigned)mdbx_limits_valsize_max(pagesize, table_flags)); } diff --git a/test/keygen.cc b/test/keygen.cc index e1561420..c2098e6e 100644 --- a/test/keygen.cc +++ b/test/keygen.cc @@ -72,7 +72,7 @@ serial_t injective(const serial_t serial, } void __hot maker::pair(serial_t serial, const buffer &key, buffer &value, - serial_t value_age) { + serial_t value_age, const bool keylen_changeable) { assert(mapping.width >= serial_minwith && mapping.width <= serial_maxwith); assert(mapping.split <= mapping.width); assert(mapping.mesh <= mapping.width); @@ -131,11 +131,62 @@ void __hot maker::pair(serial_t serial, const buffer &key, buffer &value, log_trace("keygen-pair: key %" PRIu64 ", value %" PRIu64, key_serial, value_serial); - mk(key_serial, key_essentials, *key); - mk(value_serial, value_essentials, *value); + mk_begin(key_serial, key_essentials, *key); + mk_begin(value_serial, value_essentials, *value); + +#if 0 /* unused for now */ + if (key->value.iov_len + value->value.iov_len > pair_maxlen) { + unsigned extra = key->value.iov_len + value->value.iov_len - pair_maxlen; + if (keylen_changeable && + key->value.iov_len > std::max(8u, (unsigned)key_essentials.minlen)) { +#if defined(__GNUC__) || defined(__clang__) + const bool coin = __builtin_parityll(serial) != 0; +#else + const bool coin = INT64_C(0xF2CEECA9989BD96A) * int64_t(serial) < 0; +#endif + if (coin) { + const unsigned gap = + key->value.iov_len - std::max(8u, (unsigned)key_essentials.minlen); + const unsigned chop = std::min(gap, extra); + log_trace("keygen-pair: chop %u key-len %u -> %u", chop, + (unsigned)key->value.iov_len, + (unsigned)key->value.iov_len - chop); + key->value.iov_len -= chop; + extra -= chop; + } + } + if (extra && value->value.iov_len > + std::max(8u, (unsigned)value_essentials.minlen)) { + const unsigned gap = value->value.iov_len - + std::max(8u, (unsigned)value_essentials.minlen); + const unsigned chop = std::min(gap, extra); + log_trace("keygen-pair: chop %u value-len %u -> %u", chop, + (unsigned)value->value.iov_len, + (unsigned)value->value.iov_len - chop); + value->value.iov_len -= chop; + extra -= chop; + } + if (keylen_changeable && extra && + key->value.iov_len > std::max(8u, (unsigned)key_essentials.minlen)) { + const unsigned gap = + key->value.iov_len - std::max(8u, (unsigned)key_essentials.minlen); + const unsigned chop = std::min(gap, extra); + log_trace("keygen-pair: chop %u key-len %u -> %u", chop, + (unsigned)key->value.iov_len, + (unsigned)key->value.iov_len - chop); + key->value.iov_len -= chop; + extra -= chop; + } + } +#else + (void)keylen_changeable; +#endif /* unused for now */ + + mk_continue(key_serial, key_essentials, *key); + mk_continue(value_serial, value_essentials, *value); if (log_enabled(logging::trace)) { - char dump_key[128], dump_value[128]; + char dump_key[4096], dump_value[4096]; log_trace("keygen-pair: key %s, value %s", mdbx_dump_val(&key->value, dump_key, sizeof(dump_key)), mdbx_dump_val(&value->value, dump_value, sizeof(dump_value))); @@ -146,19 +197,22 @@ void maker::setup(const config::actor_params_pod &actor, unsigned actor_id, unsigned thread_number) { key_essentials.flags = actor.table_flags & (MDBX_INTEGERKEY | MDBX_REVERSEKEY | MDBX_DUPSORT); - assert(actor.keylen_min <= UINT8_MAX); - key_essentials.minlen = (uint8_t)actor.keylen_min; - assert(actor.keylen_max <= UINT16_MAX); - key_essentials.maxlen = (uint16_t)actor.keylen_max; + assert(actor.keylen_min <= UINT16_MAX); + key_essentials.minlen = (uint16_t)actor.keylen_min; + assert(actor.keylen_max <= UINT32_MAX); + key_essentials.maxlen = std::min( + (uint32_t)actor.keylen_max, + (uint32_t)mdbx_limits_keysize_max(actor.pagesize, key_essentials.flags)); value_essentials.flags = actor.table_flags & (MDBX_INTEGERDUP | MDBX_REVERSEDUP); - assert(actor.datalen_min <= UINT8_MAX); - value_essentials.minlen = (uint8_t)actor.datalen_min; - assert(actor.datalen_max <= UINT16_MAX); - value_essentials.maxlen = (uint16_t)actor.datalen_max; + assert(actor.datalen_min <= UINT16_MAX); + value_essentials.minlen = (uint16_t)actor.datalen_min; + assert(actor.datalen_max <= UINT32_MAX); + value_essentials.maxlen = std::min( + (uint32_t)actor.datalen_max, + (uint32_t)mdbx_limits_valsize_max(actor.pagesize, key_essentials.flags)); - assert(thread_number < 2); (void)thread_number; mapping = actor.keygen; salt = (actor.keygen.seed + actor_id) * UINT64_C(14653293970879851569); @@ -226,18 +280,25 @@ buffer alloc(size_t limit) { return buffer(ptr); } -void __hot maker::mk(const serial_t serial, const essentials ¶ms, - result &out) { +void __hot maker::mk_begin(const serial_t serial, const essentials ¶ms, + result &out) { assert(out.limit >= params.maxlen); assert(params.maxlen >= params.minlen); assert(params.maxlen >= length(serial)); - out.value.iov_base = out.bytes; out.value.iov_len = (params.maxlen > params.minlen) ? params.minlen + serial % (params.maxlen - params.minlen) : params.minlen; + if ((params.flags & (MDBX_INTEGERKEY | MDBX_INTEGERDUP)) == 0 && + out.value.iov_len < 8) + out.value.iov_len = std::max(length(serial), out.value.iov_len); +} + +void __hot maker::mk_continue(const serial_t serial, const essentials ¶ms, + result &out) { + out.value.iov_base = out.bytes; if (params.flags & (MDBX_INTEGERKEY | MDBX_INTEGERDUP)) { assert(params.maxlen == params.minlen); assert(params.minlen == 4 || params.minlen == 8); @@ -251,17 +312,13 @@ void __hot maker::mk(const serial_t serial, const essentials ¶ms, unaligned::store(out.bytes + out.value.iov_len - 8, htobe64(serial)); } else { out.u64 = htobe64(serial); - if (out.value.iov_len < 8) { - out.value.iov_len = std::max(length(serial), out.value.iov_len); + if (out.value.iov_len < 8) out.value.iov_base = out.bytes + 8 - out.value.iov_len; - } } } else { out.u64 = htole64(serial); if (out.value.iov_len > 8) memset(out.bytes + 8, '\0', out.value.iov_len - 8); - else - out.value.iov_len = std::max(length(serial), out.value.iov_len); } assert(out.value.iov_len >= params.minlen); diff --git a/test/keygen.h b/test/keygen.h index 0403ab8a..d25b8834 100644 --- a/test/keygen.h +++ b/test/keygen.h @@ -107,18 +107,25 @@ class maker { serial_t salt; struct essentials { - uint8_t minlen; - uint8_t flags; - uint16_t maxlen; + uint16_t minlen; + uint16_t flags; + uint32_t maxlen; } key_essentials, value_essentials; - static void mk(const serial_t serial, const essentials ¶ms, result &out); + static void mk_begin(const serial_t serial, const essentials ¶ms, + result &out); + static void mk_continue(const serial_t serial, const essentials ¶ms, + result &out); + static void mk(const serial_t serial, const essentials ¶ms, result &out) { + mk_begin(serial, params, out); + mk_continue(serial, params, out); + } public: maker() { memset(this, 0, sizeof(*this)); } void pair(serial_t serial, const buffer &key, buffer &value, - serial_t value_age); + serial_t value_age, const bool keylen_changeable); void setup(const config::actor_params_pod &actor, unsigned actor_id, unsigned thread_number); void make_ordered(); diff --git a/test/test.h b/test/test.h index 3be05c11..630059ac 100644 --- a/test/test.h +++ b/test/test.h @@ -197,13 +197,12 @@ protected: bool should_continue(bool check_timeout_only = false) const; void generate_pair(const keygen::serial_t serial, keygen::buffer &out_key, - keygen::buffer &out_value, keygen::serial_t data_age = 0) { - keyvalue_maker.pair(serial, out_key, out_value, data_age); + keygen::buffer &out_value, keygen::serial_t data_age) { + keyvalue_maker.pair(serial, out_key, out_value, data_age, false); } - void generate_pair(const keygen::serial_t serial, - keygen::serial_t data_age = 0) { - generate_pair(serial, key, data, data_age); + void generate_pair(const keygen::serial_t serial) { + keyvalue_maker.pair(serial, key, data, 0, true); } bool mode_readonly() const {