mdbx: minor refine page_split().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-07-23 12:14:01 +03:00
parent cc51a7f76e
commit c4dd83fbdf

View File

@ -3913,10 +3913,10 @@ static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc,
MDBX_cursor *cdst);
#define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */
static int __must_check_result mdbx_page_split(MDBX_cursor *mc,
const MDBX_val *const newkey,
MDBX_val *const newdata,
pgno_t newpgno, unsigned nflags);
static int __must_check_result page_split(MDBX_cursor *mc,
const MDBX_val *const newkey,
MDBX_val *const newdata,
pgno_t newpgno, const unsigned naf);
static bool meta_checktxnid(const MDBX_env *env, const volatile MDBX_meta *meta,
bool report);
@ -16299,23 +16299,22 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
rdata = data;
new_sub:;
unsigned nflags = flags & NODE_ADD_FLAGS;
const unsigned naf = flags & NODE_ADD_FLAGS;
size_t nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->iov_len
: leaf_size(env, key, rdata);
if (page_room(mc->mc_pg[mc->mc_top]) < nsize) {
if (!insert_key)
nflags |= MDBX_SPLIT_REPLACE;
rc = mdbx_page_split(mc, key, rdata, P_INVALID, nflags);
rc = page_split(mc, key, rdata, P_INVALID,
insert_key ? naf : naf | MDBX_SPLIT_REPLACE);
if (rc == MDBX_SUCCESS && mdbx_audit_enabled())
rc = insert_key ? mdbx_cursor_check(mc) : mdbx_cursor_check_updating(mc);
} else {
/* There is room already in this leaf page. */
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
mdbx_cassert(mc, (nflags & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0 &&
mdbx_cassert(mc, !(naf & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) &&
rdata->iov_len == 0);
rc = mdbx_node_add_leaf2(mc, mc->mc_ki[mc->mc_top], key);
} else
rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, nflags);
rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, naf);
if (likely(rc == 0)) {
/* Adjust other cursors pointing to mp */
const MDBX_dbi dbi = mc->mc_dbi;
@ -17434,10 +17433,10 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) {
mdbx_debug("Not enough room, delta = %zd, splitting...", delta);
pgno_t pgno = node_pgno(node);
mdbx_node_del(mc, 0);
int rc = mdbx_page_split(mc, key, NULL, pgno, MDBX_SPLIT_REPLACE);
if (rc == MDBX_SUCCESS && mdbx_audit_enabled())
rc = mdbx_cursor_check_updating(mc);
return rc;
int err = page_split(mc, key, NULL, pgno, MDBX_SPLIT_REPLACE);
if (err == MDBX_SUCCESS && mdbx_audit_enabled())
err = mdbx_cursor_check_updating(mc);
return err;
}
nkeys = page_numkeys(mp);
@ -18950,11 +18949,11 @@ static int mdbx_del0(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
* [in] newkey The key for the newly inserted node.
* [in] newdata The data for the newly inserted node.
* [in] newpgno The page number, if the new node is a branch node.
* [in] nflags The NODE_ADD_FLAGS for the new node.
* [in] naf The NODE_ADD_FLAGS for the new node.
* Returns 0 on success, non-zero on failure. */
static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
MDBX_val *const newdata, pgno_t newpgno,
unsigned nflags) {
static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
MDBX_val *const newdata, pgno_t newpgno,
const unsigned naf) {
unsigned flags;
int rc = MDBX_SUCCESS, foliage = 0;
unsigned i, ptop;
@ -19046,7 +19045,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
unsigned split_indx =
(newindx < nkeys)
? /* split at the middle */ (nkeys + 1) / 2
? /* split at the middle */ (nkeys + 1) >> 1
: /* split at the end (i.e. like append-mode ) */ nkeys - minkeys + 1;
mdbx_assert(env, split_indx >= minkeys && split_indx <= nkeys - minkeys + 1);
@ -19054,7 +19053,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
/* It is reasonable and possible to split the page at the begin */
if (unlikely(newindx < minkeys)) {
split_indx = minkeys;
if (newindx == 0 && foliage == 0 && !(nflags & MDBX_SPLIT_REPLACE)) {
if (newindx == 0 && foliage == 0 && !(naf & MDBX_SPLIT_REPLACE)) {
split_indx = 0;
/* Checking for ability of splitting by the left-side insertion
* of a pure page with the new key */
@ -19098,7 +19097,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
char *split, *ins;
unsigned lsize, rsize, ksize;
/* Move half of the keys to the right sibling */
const int x = mc->mc_ki[mc->mc_top] - split_indx;
const int distance = mc->mc_ki[mc->mc_top] - split_indx;
ksize = mc->mc_db->md_xsize;
split = page_leaf2key(mp, split_indx, ksize);
rsize = (nkeys - split_indx) * ksize;
@ -19113,7 +19112,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
sister->mp_upper -= (indx_t)(rsize - lsize);
sepkey.iov_len = ksize;
sepkey.iov_base = (newindx != split_indx) ? split : newkey->iov_base;
if (x < 0) {
if (distance < 0) {
mdbx_cassert(mc, ksize >= sizeof(indx_t));
ins = page_leaf2key(mp, mc->mc_ki[mc->mc_top], ksize);
memcpy(sister->mp_ptrs, split, rsize);
@ -19125,16 +19124,16 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t));
mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
} else {
memcpy(sister->mp_ptrs, split, x * ksize);
ins = page_leaf2key(sister, x, ksize);
memcpy(sister->mp_ptrs, split, distance * ksize);
ins = page_leaf2key(sister, distance, ksize);
memcpy(ins, newkey->iov_base, ksize);
memcpy(ins + ksize, split + x * ksize, rsize - x * ksize);
memcpy(ins + ksize, split + distance * ksize, rsize - distance * ksize);
mdbx_cassert(mc, UINT16_MAX - sister->mp_lower >= (int)sizeof(indx_t));
sister->mp_lower += sizeof(indx_t);
mdbx_cassert(mc, sister->mp_upper >= ksize - sizeof(indx_t));
sister->mp_upper -= (indx_t)(ksize - sizeof(indx_t));
mdbx_cassert(mc, x <= (int)UINT16_MAX);
mc->mc_ki[mc->mc_top] = (indx_t)x;
mdbx_cassert(mc, distance <= (int)UINT16_MAX);
mc->mc_ki[mc->mc_top] = (indx_t)distance;
}
if (mdbx_audit_enabled()) {
@ -19158,11 +19157,11 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
: branch_size(env, newkey);
/* prepare to insert */
for (unsigned j = i = 0; i < nkeys; ++i, ++j) {
tmp_ki_copy->mp_ptrs[j] = 0;
j += (i == newindx);
tmp_ki_copy->mp_ptrs[j] = mp->mp_ptrs[i];
}
for (i = 0; i < newindx; ++i)
tmp_ki_copy->mp_ptrs[i] = mp->mp_ptrs[i];
tmp_ki_copy->mp_ptrs[i] = (indx_t)-1;
while (++i <= nkeys)
tmp_ki_copy->mp_ptrs[i] = mp->mp_ptrs[i - 1];
tmp_ki_copy->mp_pgno = mp->mp_pgno;
tmp_ki_copy->mp_flags = mp->mp_flags;
tmp_ki_copy->mp_txnid = INVALID_TXNID;
@ -19184,20 +19183,22 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
* будет в каждом ключе, в худшем случае кроме одного, который может быть
* нулевого размера. */
if (newindx == split_indx && split_indx + minkeys <= nkeys)
split_indx += 1;
if (newindx == split_indx && nkeys >= 5) {
STATIC_ASSERT(P_BRANCH == 1);
split_indx += mp->mp_flags & P_BRANCH;
}
mdbx_assert(env,
split_indx >= minkeys && split_indx <= nkeys - minkeys + 1);
split_indx >= minkeys && split_indx <= nkeys + 1 - minkeys);
const unsigned dim_nodes =
(newindx >= split_indx) ? split_indx : nkeys - split_indx;
const unsigned dim_used = (sizeof(indx_t) + NODESIZE + 1) * dim_nodes;
if (new_size >= dim_used) {
/* Find split point */
/* Search for best acceptable split point */
i = (newindx < split_indx) ? 0 : nkeys;
int dir = (newindx < split_indx) ? 1 : -1;
size_t before = 0, after = new_size + page_used(env, mp);
unsigned best_split = split_indx;
unsigned best_offset = INT_MAX;
unsigned best_shift = INT_MAX;
mdbx_trace("seek separator from %u, step %i, default %u, new-idx %u, "
"new-size %zu",
@ -19223,11 +19224,13 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
if (before <= max_space && after <= max_space) {
const unsigned split = i + (dir > 0);
if (split >= minkeys && split <= nkeys + 1 - minkeys) {
const unsigned offset = branchless_abs(split_indx - split);
if (offset >= best_offset)
const unsigned shift = branchless_abs(split_indx - split);
if (shift >= best_shift)
break;
best_offset = offset;
best_shift = shift;
best_split = split;
if (!best_shift)
break;
}
}
i += dir;
@ -19237,10 +19240,9 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
mdbx_trace("chosen %u", split_indx);
}
mdbx_assert(env,
split_indx >= minkeys && split_indx <= nkeys - minkeys + 1);
split_indx >= minkeys && split_indx <= nkeys + 1 - minkeys);
sepkey.iov_len = newkey->iov_len;
sepkey.iov_base = newkey->iov_base;
sepkey = *newkey;
if (split_indx != newindx) {
MDBX_node *node =
(MDBX_node *)((char *)mp + tmp_ki_copy->mp_ptrs[split_indx] +
@ -19265,7 +19267,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
did_split_parent = true;
/* We want other splits to find mn when doing fixups */
WITH_CURSOR_TRACKING(
mn, rc = mdbx_page_split(&mn, &sepkey, NULL, sister->mp_pgno, 0));
mn, rc = page_split(&mn, &sepkey, NULL, sister->mp_pgno, 0));
if (unlikely(rc != MDBX_SUCCESS))
goto done;
mdbx_cassert(mc, (int)mc->mc_snum - snum == mc->mc_db->md_depth - depth);
@ -19349,10 +19351,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
switch (PAGETYPE_WHOLE(sister)) {
case P_LEAF: {
mdbx_cassert(mc, newpgno == 0 || newpgno == P_INVALID);
rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, nflags);
rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, naf);
} break;
case P_LEAF | P_LEAF2: {
mdbx_cassert(mc, (nflags & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0);
mdbx_cassert(mc, (naf & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0);
mdbx_cassert(mc, newpgno == 0 || newpgno == P_INVALID);
rc = mdbx_node_add_leaf2(mc, 0, newkey);
} break;
@ -19389,19 +19391,18 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
mc->mc_pg[mc->mc_top] = sister;
i = split_indx;
unsigned n = 0;
pgno_t pgno = 0;
do {
mdbx_trace("i %u, nkeys %u => n %u, rp #%u", i, nkeys, n,
sister->mp_pgno);
pgno_t pgno = 0;
MDBX_val *rdata = NULL;
if (i == newindx) {
rkey.iov_base = newkey->iov_base;
rkey.iov_len = newkey->iov_len;
rkey = *newkey;
if (IS_LEAF(mp))
rdata = newdata;
else
pgno = newpgno;
flags = nflags;
flags = naf;
/* Update index for the new key. */
mc->mc_ki[mc->mc_top] = (indx_t)n;
} else {
@ -19506,14 +19507,14 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey,
m3->mc_ki[k + 1] = m3->mc_ki[k];
m3->mc_pg[k + 1] = m3->mc_pg[k];
}
m3->mc_ki[0] = (m3->mc_ki[0] >= nkeys) ? 1 : 0;
m3->mc_ki[0] = m3->mc_ki[0] >= nkeys;
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp && !pure_left) {
if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDBX_SPLIT_REPLACE))
if (m3->mc_ki[mc->mc_top] >= newindx && !(naf & MDBX_SPLIT_REPLACE))
m3->mc_ki[mc->mc_top]++;
if (m3->mc_ki[mc->mc_top] >= nkeys) {
m3->mc_pg[mc->mc_top] = sister;
@ -19544,7 +19545,7 @@ done:
else {
if (mdbx_audit_enabled())
rc = mdbx_cursor_check_updating(mc);
if (unlikely(nflags & MDBX_RESERVE)) {
if (unlikely(naf & MDBX_RESERVE)) {
MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!(node_flags(node) & F_BIGDATA))
newdata->iov_base = node_data(node);