mdbx: rework move-node and split add-node.

This commit is contained in:
Leo Yuriev 2018-09-06 17:10:59 +03:00
parent 771ac1928b
commit 6206b67d32
2 changed files with 456 additions and 198 deletions

View File

@ -1058,7 +1058,7 @@ static __inline unsigned mdbx_log2(size_t value) {
#define NUMKEYS(p) ((unsigned)(p)->mp_lower >> 1)
/* The amount of space remaining in the page */
#define SIZELEFT(p) (indx_t)((p)->mp_upper - (p)->mp_lower)
#define SIZELEFT(p) ((indx_t)((p)->mp_upper - (p)->mp_lower))
/* The percentage of space used in the page, in tenths of a percent. */
#define PAGEFILL(env, p) \

View File

@ -1072,10 +1072,20 @@ static int __must_check_result mdbx_sync_locked(MDBX_env *env, unsigned flags,
static void mdbx_env_close0(MDBX_env *env);
static MDBX_node *mdbx_node_search(MDBX_cursor *mc, MDBX_val *key, int *exactp);
static int __must_check_result mdbx_node_add(MDBX_cursor *mc, unsigned indx,
const MDBX_val *key,
MDBX_val *data, pgno_t pgno,
unsigned flags);
static int __must_check_result mdbx_node_add_branch(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key,
pgno_t pgno);
static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key,
MDBX_val *data,
unsigned flags);
static int __must_check_result mdbx_node_add_leaf2(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key);
static void mdbx_node_del(MDBX_cursor *mc, size_t ksize);
static void mdbx_node_shrink(MDBX_page *mp, unsigned indx);
static int __must_check_result mdbx_node_move(MDBX_cursor *csrc,
@ -5242,7 +5252,7 @@ int __cold mdbx_env_get_maxkeysize(MDBX_env *env) {
}
#define mdbx_nodemax(pagesize) \
(((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & -(intptr_t)2) - sizeof(indx_t))
(((((pagesize)-PAGEHDRSZ) / MDBX_MINKEYS) & ~(uintptr_t)1) - sizeof(indx_t))
#define mdbx_maxkey(nodemax) (((nodemax)-NODESIZE - sizeof(MDBX_db)) / 2)
@ -5267,7 +5277,7 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) {
STATIC_ASSERT(mdbx_nodemax(MIN_PAGESIZE) > 42);
STATIC_ASSERT(mdbx_nodemax(MAX_PAGESIZE) < UINT16_MAX);
const intptr_t nodemax = mdbx_nodemax(pagesize);
mdbx_ensure(env, nodemax > 42 && nodemax < UINT16_MAX);
mdbx_ensure(env, nodemax > 42 && nodemax < UINT16_MAX && nodemax % 2 == 0);
env->me_nodemax = (unsigned)nodemax;
STATIC_ASSERT(mdbx_maxkey(MIN_PAGESIZE) > 42);
@ -5275,7 +5285,8 @@ static void __cold mdbx_setup_pagesize(MDBX_env *env, const size_t pagesize) {
STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) > 42);
STATIC_ASSERT(mdbx_maxkey(MAX_PAGESIZE) < MAX_PAGESIZE);
const intptr_t maxkey_limit = mdbx_maxkey(env->me_nodemax);
mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize);
mdbx_ensure(env, maxkey_limit > 42 && (size_t)maxkey_limit < pagesize &&
maxkey_limit % 2 == 0);
env->me_maxkey_limit = (unsigned)maxkey_limit;
env->me_psize2log = mdbx_log2(pagesize);
@ -8293,8 +8304,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
memcpy(olddata.iov_base, data->iov_base, data->iov_len);
else {
mdbx_cassert(mc, NUMKEYS(mc->mc_pg[mc->mc_top]) == 1);
mdbx_cassert(mc, IS_LEAF(mc->mc_pg[mc->mc_top]) &&
!IS_LEAF2(mc->mc_pg[mc->mc_top]));
mdbx_cassert(mc, PAGETYPE(mc->mc_pg[mc->mc_top]) == P_LEAF);
mdbx_cassert(mc, NODEDSZ(leaf) == 0);
mdbx_cassert(mc, leaf->mn_flags == 0);
mdbx_cassert(mc, key->iov_len < UINT16_MAX);
@ -8323,7 +8333,7 @@ new_sub:
rc = mdbx_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
/* There is room already in this leaf page. */
rc = mdbx_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, nflags);
if (likely(rc == 0)) {
/* Adjust other cursors pointing to mp */
MDBX_cursor *m2, *m3;
@ -8636,12 +8646,10 @@ static int mdbx_page_new(MDBX_cursor *mc, unsigned flags, unsigned num,
* Returns The number of bytes needed to store the node. */
static __inline size_t mdbx_leaf_size(MDBX_env *env, const MDBX_val *key,
const MDBX_val *data) {
size_t sz;
sz = LEAFSIZE(key, data);
size_t sz = LEAFSIZE(key, data);
if (sz > env->me_nodemax) {
/* put on overflow page */
sz -= data->iov_len - sizeof(pgno_t);
sz = sz - data->iov_len + sizeof(pgno_t);
}
return EVEN(sz + sizeof(indx_t));
@ -8660,20 +8668,223 @@ static __inline size_t mdbx_leaf_size(MDBX_env *env, const MDBX_val *key,
*
* Returns The number of bytes needed to store the node. */
static __inline size_t mdbx_branch_size(MDBX_env *env, const MDBX_val *key) {
size_t sz;
sz = INDXSIZE(key);
size_t sz = INDXSIZE(key);
if (unlikely(sz > env->me_nodemax)) {
/* put on overflow page */
/* not implemented */
mdbx_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __FUNCTION__,
__LINE__);
sz -= key->iov_len - sizeof(pgno_t);
sz = sz - key->iov_len + sizeof(pgno_t);
}
return sz + sizeof(indx_t);
}
static int __must_check_result mdbx_node_add_leaf2(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key) {
MDBX_page *mp = mc->mc_pg[mc->mc_top];
DKBUF;
mdbx_debug("add to leaf2-%spage %" PRIaPGNO " index %i, "
" key size %" PRIuPTR " [%s]",
IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx,
key ? key->iov_len : 0, DKEY(key));
mdbx_cassert(mc, key);
mdbx_cassert(mc, PAGETYPE(mp) == (P_LEAF | P_LEAF2));
const unsigned ksize = mc->mc_db->md_xsize;
mdbx_cassert(mc, ksize == key->iov_len);
const int room = SIZELEFT(mp);
const int entry_size = ksize + sizeof(indx_t);
mdbx_cassert(mc, room >= entry_size);
if (unlikely(room < entry_size)) {
bailout:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PAGE_FULL;
}
char *const ptr = LEAF2KEY(mp, indx, ksize);
mdbx_cassert(mc, NUMKEYS(mp) >= indx);
if (unlikely(NUMKEYS(mp) < indx))
goto bailout;
const unsigned diff = NUMKEYS(mp) - indx;
if (likely(diff > 0))
/* Move higher keys up one slot. */
memmove(ptr + ksize, ptr, diff * ksize);
/* insert new key */
memcpy(ptr, key->iov_base, ksize);
/* Just using these for counting */
mdbx_cassert(mc, UINT16_MAX - mp->mp_lower >= (int)sizeof(indx_t));
mp->mp_lower += sizeof(indx_t);
mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t));
mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
mdbx_cassert(mc,
mp->mp_upper >= mp->mp_lower &&
PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize);
return MDBX_SUCCESS;
}
static int __must_check_result mdbx_node_add_branch(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key,
pgno_t pgno) {
MDBX_page *mp = mc->mc_pg[mc->mc_top];
DKBUF;
mdbx_debug("add to branch-%spage %" PRIaPGNO " index %i, node-pgno %" PRIaPGNO
" key size %" PRIuPTR " [%s]",
IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, pgno,
key ? key->iov_len : 0, DKEY(key));
mdbx_cassert(mc, PAGETYPE(mp) == P_BRANCH);
STATIC_ASSERT(NODESIZE % 2 == 0);
const size_t room = SIZELEFT(mp);
const size_t node_size =
likely(key != NULL) ? NODESIZE + EVEN(key->iov_len) : NODESIZE;
mdbx_cassert(mc, mdbx_branch_size(mc->mc_txn->mt_env, key) ==
node_size + sizeof(indx_t));
mdbx_cassert(mc, room >= node_size + sizeof(indx_t));
if (unlikely(room < node_size + sizeof(indx_t))) {
bailout:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PAGE_FULL;
}
const unsigned numkeys = NUMKEYS(mp);
mdbx_cassert(mc, numkeys >= indx);
if (unlikely(numkeys < indx))
goto bailout;
/* Move higher pointers up one slot. */
for (unsigned i = numkeys; i > indx; --i)
mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
/* Adjust free space offsets. */
const size_t ofs = mp->mp_upper - node_size;
mdbx_cassert(mc, ofs >= mp->mp_lower + sizeof(indx_t));
mdbx_cassert(mc, ofs <= UINT16_MAX);
mp->mp_ptrs[indx] = (uint16_t)ofs;
mp->mp_upper = (uint16_t)ofs;
mp->mp_lower += sizeof(indx_t);
/* Write the node data. */
MDBX_node *node = NODEPTR(mp, indx);
SETPGNO(node, pgno);
node->mn_ksize = 0;
node->mn_flags = 0;
if (likely(key != NULL)) {
node->mn_ksize = (uint16_t)key->iov_len;
memcpy(NODEKEY(node), key->iov_base, key->iov_len);
}
mdbx_cassert(mc,
mp->mp_upper >= mp->mp_lower &&
PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize);
return MDBX_SUCCESS;
}
static int __must_check_result mdbx_node_add_leaf(MDBX_cursor *mc,
unsigned indx,
const MDBX_val *key,
MDBX_val *data,
unsigned flags) {
MDBX_page *mp = mc->mc_pg[mc->mc_top];
DKBUF;
mdbx_debug("add to leaf-%spage %" PRIaPGNO " index %i, data size %" PRIuPTR
" key size %" PRIuPTR " [%s]",
IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx,
data ? data->iov_len : 0, key ? key->iov_len : 0, DKEY(key));
mdbx_cassert(mc, key != NULL && data != NULL);
mdbx_cassert(mc, PAGETYPE(mp) == P_LEAF);
MDBX_page *largepage = NULL;
const size_t room = SIZELEFT(mp);
size_t node_size = NODESIZE + key->iov_len;
if (unlikely(flags & F_BIGDATA)) {
/* Data already on overflow page. */
STATIC_ASSERT(sizeof(pgno_t) % 2 == 0);
node_size += sizeof(pgno_t);
} else if (unlikely(node_size + data->iov_len >
mc->mc_txn->mt_env->me_nodemax)) {
const pgno_t ovpages = OVPAGES(mc->mc_txn->mt_env, data->iov_len);
/* Put data on overflow page. */
mdbx_debug("data size is %" PRIuPTR ", node would be %" PRIuPTR
", put data on %u-overflow page(s)",
data->iov_len, node_size + data->iov_len, ovpages);
int rc = mdbx_page_new(mc, P_OVERFLOW, ovpages, &largepage);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
mdbx_debug("allocated overflow page %" PRIaPGNO "", largepage->mp_pgno);
flags |= F_BIGDATA;
node_size += sizeof(pgno_t);
mdbx_cassert(mc, mdbx_leaf_size(mc->mc_txn->mt_env, key, data) ==
EVEN(node_size) + sizeof(indx_t));
} else {
node_size += data->iov_len;
mdbx_cassert(mc, mdbx_leaf_size(mc->mc_txn->mt_env, key, data) ==
EVEN(node_size) + sizeof(indx_t));
}
node_size = EVEN(node_size);
mdbx_cassert(mc, room >= node_size + sizeof(indx_t));
if (unlikely(room < node_size + sizeof(indx_t))) {
bailout:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PAGE_FULL;
}
const unsigned numkeys = NUMKEYS(mp);
mdbx_cassert(mc, numkeys >= indx);
if (unlikely(numkeys < indx))
goto bailout;
/* Move higher pointers up one slot. */
for (unsigned i = numkeys; i > indx; --i)
mp->mp_ptrs[i] = mp->mp_ptrs[i - 1];
/* Adjust free space offsets. */
const size_t ofs = mp->mp_upper - node_size;
mdbx_cassert(mc, ofs >= mp->mp_lower + sizeof(indx_t));
mdbx_cassert(mc, ofs <= UINT16_MAX);
mp->mp_ptrs[indx] = (uint16_t)ofs;
mp->mp_upper = (uint16_t)ofs;
mp->mp_lower += sizeof(indx_t);
/* Write the node data. */
MDBX_node *node = NODEPTR(mp, indx);
node->mn_ksize = (uint16_t)key->iov_len;
node->mn_flags = (uint16_t)flags;
SETDSZ(node, data->iov_len);
memcpy(NODEKEY(node), key->iov_base, key->iov_len);
void *nodedata = NODEDATA(node);
if (likely(largepage == NULL)) {
if (unlikely(flags & F_BIGDATA))
memcpy(nodedata, data->iov_base, sizeof(pgno_t));
else if (unlikely(flags & MDBX_RESERVE))
data->iov_base = nodedata;
else if (likely(nodedata != data->iov_base))
memcpy(nodedata, data->iov_base, data->iov_len);
} else {
memcpy(nodedata, &largepage->mp_pgno, sizeof(pgno_t));
nodedata = PAGEDATA(largepage);
if (unlikely(flags & MDBX_RESERVE))
data->iov_base = nodedata;
else if (likely(nodedata != data->iov_base))
memcpy(nodedata, data->iov_base, data->iov_len);
}
mdbx_cassert(mc,
mp->mp_upper >= mp->mp_lower &&
PAGEHDRSZ + mp->mp_upper <= mc->mc_txn->mt_env->me_psize);
return MDBX_SUCCESS;
}
#if 0
/* Add a node to the page pointed to by the cursor.
* Set MDBX_TXN_ERROR on failure.
*
@ -8699,35 +8910,16 @@ static int mdbx_node_add(MDBX_cursor *mc, unsigned indx, const MDBX_val *key,
MDBX_page *mp = mc->mc_pg[mc->mc_top];
MDBX_page *ofp = NULL; /* overflow page */
void *ndata;
DKBUF;
mdbx_cassert(mc, mp->mp_upper >= mp->mp_lower);
DKBUF;
mdbx_debug("add to %s %spage %" PRIaPGNO " index %i, data size %" PRIuPTR
" key size %" PRIuPTR " [%s]",
IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno, indx, data ? data->iov_len : 0,
key ? key->iov_len : 0, DKEY(key));
if (IS_LEAF2(mp)) {
mdbx_cassert(mc, key);
/* Move higher keys up one slot. */
const int ksize = mc->mc_db->md_xsize;
char *const ptr = LEAF2KEY(mp, indx, ksize);
const int diff = NUMKEYS(mp) - indx;
if (diff > 0)
memmove(ptr + ksize, ptr, diff * ksize);
/* insert new key */
memcpy(ptr, key->iov_base, ksize);
/* Just using these for counting */
mdbx_cassert(mc, UINT16_MAX - mp->mp_lower >= (int)sizeof(indx_t));
mp->mp_lower += sizeof(indx_t);
mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t));
mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
return MDBX_SUCCESS;
}
room = (intptr_t)SIZELEFT(mp) - (intptr_t)sizeof(indx_t);
if (key != NULL)
node_size += key->iov_len;
@ -8787,7 +8979,7 @@ update:
if (IS_LEAF(mp)) {
ndata = NODEDATA(node);
if (unlikely(ofp == NULL)) {
if (likely(ofp == NULL)) {
if (unlikely(F_ISSET(flags, F_BIGDATA)))
memcpy(ndata, data->iov_base, sizeof(pgno_t));
else if (F_ISSET(flags, MDBX_RESERVE))
@ -8815,6 +9007,7 @@ full:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PAGE_FULL;
}
#endif
/* Delete the specified node from a page.
* [in] mc Cursor pointing to the node to delete.
@ -9278,7 +9471,7 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) {
/* Shift node contents if EVEN(key length) changed. */
if (delta) {
if (delta > 0 && SIZELEFT(mp) < delta) {
if (SIZELEFT(mp) < delta) {
pgno_t pgno;
/* not enough space left, do a delete and split */
mdbx_debug("Not enough room, delta = %d, splitting...", delta);
@ -9308,9 +9501,7 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) {
if (node->mn_ksize != key->iov_len)
node->mn_ksize = (uint16_t)key->iov_len;
if (key->iov_len)
memcpy(NODEKEY(node), key->iov_base, key->iov_len);
memcpy(NODEKEY(node), key->iov_base, key->iov_len);
return MDBX_SUCCESS;
}
@ -9318,155 +9509,183 @@ static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst);
/* Move a node from csrc to cdst. */
static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
MDBX_node *srcnode;
MDBX_val key, data;
pgno_t srcpg;
MDBX_cursor mn;
int rc;
unsigned flags;
DKBUF;
mdbx_cassert(csrc, IS_LEAF(csrc->mc_pg[csrc->mc_top]) ==
IS_LEAF(cdst->mc_pg[cdst->mc_top]));
/* Mark src and dst as dirty. */
if (unlikely((rc = mdbx_page_touch(csrc)) || (rc = mdbx_page_touch(cdst))))
return rc;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.iov_len = csrc->mc_db->md_xsize;
key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top],
csrc->mc_ki[csrc->mc_top], key.iov_len);
data.iov_len = 0;
data.iov_base = NULL;
srcpg = 0;
flags = 0;
} else {
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
mdbx_cassert(csrc, !((size_t)srcnode & 1));
srcpg = NODEPGNO(srcnode);
flags = srcnode->mn_flags;
if (csrc->mc_ki[csrc->mc_top] == 0 &&
IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
unsigned snum = csrc->mc_snum;
MDBX_node *s2;
MDBX_page *const psrc = csrc->mc_pg[csrc->mc_top];
MDBX_page *const pdst = cdst->mc_pg[cdst->mc_top];
mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst));
mdbx_cassert(csrc, csrc->mc_dbi == cdst->mc_dbi);
if (unlikely(PAGETYPE(psrc) != PAGETYPE(pdst))) {
bailout:
csrc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return MDBX_PROBLEM;
}
MDBX_val key4move;
switch (PAGETYPE(psrc)) {
case P_BRANCH: {
const MDBX_node *srcnode = NODEPTR(psrc, csrc->mc_ki[csrc->mc_top]);
mdbx_cassert(csrc, srcnode->mn_flags == 0);
const pgno_t srcpg = NODEPGNO(srcnode);
key4move.iov_len = NODEKSZ(srcnode);
key4move.iov_base = NODEKEY(srcnode);
if (csrc->mc_ki[csrc->mc_top] == 0) {
const uint16_t snum = csrc->mc_snum;
mdbx_cassert(csrc, snum > 0);
/* must find the lowest key below src */
rc = mdbx_page_search_lowest(csrc);
MDBX_page *psrc2 = csrc->mc_pg[csrc->mc_top];
if (unlikely(rc))
return rc;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.iov_len = csrc->mc_db->md_xsize;
key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.iov_len);
mdbx_cassert(csrc, IS_LEAF(psrc2));
if (unlikely(!IS_LEAF(psrc2)))
goto bailout;
if (IS_LEAF2(psrc2)) {
key4move.iov_len = csrc->mc_db->md_xsize;
key4move.iov_base = LEAF2KEY(psrc2, 0, key4move.iov_len);
} else {
s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
const MDBX_node *s2 = NODEPTR(psrc2, 0);
key4move.iov_len = NODEKSZ(s2);
key4move.iov_base = NODEKEY(s2);
}
csrc->mc_snum = snum;
csrc->mc_top = snum - 1;
csrc->mc_ki[csrc->mc_top] = 0;
/* paranoia */
mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]);
mdbx_cassert(csrc, IS_BRANCH(psrc));
if (unlikely(!IS_BRANCH(psrc)))
goto bailout;
}
if (cdst->mc_ki[cdst->mc_top] == 0) {
const uint16_t snum = cdst->mc_snum;
mdbx_cassert(csrc, snum > 0);
MDBX_cursor mn;
mdbx_cursor_copy(cdst, &mn);
mn.mc_xcursor = NULL;
/* must find the lowest key below dst */
rc = mdbx_page_search_lowest(&mn);
if (unlikely(rc))
return rc;
MDBX_page *const pdst2 = mn.mc_pg[mn.mc_top];
mdbx_cassert(cdst, IS_LEAF(pdst2));
if (unlikely(!IS_LEAF(pdst2)))
goto bailout;
MDBX_val key;
if (IS_LEAF2(pdst2)) {
key.iov_len = mn.mc_db->md_xsize;
key.iov_base = LEAF2KEY(pdst2, 0, key.iov_len);
} else {
MDBX_node *s2 = NODEPTR(pdst2, 0);
key.iov_len = NODEKSZ(s2);
key.iov_base = NODEKEY(s2);
}
mdbx_cassert(csrc, snum >= 1 && snum <= UINT16_MAX);
csrc->mc_snum = (uint16_t)snum--;
csrc->mc_top = (uint16_t)snum;
} else {
key.iov_len = NODEKSZ(srcnode);
key.iov_base = NODEKEY(srcnode);
mn.mc_snum = snum;
mn.mc_top = snum - 1;
mn.mc_ki[mn.mc_top] = 0;
rc = mdbx_update_key(&mn, &key);
if (unlikely(rc))
return rc;
}
mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO
" to node %u on page %" PRIaPGNO,
"branch", csrc->mc_ki[csrc->mc_top], DKEY(&key4move),
psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno);
/* Add the node to the destination page. */
rc =
mdbx_node_add_branch(cdst, cdst->mc_ki[cdst->mc_top], &key4move, srcpg);
} break;
case P_LEAF: {
const MDBX_node *srcnode = NODEPTR(psrc, csrc->mc_ki[csrc->mc_top]);
MDBX_val data;
data.iov_len = NODEDSZ(srcnode);
data.iov_base = NODEDATA(srcnode);
}
mn.mc_xcursor = NULL;
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top]) && cdst->mc_ki[cdst->mc_top] == 0) {
unsigned snum = cdst->mc_snum;
MDBX_node *s2;
MDBX_val bkey;
/* must find the lowest key below dst */
mdbx_cursor_copy(cdst, &mn);
rc = mdbx_page_search_lowest(&mn);
if (unlikely(rc))
return rc;
if (IS_LEAF2(mn.mc_pg[mn.mc_top])) {
bkey.iov_len = mn.mc_db->md_xsize;
bkey.iov_base = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, bkey.iov_len);
} else {
s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0);
bkey.iov_len = NODEKSZ(s2);
bkey.iov_base = NODEKEY(s2);
}
mdbx_cassert(csrc, snum >= 1 && snum <= UINT16_MAX);
mn.mc_snum = (uint16_t)snum--;
mn.mc_top = (uint16_t)snum;
mn.mc_ki[snum] = 0;
rc = mdbx_update_key(&mn, &bkey);
if (unlikely(rc))
return rc;
key4move.iov_len = NODEKSZ(srcnode);
key4move.iov_base = NODEKEY(srcnode);
mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO
" to node %u on page %" PRIaPGNO,
"leaf", csrc->mc_ki[csrc->mc_top], DKEY(&key4move),
psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno);
/* Add the node to the destination page. */
rc = mdbx_node_add_leaf(cdst, cdst->mc_ki[cdst->mc_top], &key4move, &data,
srcnode->mn_flags);
} break;
case P_LEAF | P_LEAF2: {
key4move.iov_len = csrc->mc_db->md_xsize;
key4move.iov_base =
LEAF2KEY(psrc, csrc->mc_ki[csrc->mc_top], key4move.iov_len);
mdbx_debug("moving %s-node %u [%s] on page %" PRIaPGNO
" to node %u on page %" PRIaPGNO,
"leaf2", csrc->mc_ki[csrc->mc_top], DKEY(&key4move),
psrc->mp_pgno, cdst->mc_ki[cdst->mc_top], pdst->mp_pgno);
/* Add the node to the destination page. */
rc = mdbx_node_add_leaf2(cdst, cdst->mc_ki[cdst->mc_top], &key4move);
} break;
default:
goto bailout;
}
mdbx_debug("moving %s node %u [%s] on page %" PRIaPGNO
" to node %u on page %" PRIaPGNO "",
IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch",
csrc->mc_ki[csrc->mc_top], DKEY(&key),
csrc->mc_pg[csrc->mc_top]->mp_pgno, cdst->mc_ki[cdst->mc_top],
cdst->mc_pg[cdst->mc_top]->mp_pgno);
/* Add the node to the destination page. */
rc = mdbx_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key,
IS_LEAF(cdst->mc_pg[cdst->mc_top]) ? &data : NULL, srcpg,
flags);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
/* Delete the node from the source page. */
mdbx_node_del(csrc, key.iov_len);
mdbx_node_del(csrc, key4move.iov_len);
mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]);
mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]);
{
/* Adjust other cursors pointing to mp */
MDBX_cursor *m2, *m3;
MDBX_dbi dbi = csrc->mc_dbi;
MDBX_page *mpd, *mps;
mps = csrc->mc_pg[csrc->mc_top];
/* If we're adding on the left, bump others up */
const MDBX_dbi dbi = csrc->mc_dbi;
if (fromleft) {
mpd = cdst->mc_pg[csrc->mc_top];
/* If we're adding on the left, bump others up */
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
if (csrc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
m3 = (csrc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (!(m3->mc_flags & C_INITIALIZED) || m3->mc_top < csrc->mc_top)
continue;
if (m3 != cdst && m3->mc_pg[csrc->mc_top] == mpd &&
if (m3 != cdst && m3->mc_pg[csrc->mc_top] == pdst &&
m3->mc_ki[csrc->mc_top] >= cdst->mc_ki[csrc->mc_top]) {
m3->mc_ki[csrc->mc_top]++;
}
if (m3 != csrc && m3->mc_pg[csrc->mc_top] == mps &&
if (m3 != csrc && m3->mc_pg[csrc->mc_top] == psrc &&
m3->mc_ki[csrc->mc_top] == csrc->mc_ki[csrc->mc_top]) {
m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top];
m3->mc_pg[csrc->mc_top] = pdst;
m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top];
mdbx_cassert(csrc, csrc->mc_top > 0);
m3->mc_ki[csrc->mc_top - 1]++;
}
if (XCURSOR_INITED(m3) && IS_LEAF(mps))
if (XCURSOR_INITED(m3) && IS_LEAF(psrc))
XCURSOR_REFRESH(m3, m3->mc_pg[csrc->mc_top], m3->mc_ki[csrc->mc_top]);
}
} else
/* Adding on the right, bump others down */
{
} else {
/* Adding on the right, bump others down */
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
if (csrc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
m3 = (csrc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (m3 == csrc)
continue;
if (!(m3->mc_flags & C_INITIALIZED) || m3->mc_top < csrc->mc_top)
continue;
if (m3->mc_pg[csrc->mc_top] == mps) {
if (m3->mc_pg[csrc->mc_top] == psrc) {
if (!m3->mc_ki[csrc->mc_top]) {
m3->mc_pg[csrc->mc_top] = cdst->mc_pg[cdst->mc_top];
m3->mc_pg[csrc->mc_top] = pdst;
m3->mc_ki[csrc->mc_top] = cdst->mc_ki[cdst->mc_top];
mdbx_cassert(csrc, csrc->mc_top > 0);
m3->mc_ki[csrc->mc_top - 1]--;
} else {
m3->mc_ki[csrc->mc_top]--;
}
if (XCURSOR_INITED(m3) && IS_LEAF(mps))
if (XCURSOR_INITED(m3) && IS_LEAF(psrc))
XCURSOR_REFRESH(m3, m3->mc_pg[csrc->mc_top],
m3->mc_ki[csrc->mc_top]);
}
@ -9476,17 +9695,23 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
/* Update the parent separators. */
if (csrc->mc_ki[csrc->mc_top] == 0) {
mdbx_cassert(csrc, csrc->mc_top > 0);
if (csrc->mc_ki[csrc->mc_top - 1] != 0) {
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.iov_base = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.iov_len);
MDBX_val key;
if (IS_LEAF2(psrc)) {
key.iov_len = psrc->mp_leaf2_ksize;
key.iov_base = LEAF2KEY(psrc, 0, key.iov_len);
} else {
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
MDBX_node *srcnode = NODEPTR(psrc, 0);
key.iov_len = NODEKSZ(srcnode);
key.iov_base = NODEKEY(srcnode);
}
mdbx_debug("update separator for source page %" PRIaPGNO " to [%s]",
csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key));
psrc->mp_pgno, DKEY(&key));
MDBX_cursor mn;
mdbx_cursor_copy(csrc, &mn);
mn.mc_xcursor = NULL;
mdbx_cassert(csrc, mn.mc_snum > 0);
mn.mc_snum--;
mn.mc_top--;
/* We want mdbx_rebalance to find mn when doing fixups */
@ -9494,10 +9719,9 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}
if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
MDBX_val nullkey;
indx_t ix = csrc->mc_ki[csrc->mc_top];
nullkey.iov_len = 0;
if (IS_BRANCH(psrc)) {
const MDBX_val nullkey = {0, 0};
const indx_t ix = csrc->mc_ki[csrc->mc_top];
csrc->mc_ki[csrc->mc_top] = 0;
rc = mdbx_update_key(csrc, &nullkey);
csrc->mc_ki[csrc->mc_top] = ix;
@ -9506,17 +9730,23 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
}
if (cdst->mc_ki[cdst->mc_top] == 0) {
mdbx_cassert(cdst, cdst->mc_top > 0);
if (cdst->mc_ki[cdst->mc_top - 1] != 0) {
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.iov_base = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, key.iov_len);
MDBX_val key;
if (IS_LEAF2(psrc)) {
key.iov_len = pdst->mp_leaf2_ksize;
key.iov_base = LEAF2KEY(pdst, 0, key.iov_len);
} else {
srcnode = NODEPTR(cdst->mc_pg[cdst->mc_top], 0);
MDBX_node *srcnode = NODEPTR(pdst, 0);
key.iov_len = NODEKSZ(srcnode);
key.iov_base = NODEKEY(srcnode);
}
mdbx_debug("update separator for destination page %" PRIaPGNO " to [%s]",
cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key));
pdst->mp_pgno, DKEY(&key));
MDBX_cursor mn;
mdbx_cursor_copy(cdst, &mn);
mn.mc_xcursor = NULL;
mdbx_cassert(cdst, mn.mc_snum > 0);
mn.mc_snum--;
mn.mc_top--;
/* We want mdbx_rebalance to find mn when doing fixups */
@ -9524,10 +9754,9 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
MDBX_val nullkey;
indx_t ix = cdst->mc_ki[cdst->mc_top];
nullkey.iov_len = 0;
if (IS_BRANCH(pdst)) {
const MDBX_val nullkey = {0, 0};
const indx_t ix = cdst->mc_ki[cdst->mc_top];
cdst->mc_ki[cdst->mc_top] = 0;
rc = mdbx_update_key(cdst, &nullkey);
cdst->mc_ki[cdst->mc_top] = ix;
@ -9548,47 +9777,41 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, int fromleft) {
*
* Returns 0 on success, non-zero on failure. */
static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
MDBX_page *psrc, *pdst;
MDBX_node *srcnode;
MDBX_val key, data;
unsigned nkeys;
MDBX_val key;
int rc;
unsigned i, j;
psrc = csrc->mc_pg[csrc->mc_top];
pdst = cdst->mc_pg[cdst->mc_top];
mdbx_debug("merging page %" PRIaPGNO " into %" PRIaPGNO "", psrc->mp_pgno,
pdst->mp_pgno);
mdbx_cassert(csrc, IS_LEAF(psrc) == IS_LEAF(pdst));
mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */
mdbx_cassert(csrc, cdst->mc_snum > 1);
/* Mark dst as dirty. */
if (unlikely(rc = mdbx_page_touch(cdst)))
return rc;
/* get dst page again now that we've touched it. */
pdst = cdst->mc_pg[cdst->mc_top];
MDBX_page *const psrc = csrc->mc_pg[csrc->mc_top];
MDBX_page *const pdst = cdst->mc_pg[cdst->mc_top];
mdbx_debug("merging page %" PRIaPGNO " into %" PRIaPGNO "", psrc->mp_pgno,
pdst->mp_pgno);
mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst));
mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */
mdbx_cassert(cdst, cdst->mc_snum > 1);
/* Move all nodes from src to dst */
j = nkeys = NUMKEYS(pdst);
const unsigned nkeys = NUMKEYS(pdst);
unsigned j = nkeys;
if (IS_LEAF2(psrc)) {
key.iov_len = csrc->mc_db->md_xsize;
key.iov_base = PAGEDATA(psrc);
for (i = 0; i < NUMKEYS(psrc); i++, j++) {
rc = mdbx_node_add(cdst, j, &key, NULL, 0, 0);
for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) {
rc = mdbx_node_add_leaf2(cdst, j, &key);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
key.iov_base = (char *)key.iov_base + key.iov_len;
}
} else {
for (i = 0; i < NUMKEYS(psrc); i++, j++) {
for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) {
srcnode = NODEPTR(psrc, i);
if (i == 0 && IS_BRANCH(psrc)) {
MDBX_cursor mn;
MDBX_node *s2;
mdbx_cursor_copy(csrc, &mn);
mn.mc_xcursor = NULL;
/* must find the lowest key below src */
@ -9599,7 +9822,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
key.iov_len = mn.mc_db->md_xsize;
key.iov_base = LEAF2KEY(mn.mc_pg[mn.mc_top], 0, key.iov_len);
} else {
s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0);
MDBX_node *s2 = NODEPTR(mn.mc_pg[mn.mc_top], 0);
key.iov_len = NODEKSZ(s2);
key.iov_base = NODEKEY(s2);
}
@ -9608,14 +9831,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
key.iov_base = NODEKEY(srcnode);
}
MDBX_val *pdata = NULL;
if (IS_LEAF(psrc)) {
MDBX_val data;
data.iov_len = NODEDSZ(srcnode);
data.iov_base = NODEDATA(srcnode);
pdata = &data;
rc = mdbx_node_add_leaf(cdst, j, &key, &data, srcnode->mn_flags);
} else {
mdbx_cassert(csrc, srcnode->mn_flags == 0);
rc = mdbx_node_add_branch(cdst, j, &key, NODEPGNO(srcnode));
}
rc = mdbx_node_add(cdst, j, &key, pdata, NODEPGNO(srcnode),
srcnode->mn_flags);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}
@ -9625,12 +9849,15 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
pdst->mp_pgno, NUMKEYS(pdst),
(float)PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10);
mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]);
mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]);
/* Unlink the src page from parent and add to free list. */
csrc->mc_top--;
mdbx_node_del(csrc, 0);
if (csrc->mc_ki[csrc->mc_top] == 0) {
key.iov_len = 0;
rc = mdbx_update_key(csrc, &key);
const MDBX_val nullkey = {0, 0};
rc = mdbx_update_key(csrc, &nullkey);
if (unlikely(rc)) {
csrc->mc_top++;
return rc;
@ -9638,7 +9865,6 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
}
csrc->mc_top++;
psrc = csrc->mc_pg[csrc->mc_top];
/* If not operating on FreeDB, allow this page to be reused
* in this txn. Otherwise just add to free list. */
rc = mdbx_page_loose(csrc, psrc);
@ -9856,8 +10082,8 @@ static int mdbx_rebalance(MDBX_cursor *mc) {
rc = mdbx_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
if (unlikely(rc))
return rc;
mdbx_cassert(mc, IS_LEAF(mn.mc_pg[mn.mc_top]) ==
IS_LEAF(mc->mc_pg[mc->mc_top]));
mdbx_cassert(mc, PAGETYPE(mn.mc_pg[mn.mc_top]) ==
PAGETYPE(mc->mc_pg[mc->mc_top]));
mn.mc_ki[mn.mc_top] = 0;
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]);
fromleft = 0;
@ -9869,8 +10095,8 @@ static int mdbx_rebalance(MDBX_cursor *mc) {
rc = mdbx_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
if (unlikely(rc))
return rc;
mdbx_cassert(mc, IS_LEAF(mn.mc_pg[mn.mc_top]) ==
IS_LEAF(mc->mc_pg[mc->mc_top]));
mdbx_cassert(mc, PAGETYPE(mn.mc_pg[mn.mc_top]) ==
PAGETYPE(mc->mc_pg[mc->mc_top]));
mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
mc->mc_ki[mc->mc_top] = 0;
fromleft = 1;
@ -10127,7 +10353,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
new_root = mc->mc_db->md_depth++;
/* Add left (implicit) pointer. */
if (unlikely((rc = mdbx_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) !=
if (unlikely((rc = mdbx_node_add_branch(mc, 0, NULL, mp->mp_pgno)) !=
MDBX_SUCCESS)) {
/* undo the pre-push */
mc->mc_pg[0] = mc->mc_pg[1];
@ -10333,7 +10559,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
}
} else {
mn.mc_top--;
rc = mdbx_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
rc = mdbx_node_add_branch(&mn, mn.mc_ki[ptop], &sepkey, rp->mp_pgno);
mn.mc_top++;
}
if (unlikely(rc != MDBX_SUCCESS)) {
@ -10346,7 +10572,23 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
if (nflags & MDBX_APPEND) {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[mc->mc_top] = 0;
rc = mdbx_node_add(mc, 0, newkey, newdata, newpgno, nflags);
switch (PAGETYPE(rp)) {
case P_BRANCH: {
mdbx_cassert(mc, nflags == 0);
rc = mdbx_node_add_branch(mc, 0, newkey, newpgno);
} break;
case P_LEAF: {
mdbx_cassert(mc, newpgno == 0);
rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, nflags);
} break;
case P_LEAF | P_LEAF2: {
mdbx_cassert(mc, nflags == 0);
mdbx_cassert(mc, newpgno == 0);
rc = mdbx_node_add_leaf2(mc, 0, newkey);
} break;
default:
rc = MDBX_CORRUPTED;
}
if (rc)
goto done;
for (i = 0; i < mc->mc_top; i++)
@ -10381,14 +10623,30 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
flags = node->mn_flags;
}
if (!IS_LEAF(mp) && n == 0) {
/* First branch index doesn't need key data. */
rkey.iov_len = 0;
switch (PAGETYPE(rp)) {
case P_BRANCH: {
mdbx_cassert(mc, flags == 0);
if (n == 0) {
/* First branch index doesn't need key data. */
rkey.iov_len = 0;
}
rc = mdbx_node_add_branch(mc, n, &rkey, pgno);
} break;
case P_LEAF: {
mdbx_cassert(mc, pgno == 0);
rc = mdbx_node_add_leaf(mc, n, &rkey, rdata, flags);
} break;
/* case P_LEAF | P_LEAF2: {
mdbx_cassert(mc, flags == 0);
mdbx_cassert(mc, gno == 0);
rc = mdbx_node_add_leaf2(mc, n, &rkey);
} break; */
default:
rc = MDBX_CORRUPTED;
}
rc = mdbx_node_add(mc, n, &rkey, rdata, pgno, flags);
if (rc)
goto done;
if (i == nkeys) {
i = 0;
n = 0;