diff --git a/src/bits.h b/src/bits.h index 5be1b6ce..261f84dc 100644 --- a/src/bits.h +++ b/src/bits.h @@ -1058,7 +1058,7 @@ static __inline unsigned mdbx_log2(size_t value) { #define NUMKEYS(p) ((unsigned)(p)->mp_lower >> 1) /* The amount of space remaining in the page */ -#define SIZELEFT(p) (indx_t)((p)->mp_upper - (p)->mp_lower) +#define SIZELEFT(p) ((indx_t)((p)->mp_upper - (p)->mp_lower)) /* The percentage of space used in the page, in tenths of a percent. */ #define PAGEFILL(env, p) \ diff --git a/src/mdbx.c b/src/mdbx.c index 4414d492..3e801d88 100644 --- a/src/mdbx.c +++ b/src/mdbx.c @@ -1072,10 +1072,20 @@ static int __must_check_result mdbx_sync_locked(MDBX_env *env, unsigned flags, static void mdbx_env_close0(MDBX_env *env); static MDBX_node *mdbx_node_search(MDBX_cursor *mc, MDBX_val *key, int *exactp); -static int __must_check_result mdbx_node_add(MDBX_cursor *mc, unsigned indx, - const MDBX_val *key, - MDBX_val *data, pgno_t pgno, - unsigned flags); + +static int __must_check_result mdbx_node_add_branch(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key, + pgno_t pgno); +static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key, + MDBX_val *data, + unsigned flags); +static int __must_check_result mdbx_node_add_leaf2(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key); + static void mdbx_node_del(MDBX_cursor *mc, size_t ksize); static void mdbx_node_shrink(MDBX_page *mp, unsigned indx); static int __must_check_result mdbx_node_move(MDBX_cursor *csrc, @@ -5242,7 +5252,7 @@ int __cold mdbx_env_get_maxkeysize(MDBX_env *env) { } #define mdbx_nodemax(pagesize) \ - (((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & -(intptr_t)2) - sizeof(indx_t)) + (((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & ~(uintptr_t)1) - sizeof(indx_t)) #define mdbx_maxkey(nodemax) (((nodemax)-NODESIZE - sizeof(MDBX_db)) / 2) @@ -5267,7 +5277,7 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { STATIC_ASSERT(mdbx_nodemax(MIN_PAGESIZE) > 42); STATIC_ASSERT(mdbx_nodemax(MAX_PAGESIZE) < UINT16_MAX); const intptr_t nodemax = mdbx_nodemax(pagesize); - mdbx_ensure(env, nodemax > 42 && nodemax < UINT16_MAX); + mdbx_ensure(env, nodemax > 42 && nodemax < UINT16_MAX && nodemax % 2 == 0); env->me_nodemax = (unsigned)nodemax; STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) > 42); @@ -5275,7 +5285,8 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) { STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) > 42); STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) < MAX_PAGESIZE); const intptr_t maxkey_limit = mdbx_maxkey(env->me_nodemax); - mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize); + mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize && + maxkey_limit % 2 == 0); env->me_maxkey_limit = (unsigned)maxkey_limit; env->me_psize2log = mdbx_log2(pagesize); @@ -8293,8 +8304,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, memcpy(olddata.iov_base, data->iov_base, data->iov_len); else { mdbx_cassert(mc, NUMKEYS(mc->mc_pg[mc->mc_top]) == 1); - mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) && - !IS_LEAF2(mc->mc_pg[mc->mc_top])); + mdbx_cassert(mc, PAGETYPE(mc->mc_pg[mc->mc_top]) == P_LEAF); mdbx_cassert(mc, NODEDSZ(leaf) == 0); mdbx_cassert(mc, leaf->mn_flags == 0); mdbx_cassert(mc, key->iov_len < UINT16_MAX); @@ -8323,7 +8333,7 @@ new_sub: rc = mdbx_page_split(mc, key, rdata, P_INVALID, nflags); } else { /* There is room already in this leaf page. */ - rc = mdbx_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags); + rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, nflags); if (likely(rc == 0)) { /* Adjust other cursors pointing to mp */ MDBX_cursor *m2, *m3; @@ -8636,12 +8646,10 @@ static int mdbx_page_new(MDBX_cursor *mc, unsigned flags, unsigned num, * Returns The number of bytes needed to store the node. */ static __inline size_t mdbx_leaf_size(MDBX_env *env, const MDBX_val *key, const MDBX_val *data) { - size_t sz; - - sz = LEAFSIZE(key, data); + size_t sz = LEAFSIZE(key, data); if (sz > env->me_nodemax) { /* put on overflow page */ - sz -= data->iov_len - sizeof(pgno_t); + sz = sz - data->iov_len + sizeof(pgno_t); } return EVEN(sz + sizeof(indx_t)); @@ -8660,20 +8668,223 @@ static __inline size_t mdbx_leaf_size(MDBX_env *env, const MDBX_val *key, * * Returns The number of bytes needed to store the node. */ static __inline size_t mdbx_branch_size(MDBX_env *env, const MDBX_val *key) { - size_t sz; - - sz = INDXSIZE(key); + size_t sz = INDXSIZE(key); if (unlikely(sz > env->me_nodemax)) { /* put on overflow page */ /* not implemented */ mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __FUNCTION__, __LINE__); - sz -= key->iov_len - sizeof(pgno_t); + sz = sz - key->iov_len + sizeof(pgno_t); } return sz + sizeof(indx_t); } +static int __must_check_result mdbx_node_add_leaf2(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key) { + MDBX_page *mp = mc->mc_pg[mc->mc_top]; + DKBUF; + mdbx_debug("add to leaf2-%spage %" PRIaPGNO " index %i, " + " key size %" PRIuPTR " [%s]", + IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, + key ? key->iov_len : 0, DKEY(key)); + + mdbx_cassert(mc, key); + mdbx_cassert(mc, PAGETYPE(mp) == (P_LEAF | P_LEAF2)); + const unsigned ksize = mc->mc_db->md_xsize; + mdbx_cassert(mc, ksize == key->iov_len); + + const int room = SIZELEFT(mp); + const int entry_size = ksize + sizeof(indx_t); + mdbx_cassert(mc, room >= entry_size); + if (unlikely(room < entry_size)) { + bailout: + mc->mc_txn->mt_flags |= MDBX_TXN_ERROR; + return MDBX_PAGE_FULL; + } + + char *const ptr = LEAF2KEY(mp, indx, ksize); + mdbx_cassert(mc, NUMKEYS(mp) >= indx); + if (unlikely(NUMKEYS(mp) < indx)) + goto bailout; + + const unsigned diff = NUMKEYS(mp) - indx; + if (likely(diff > 0)) + /* Move higher keys up one slot. */ + memmove(ptr + ksize, ptr, diff * ksize); + /* insert new key */ + memcpy(ptr, key->iov_base, ksize); + + /* Just using these for counting */ + mdbx_cassert(mc, UINT16_MAX - mp->mp_lower >= (int)sizeof(indx_t)); + mp->mp_lower += sizeof(indx_t); + mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t)); + mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); + + mdbx_cassert(mc, + mp->mp_upper >= mp->mp_lower && + PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize); + return MDBX_SUCCESS; +} + +static int __must_check_result mdbx_node_add_branch(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key, + pgno_t pgno) { + MDBX_page *mp = mc->mc_pg[mc->mc_top]; + DKBUF; + mdbx_debug("add to branch-%spage %" PRIaPGNO " index %i, node-pgno %" PRIaPGNO + " key size %" PRIuPTR " [%s]", + IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, pgno, + key ? key->iov_len : 0, DKEY(key)); + + mdbx_cassert(mc, PAGETYPE(mp) == P_BRANCH); + STATIC_ASSERT(NODESIZE % 2 == 0); + + const size_t room = SIZELEFT(mp); + const size_t node_size = + likely(key != NULL) ? NODESIZE + EVEN(key->iov_len) : NODESIZE; + mdbx_cassert(mc, mdbx_branch_size(mc->mc_txn->mt_env, key) == + node_size + sizeof(indx_t)); + mdbx_cassert(mc, room >= node_size + sizeof(indx_t)); + if (unlikely(room < node_size + sizeof(indx_t))) { + bailout: + mc->mc_txn->mt_flags |= MDBX_TXN_ERROR; + return MDBX_PAGE_FULL; + } + + const unsigned numkeys = NUMKEYS(mp); + mdbx_cassert(mc, numkeys >= indx); + if (unlikely(numkeys < indx)) + goto bailout; + + /* Move higher pointers up one slot. */ + for (unsigned i = numkeys; i > indx; --i) + mp->mp_ptrs[i] = mp->mp_ptrs[i - 1]; + + /* Adjust free space offsets. */ + const size_t ofs = mp->mp_upper - node_size; + mdbx_cassert(mc, ofs >= mp->mp_lower + sizeof(indx_t)); + mdbx_cassert(mc, ofs <= UINT16_MAX); + mp->mp_ptrs[indx] = (uint16_t)ofs; + mp->mp_upper = (uint16_t)ofs; + mp->mp_lower += sizeof(indx_t); + + /* Write the node data. */ + MDBX_node *node = NODEPTR(mp, indx); + SETPGNO(node, pgno); + node->mn_ksize = 0; + node->mn_flags = 0; + if (likely(key != NULL)) { + node->mn_ksize = (uint16_t)key->iov_len; + memcpy(NODEKEY(node), key->iov_base, key->iov_len); + } + + mdbx_cassert(mc, + mp->mp_upper >= mp->mp_lower && + PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize); + return MDBX_SUCCESS; +} + +static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc, + unsigned indx, + const MDBX_val *key, + MDBX_val *data, + unsigned flags) { + MDBX_page *mp = mc->mc_pg[mc->mc_top]; + DKBUF; + mdbx_debug("add to leaf-%spage %" PRIaPGNO " index %i, data size %" PRIuPTR + " key size %" PRIuPTR " [%s]", + IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, + data ? data->iov_len : 0, key ? key->iov_len : 0, DKEY(key)); + mdbx_cassert(mc, key != NULL && data != NULL); + mdbx_cassert(mc, PAGETYPE(mp) == P_LEAF); + MDBX_page *largepage = NULL; + + const size_t room = SIZELEFT(mp); + size_t node_size = NODESIZE + key->iov_len; + if (unlikely(flags & F_BIGDATA)) { + /* Data already on overflow page. */ + STATIC_ASSERT(sizeof(pgno_t) % 2 == 0); + node_size += sizeof(pgno_t); + } else if (unlikely(node_size + data->iov_len > + mc->mc_txn->mt_env->me_nodemax)) { + const pgno_t ovpages = OVPAGES(mc->mc_txn->mt_env, data->iov_len); + /* Put data on overflow page. */ + mdbx_debug("data size is %" PRIuPTR ", node would be %" PRIuPTR + ", put data on %u-overflow page(s)", + data->iov_len, node_size + data->iov_len, ovpages); + int rc = mdbx_page_new(mc, P_OVERFLOW, ovpages, &largepage); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + mdbx_debug("allocated overflow page %" PRIaPGNO "", largepage->mp_pgno); + flags |= F_BIGDATA; + node_size += sizeof(pgno_t); + mdbx_cassert(mc, mdbx_leaf_size(mc->mc_txn->mt_env, key, data) == + EVEN(node_size) + sizeof(indx_t)); + } else { + node_size += data->iov_len; + mdbx_cassert(mc, mdbx_leaf_size(mc->mc_txn->mt_env, key, data) == + EVEN(node_size) + sizeof(indx_t)); + } + + node_size = EVEN(node_size); + mdbx_cassert(mc, room >= node_size + sizeof(indx_t)); + if (unlikely(room < node_size + sizeof(indx_t))) { + bailout: + mc->mc_txn->mt_flags |= MDBX_TXN_ERROR; + return MDBX_PAGE_FULL; + } + + const unsigned numkeys = NUMKEYS(mp); + mdbx_cassert(mc, numkeys >= indx); + if (unlikely(numkeys < indx)) + goto bailout; + + /* Move higher pointers up one slot. */ + for (unsigned i = numkeys; i > indx; --i) + mp->mp_ptrs[i] = mp->mp_ptrs[i - 1]; + + /* Adjust free space offsets. */ + const size_t ofs = mp->mp_upper - node_size; + mdbx_cassert(mc, ofs >= mp->mp_lower + sizeof(indx_t)); + mdbx_cassert(mc, ofs <= UINT16_MAX); + mp->mp_ptrs[indx] = (uint16_t)ofs; + mp->mp_upper = (uint16_t)ofs; + mp->mp_lower += sizeof(indx_t); + + /* Write the node data. */ + MDBX_node *node = NODEPTR(mp, indx); + node->mn_ksize = (uint16_t)key->iov_len; + node->mn_flags = (uint16_t)flags; + SETDSZ(node, data->iov_len); + memcpy(NODEKEY(node), key->iov_base, key->iov_len); + + void *nodedata = NODEDATA(node); + if (likely(largepage == NULL)) { + if (unlikely(flags & F_BIGDATA)) + memcpy(nodedata, data->iov_base, sizeof(pgno_t)); + else if (unlikely(flags & MDBX_RESERVE)) + data->iov_base = nodedata; + else if (likely(nodedata != data->iov_base)) + memcpy(nodedata, data->iov_base, data->iov_len); + } else { + memcpy(nodedata, &largepage->mp_pgno, sizeof(pgno_t)); + nodedata = PAGEDATA(largepage); + if (unlikely(flags & MDBX_RESERVE)) + data->iov_base = nodedata; + else if (likely(nodedata != data->iov_base)) + memcpy(nodedata, data->iov_base, data->iov_len); + } + + mdbx_cassert(mc, + mp->mp_upper >= mp->mp_lower && + PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize); + return MDBX_SUCCESS; +} + +#if 0 /* Add a node to the page pointed to by the cursor. * Set MDBX_TXN_ERROR on failure. * @@ -8699,35 +8910,16 @@ static int mdbx_node_add(MDBX_cursor *mc, unsigned indx, const MDBX_val *key, MDBX_page *mp = mc->mc_pg[mc->mc_top]; MDBX_page *ofp = NULL; /* overflow page */ void *ndata; - DKBUF; mdbx_cassert(mc, mp->mp_upper >= mp->mp_lower); + DKBUF; mdbx_debug("add to %s %spage %" PRIaPGNO " index %i, data size %" PRIuPTR " key size %" PRIuPTR " [%s]", IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->iov_len : 0, key ? key->iov_len : 0, DKEY(key)); - if (IS_LEAF2(mp)) { - mdbx_cassert(mc, key); - /* Move higher keys up one slot. */ - const int ksize = mc->mc_db->md_xsize; - char *const ptr = LEAF2KEY(mp, indx, ksize); - const int diff = NUMKEYS(mp) - indx; - if (diff > 0) - memmove(ptr + ksize, ptr, diff * ksize); - /* insert new key */ - memcpy(ptr, key->iov_base, ksize); - - /* Just using these for counting */ - mdbx_cassert(mc, UINT16_MAX - mp->mp_lower >= (int)sizeof(indx_t)); - mp->mp_lower += sizeof(indx_t); - mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t)); - mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); - return MDBX_SUCCESS; - } - room = (intptr_t)SIZELEFT(mp) - (intptr_t)sizeof(indx_t); if (key != NULL) node_size += key->iov_len; @@ -8787,7 +8979,7 @@ update: if (IS_LEAF(mp)) { ndata = NODEDATA(node); - if (unlikely(ofp == NULL)) { + if (likely(ofp == NULL)) { if (unlikely(F_ISSET(flags, F_BIGDATA))) memcpy(ndata, data->iov_base, sizeof(pgno_t)); else if (F_ISSET(flags, MDBX_RESERVE)) @@ -8815,6 +9007,7 @@ full: mc->mc_txn->mt_flags |= MDBX_TXN_ERROR; return MDBX_PAGE_FULL; } +#endif /* Delete the specified node from a page. * [in] mc Cursor pointing to the node to delete. @@ -9278,7 +9471,7 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) { /* Shift node contents if EVEN(key length) changed. */ if (delta) { - if (delta > 0 && SIZELEFT(mp) < delta) { + if (SIZELEFT(mp) < delta) { pgno_t pgno; /* not enough space left, do a delete and split */ mdbx_debug("Not enough room, delta = %d, splitting...", delta); @@ -9308,9 +9501,7 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) { if (node->mn_ksize != key->iov_len) node->mn_ksize = (uint16_t)key->iov_len; - if (key->iov_len) - memcpy(NODEKEY(node), key->iov_base, key->iov_len); - + memcpy(NODEKEY(node), key->iov_base, key->iov_len); return MDBX_SUCCESS; } @@ -9318,155 +9509,183 @@ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst); /* Move a node from csrc to cdst. */ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { - MDBX_node *srcnode; - MDBX_val key, data; - pgno_t srcpg; - MDBX_cursor mn; int rc; - unsigned flags; - DKBUF; - mdbx_cassert(csrc, IS_LEAF(csrc->mc_pg[csrc->mc_top]) == - IS_LEAF(cdst->mc_pg[cdst->mc_top])); /* Mark src and dst as dirty. */ if (unlikely((rc = mdbx_page_touch(csrc)) || (rc = mdbx_page_touch(cdst)))) return rc; - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { - key.iov_len = csrc->mc_db->md_xsize; - key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top], - csrc->mc_ki[csrc->mc_top], key.iov_len); - data.iov_len = 0; - data.iov_base = NULL; - srcpg = 0; - flags = 0; - } else { - srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]); - mdbx_cassert(csrc, !((size_t)srcnode & 1)); - srcpg = NODEPGNO(srcnode); - flags = srcnode->mn_flags; - if (csrc->mc_ki[csrc->mc_top] == 0 && - IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { - unsigned snum = csrc->mc_snum; - MDBX_node *s2; + MDBX_page *const psrc = csrc->mc_pg[csrc->mc_top]; + MDBX_page *const pdst = cdst->mc_pg[cdst->mc_top]; + mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst)); + mdbx_cassert(csrc, csrc->mc_dbi == cdst->mc_dbi); + if (unlikely(PAGETYPE(psrc) != PAGETYPE(pdst))) { + bailout: + csrc->mc_txn->mt_flags |= MDBX_TXN_ERROR; + return MDBX_PROBLEM; + } + + MDBX_val key4move; + switch (PAGETYPE(psrc)) { + case P_BRANCH: { + const MDBX_node *srcnode = NODEPTR(psrc, csrc->mc_ki[csrc->mc_top]); + mdbx_cassert(csrc, srcnode->mn_flags == 0); + const pgno_t srcpg = NODEPGNO(srcnode); + key4move.iov_len = NODEKSZ(srcnode); + key4move.iov_base = NODEKEY(srcnode); + if (csrc->mc_ki[csrc->mc_top] == 0) { + const uint16_t snum = csrc->mc_snum; + mdbx_cassert(csrc, snum > 0); /* must find the lowest key below src */ rc = mdbx_page_search_lowest(csrc); + MDBX_page *psrc2 = csrc->mc_pg[csrc->mc_top]; if (unlikely(rc)) return rc; - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { - key.iov_len = csrc->mc_db->md_xsize; - key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.iov_len); + mdbx_cassert(csrc, IS_LEAF(psrc2)); + if (unlikely(!IS_LEAF(psrc2))) + goto bailout; + if (IS_LEAF2(psrc2)) { + key4move.iov_len = csrc->mc_db->md_xsize; + key4move.iov_base = LEAF2KEY(psrc2, 0, key4move.iov_len); } else { - s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0); + const MDBX_node *s2 = NODEPTR(psrc2, 0); + key4move.iov_len = NODEKSZ(s2); + key4move.iov_base = NODEKEY(s2); + } + csrc->mc_snum = snum; + csrc->mc_top = snum - 1; + csrc->mc_ki[csrc->mc_top] = 0; + /* paranoia */ + mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]); + mdbx_cassert(csrc, IS_BRANCH(psrc)); + if (unlikely(!IS_BRANCH(psrc))) + goto bailout; + } + + if (cdst->mc_ki[cdst->mc_top] == 0) { + const uint16_t snum = cdst->mc_snum; + mdbx_cassert(csrc, snum > 0); + MDBX_cursor mn; + mdbx_cursor_copy(cdst, &mn); + mn.mc_xcursor = NULL; + /* must find the lowest key below dst */ + rc = mdbx_page_search_lowest(&mn); + if (unlikely(rc)) + return rc; + MDBX_page *const pdst2 = mn.mc_pg[mn.mc_top]; + mdbx_cassert(cdst, IS_LEAF(pdst2)); + if (unlikely(!IS_LEAF(pdst2))) + goto bailout; + MDBX_val key; + if (IS_LEAF2(pdst2)) { + key.iov_len = mn.mc_db->md_xsize; + key.iov_base = LEAF2KEY(pdst2, 0, key.iov_len); + } else { + MDBX_node *s2 = NODEPTR(pdst2, 0); key.iov_len = NODEKSZ(s2); key.iov_base = NODEKEY(s2); } - mdbx_cassert(csrc, snum >= 1 && snum <= UINT16_MAX); - csrc->mc_snum = (uint16_t)snum--; - csrc->mc_top = (uint16_t)snum; - } else { - key.iov_len = NODEKSZ(srcnode); - key.iov_base = NODEKEY(srcnode); + mn.mc_snum = snum; + mn.mc_top = snum - 1; + mn.mc_ki[mn.mc_top] = 0; + rc = mdbx_update_key(&mn, &key); + if (unlikely(rc)) + return rc; } + + mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO + " to node %u on page %" PRIaPGNO, + "branch", csrc->mc_ki[csrc->mc_top], DKEY(&key4move), + psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno); + /* Add the node to the destination page. */ + rc = + mdbx_node_add_branch(cdst, cdst->mc_ki[cdst->mc_top], &key4move, srcpg); + } break; + + case P_LEAF: { + const MDBX_node *srcnode = NODEPTR(psrc, csrc->mc_ki[csrc->mc_top]); + MDBX_val data; data.iov_len = NODEDSZ(srcnode); data.iov_base = NODEDATA(srcnode); - } - mn.mc_xcursor = NULL; - if (IS_BRANCH(cdst->mc_pg[cdst->mc_top]) && cdst->mc_ki[cdst->mc_top] == 0) { - unsigned snum = cdst->mc_snum; - MDBX_node *s2; - MDBX_val bkey; - /* must find the lowest key below dst */ - mdbx_cursor_copy(cdst, &mn); - rc = mdbx_page_search_lowest(&mn); - if (unlikely(rc)) - return rc; - if (IS_LEAF2(mn.mc_pg[mn.mc_top])) { - bkey.iov_len = mn.mc_db->md_xsize; - bkey.iov_base = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, bkey.iov_len); - } else { - s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0); - bkey.iov_len = NODEKSZ(s2); - bkey.iov_base = NODEKEY(s2); - } - mdbx_cassert(csrc, snum >= 1 && snum <= UINT16_MAX); - mn.mc_snum = (uint16_t)snum--; - mn.mc_top = (uint16_t)snum; - mn.mc_ki[snum] = 0; - rc = mdbx_update_key(&mn, &bkey); - if (unlikely(rc)) - return rc; + key4move.iov_len = NODEKSZ(srcnode); + key4move.iov_base = NODEKEY(srcnode); + mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO + " to node %u on page %" PRIaPGNO, + "leaf", csrc->mc_ki[csrc->mc_top], DKEY(&key4move), + psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno); + /* Add the node to the destination page. */ + rc = mdbx_node_add_leaf(cdst, cdst->mc_ki[cdst->mc_top], &key4move, &data, + srcnode->mn_flags); + } break; + + case P_LEAF | P_LEAF2: { + key4move.iov_len = csrc->mc_db->md_xsize; + key4move.iov_base = + LEAF2KEY(psrc, csrc->mc_ki[csrc->mc_top], key4move.iov_len); + mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO + " to node %u on page %" PRIaPGNO, + "leaf2", csrc->mc_ki[csrc->mc_top], DKEY(&key4move), + psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno); + /* Add the node to the destination page. */ + rc = mdbx_node_add_leaf2(cdst, cdst->mc_ki[cdst->mc_top], &key4move); + } break; + + default: + goto bailout; } - mdbx_debug("moving %s node %u [%s] on page %" PRIaPGNO - " to node %u on page %" PRIaPGNO "", - IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch", - csrc->mc_ki[csrc->mc_top], DKEY(&key), - csrc->mc_pg[csrc->mc_top]->mp_pgno, cdst->mc_ki[cdst->mc_top], - cdst->mc_pg[cdst->mc_top]->mp_pgno); - - /* Add the node to the destination page. */ - rc = mdbx_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, - IS_LEAF(cdst->mc_pg[cdst->mc_top]) ? &data : NULL, srcpg, - flags); if (unlikely(rc != MDBX_SUCCESS)) return rc; /* Delete the node from the source page. */ - mdbx_node_del(csrc, key.iov_len); + mdbx_node_del(csrc, key4move.iov_len); + + mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]); + mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]); { /* Adjust other cursors pointing to mp */ MDBX_cursor *m2, *m3; - MDBX_dbi dbi = csrc->mc_dbi; - MDBX_page *mpd, *mps; - - mps = csrc->mc_pg[csrc->mc_top]; - /* If we're adding on the left, bump others up */ + const MDBX_dbi dbi = csrc->mc_dbi; if (fromleft) { - mpd = cdst->mc_pg[csrc->mc_top]; + /* If we're adding on the left, bump others up */ for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) { - if (csrc->mc_flags & C_SUB) - m3 = &m2->mc_xcursor->mx_cursor; - else - m3 = m2; + m3 = (csrc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2; if (!(m3->mc_flags & C_INITIALIZED) || m3->mc_top < csrc->mc_top) continue; - if (m3 != cdst && m3->mc_pg[csrc->mc_top] == mpd && + if (m3 != cdst && m3->mc_pg[csrc->mc_top] == pdst && m3->mc_ki[csrc->mc_top] >= cdst->mc_ki[csrc->mc_top]) { m3->mc_ki[csrc->mc_top]++; } - if (m3 != csrc && m3->mc_pg[csrc->mc_top] == mps && + if (m3 != csrc && m3->mc_pg[csrc->mc_top] == psrc && m3->mc_ki[csrc->mc_top] == csrc->mc_ki[csrc->mc_top]) { - m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top]; + m3->mc_pg[csrc->mc_top] = pdst; m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top]; + mdbx_cassert(csrc, csrc->mc_top > 0); m3->mc_ki[csrc->mc_top - 1]++; } - if (XCURSOR_INITED(m3) && IS_LEAF(mps)) + if (XCURSOR_INITED(m3) && IS_LEAF(psrc)) XCURSOR_REFRESH(m3, m3->mc_pg[csrc->mc_top], m3->mc_ki[csrc->mc_top]); } - } else - /* Adding on the right, bump others down */ - { + } else { + /* Adding on the right, bump others down */ for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) { - if (csrc->mc_flags & C_SUB) - m3 = &m2->mc_xcursor->mx_cursor; - else - m3 = m2; + m3 = (csrc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2; if (m3 == csrc) continue; if (!(m3->mc_flags & C_INITIALIZED) || m3->mc_top < csrc->mc_top) continue; - if (m3->mc_pg[csrc->mc_top] == mps) { + if (m3->mc_pg[csrc->mc_top] == psrc) { if (!m3->mc_ki[csrc->mc_top]) { - m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top]; + m3->mc_pg[csrc->mc_top] = pdst; m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top]; + mdbx_cassert(csrc, csrc->mc_top > 0); m3->mc_ki[csrc->mc_top - 1]--; } else { m3->mc_ki[csrc->mc_top]--; } - if (XCURSOR_INITED(m3) && IS_LEAF(mps)) + if (XCURSOR_INITED(m3) && IS_LEAF(psrc)) XCURSOR_REFRESH(m3, m3->mc_pg[csrc->mc_top], m3->mc_ki[csrc->mc_top]); } @@ -9476,17 +9695,23 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { /* Update the parent separators. */ if (csrc->mc_ki[csrc->mc_top] == 0) { + mdbx_cassert(csrc, csrc->mc_top > 0); if (csrc->mc_ki[csrc->mc_top - 1] != 0) { - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { - key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.iov_len); + MDBX_val key; + if (IS_LEAF2(psrc)) { + key.iov_len = psrc->mp_leaf2_ksize; + key.iov_base = LEAF2KEY(psrc, 0, key.iov_len); } else { - srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0); + MDBX_node *srcnode = NODEPTR(psrc, 0); key.iov_len = NODEKSZ(srcnode); key.iov_base = NODEKEY(srcnode); } mdbx_debug("update separator for source page %" PRIaPGNO " to [%s]", - csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key)); + psrc->mp_pgno, DKEY(&key)); + MDBX_cursor mn; mdbx_cursor_copy(csrc, &mn); + mn.mc_xcursor = NULL; + mdbx_cassert(csrc, mn.mc_snum > 0); mn.mc_snum--; mn.mc_top--; /* We want mdbx_rebalance to find mn when doing fixups */ @@ -9494,10 +9719,9 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { if (unlikely(rc != MDBX_SUCCESS)) return rc; } - if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) { - MDBX_val nullkey; - indx_t ix = csrc->mc_ki[csrc->mc_top]; - nullkey.iov_len = 0; + if (IS_BRANCH(psrc)) { + const MDBX_val nullkey = {0, 0}; + const indx_t ix = csrc->mc_ki[csrc->mc_top]; csrc->mc_ki[csrc->mc_top] = 0; rc = mdbx_update_key(csrc, &nullkey); csrc->mc_ki[csrc->mc_top] = ix; @@ -9506,17 +9730,23 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { } if (cdst->mc_ki[cdst->mc_top] == 0) { + mdbx_cassert(cdst, cdst->mc_top > 0); if (cdst->mc_ki[cdst->mc_top - 1] != 0) { - if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { - key.iov_base = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, key.iov_len); + MDBX_val key; + if (IS_LEAF2(psrc)) { + key.iov_len = pdst->mp_leaf2_ksize; + key.iov_base = LEAF2KEY(pdst, 0, key.iov_len); } else { - srcnode = NODEPTR(cdst->mc_pg[cdst->mc_top], 0); + MDBX_node *srcnode = NODEPTR(pdst, 0); key.iov_len = NODEKSZ(srcnode); key.iov_base = NODEKEY(srcnode); } mdbx_debug("update separator for destination page %" PRIaPGNO " to [%s]", - cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key)); + pdst->mp_pgno, DKEY(&key)); + MDBX_cursor mn; mdbx_cursor_copy(cdst, &mn); + mn.mc_xcursor = NULL; + mdbx_cassert(cdst, mn.mc_snum > 0); mn.mc_snum--; mn.mc_top--; /* We want mdbx_rebalance to find mn when doing fixups */ @@ -9524,10 +9754,9 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { if (unlikely(rc != MDBX_SUCCESS)) return rc; } - if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) { - MDBX_val nullkey; - indx_t ix = cdst->mc_ki[cdst->mc_top]; - nullkey.iov_len = 0; + if (IS_BRANCH(pdst)) { + const MDBX_val nullkey = {0, 0}; + const indx_t ix = cdst->mc_ki[cdst->mc_top]; cdst->mc_ki[cdst->mc_top] = 0; rc = mdbx_update_key(cdst, &nullkey); cdst->mc_ki[cdst->mc_top] = ix; @@ -9548,47 +9777,41 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) { * * Returns 0 on success, non-zero on failure. */ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { - MDBX_page *psrc, *pdst; MDBX_node *srcnode; - MDBX_val key, data; - unsigned nkeys; + MDBX_val key; int rc; - unsigned i, j; - - psrc = csrc->mc_pg[csrc->mc_top]; - pdst = cdst->mc_pg[cdst->mc_top]; - - mdbx_debug("merging page %" PRIaPGNO " into %" PRIaPGNO "", psrc->mp_pgno, - pdst->mp_pgno); - - mdbx_cassert(csrc, IS_LEAF(psrc) == IS_LEAF(pdst)); - mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */ - mdbx_cassert(csrc, cdst->mc_snum > 1); /* Mark dst as dirty. */ if (unlikely(rc = mdbx_page_touch(cdst))) return rc; - /* get dst page again now that we've touched it. */ - pdst = cdst->mc_pg[cdst->mc_top]; + MDBX_page *const psrc = csrc->mc_pg[csrc->mc_top]; + MDBX_page *const pdst = cdst->mc_pg[cdst->mc_top]; + + mdbx_debug("merging page %" PRIaPGNO " into %" PRIaPGNO "", psrc->mp_pgno, + pdst->mp_pgno); + + mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst)); + mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */ + mdbx_cassert(cdst, cdst->mc_snum > 1); /* Move all nodes from src to dst */ - j = nkeys = NUMKEYS(pdst); + const unsigned nkeys = NUMKEYS(pdst); + unsigned j = nkeys; if (IS_LEAF2(psrc)) { key.iov_len = csrc->mc_db->md_xsize; key.iov_base = PAGEDATA(psrc); - for (i = 0; i < NUMKEYS(psrc); i++, j++) { - rc = mdbx_node_add(cdst, j, &key, NULL, 0, 0); + for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) { + rc = mdbx_node_add_leaf2(cdst, j, &key); if (unlikely(rc != MDBX_SUCCESS)) return rc; key.iov_base = (char *)key.iov_base + key.iov_len; } } else { - for (i = 0; i < NUMKEYS(psrc); i++, j++) { + for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) { srcnode = NODEPTR(psrc, i); if (i == 0 && IS_BRANCH(psrc)) { MDBX_cursor mn; - MDBX_node *s2; mdbx_cursor_copy(csrc, &mn); mn.mc_xcursor = NULL; /* must find the lowest key below src */ @@ -9599,7 +9822,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { key.iov_len = mn.mc_db->md_xsize; key.iov_base = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, key.iov_len); } else { - s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0); + MDBX_node *s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0); key.iov_len = NODEKSZ(s2); key.iov_base = NODEKEY(s2); } @@ -9608,14 +9831,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { key.iov_base = NODEKEY(srcnode); } - MDBX_val *pdata = NULL; if (IS_LEAF(psrc)) { + MDBX_val data; data.iov_len = NODEDSZ(srcnode); data.iov_base = NODEDATA(srcnode); - pdata = &data; + rc = mdbx_node_add_leaf(cdst, j, &key, &data, srcnode->mn_flags); + } else { + mdbx_cassert(csrc, srcnode->mn_flags == 0); + rc = mdbx_node_add_branch(cdst, j, &key, NODEPGNO(srcnode)); } - rc = mdbx_node_add(cdst, j, &key, pdata, NODEPGNO(srcnode), - srcnode->mn_flags); if (unlikely(rc != MDBX_SUCCESS)) return rc; } @@ -9625,12 +9849,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { pdst->mp_pgno, NUMKEYS(pdst), (float)PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10); + mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]); + mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]); + /* Unlink the src page from parent and add to free list. */ csrc->mc_top--; mdbx_node_del(csrc, 0); if (csrc->mc_ki[csrc->mc_top] == 0) { - key.iov_len = 0; - rc = mdbx_update_key(csrc, &key); + const MDBX_val nullkey = {0, 0}; + rc = mdbx_update_key(csrc, &nullkey); if (unlikely(rc)) { csrc->mc_top++; return rc; @@ -9638,7 +9865,6 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) { } csrc->mc_top++; - psrc = csrc->mc_pg[csrc->mc_top]; /* If not operating on FreeDB, allow this page to be reused * in this txn. Otherwise just add to free list. */ rc = mdbx_page_loose(csrc, psrc); @@ -9856,8 +10082,8 @@ static int mdbx_rebalance(MDBX_cursor *mc) { rc = mdbx_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL); if (unlikely(rc)) return rc; - mdbx_cassert(mc, IS_LEAF(mn.mc_pg[mn.mc_top]) == - IS_LEAF(mc->mc_pg[mc->mc_top])); + mdbx_cassert(mc, PAGETYPE(mn.mc_pg[mn.mc_top]) == + PAGETYPE(mc->mc_pg[mc->mc_top])); mn.mc_ki[mn.mc_top] = 0; mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]); fromleft = 0; @@ -9869,8 +10095,8 @@ static int mdbx_rebalance(MDBX_cursor *mc) { rc = mdbx_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL); if (unlikely(rc)) return rc; - mdbx_cassert(mc, IS_LEAF(mn.mc_pg[mn.mc_top]) == - IS_LEAF(mc->mc_pg[mc->mc_top])); + mdbx_cassert(mc, PAGETYPE(mn.mc_pg[mn.mc_top]) == + PAGETYPE(mc->mc_pg[mc->mc_top])); mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1; mc->mc_ki[mc->mc_top] = 0; fromleft = 1; @@ -10127,7 +10353,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, new_root = mc->mc_db->md_depth++; /* Add left (implicit) pointer. */ - if (unlikely((rc = mdbx_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != + if (unlikely((rc = mdbx_node_add_branch(mc, 0, NULL, mp->mp_pgno)) != MDBX_SUCCESS)) { /* undo the pre-push */ mc->mc_pg[0] = mc->mc_pg[1]; @@ -10333,7 +10559,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, } } else { mn.mc_top--; - rc = mdbx_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0); + rc = mdbx_node_add_branch(&mn, mn.mc_ki[ptop], &sepkey, rp->mp_pgno); mn.mc_top++; } if (unlikely(rc != MDBX_SUCCESS)) { @@ -10346,7 +10572,23 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, if (nflags & MDBX_APPEND) { mc->mc_pg[mc->mc_top] = rp; mc->mc_ki[mc->mc_top] = 0; - rc = mdbx_node_add(mc, 0, newkey, newdata, newpgno, nflags); + switch (PAGETYPE(rp)) { + case P_BRANCH: { + mdbx_cassert(mc, nflags == 0); + rc = mdbx_node_add_branch(mc, 0, newkey, newpgno); + } break; + case P_LEAF: { + mdbx_cassert(mc, newpgno == 0); + rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, nflags); + } break; + case P_LEAF | P_LEAF2: { + mdbx_cassert(mc, nflags == 0); + mdbx_cassert(mc, newpgno == 0); + rc = mdbx_node_add_leaf2(mc, 0, newkey); + } break; + default: + rc = MDBX_CORRUPTED; + } if (rc) goto done; for (i = 0; i < mc->mc_top; i++) @@ -10381,14 +10623,30 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey, flags = node->mn_flags; } - if (!IS_LEAF(mp) && n == 0) { - /* First branch index doesn't need key data. */ - rkey.iov_len = 0; + switch (PAGETYPE(rp)) { + case P_BRANCH: { + mdbx_cassert(mc, flags == 0); + if (n == 0) { + /* First branch index doesn't need key data. */ + rkey.iov_len = 0; + } + rc = mdbx_node_add_branch(mc, n, &rkey, pgno); + } break; + case P_LEAF: { + mdbx_cassert(mc, pgno == 0); + rc = mdbx_node_add_leaf(mc, n, &rkey, rdata, flags); + } break; + /* case P_LEAF | P_LEAF2: { + mdbx_cassert(mc, flags == 0); + mdbx_cassert(mc, gno == 0); + rc = mdbx_node_add_leaf2(mc, n, &rkey); + } break; */ + default: + rc = MDBX_CORRUPTED; } - - rc = mdbx_node_add(mc, n, &rkey, rdata, pgno, flags); if (rc) goto done; + if (i == nkeys) { i = 0; n = 0;