diff --git a/src/core.c b/src/core.c index 30123880..28c6271c 100644 --- a/src/core.c +++ b/src/core.c @@ -3913,10 +3913,10 @@ static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst); #define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */ -static int __must_check_result mdbx_page_split(MDBX_cursor *mc, - const MDBX_val *const newkey, - MDBX_val *const newdata, - pgno_t newpgno, unsigned nflags); +static int __must_check_result page_split(MDBX_cursor *mc, + const MDBX_val *const newkey, + MDBX_val *const newdata, + pgno_t newpgno, const unsigned naf); static bool meta_checktxnid(const MDBX_env *env, const volatile MDBX_meta *meta, bool report); @@ -16299,23 +16299,22 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data, rdata = data; new_sub:; - unsigned nflags = flags & NODE_ADD_FLAGS; + const unsigned naf = flags & NODE_ADD_FLAGS; size_t nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->iov_len : leaf_size(env, key, rdata); if (page_room(mc->mc_pg[mc->mc_top]) < nsize) { - if (!insert_key) - nflags |= MDBX_SPLIT_REPLACE; - rc = mdbx_page_split(mc, key, rdata, P_INVALID, nflags); + rc = page_split(mc, key, rdata, P_INVALID, + insert_key ? naf : naf | MDBX_SPLIT_REPLACE); if (rc == MDBX_SUCCESS && mdbx_audit_enabled()) rc = insert_key ? mdbx_cursor_check(mc) : mdbx_cursor_check_updating(mc); } else { /* There is room already in this leaf page. */ if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { - mdbx_cassert(mc, (nflags & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0 && + mdbx_cassert(mc, !(naf & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) && rdata->iov_len == 0); rc = mdbx_node_add_leaf2(mc, mc->mc_ki[mc->mc_top], key); } else - rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, nflags); + rc = mdbx_node_add_leaf(mc, mc->mc_ki[mc->mc_top], key, rdata, naf); if (likely(rc == 0)) { /* Adjust other cursors pointing to mp */ const MDBX_dbi dbi = mc->mc_dbi; @@ -17434,10 +17433,10 @@ static int mdbx_update_key(MDBX_cursor *mc, const MDBX_val *key) { mdbx_debug("Not enough room, delta = %zd, splitting...", delta); pgno_t pgno = node_pgno(node); mdbx_node_del(mc, 0); - int rc = mdbx_page_split(mc, key, NULL, pgno, MDBX_SPLIT_REPLACE); - if (rc == MDBX_SUCCESS && mdbx_audit_enabled()) - rc = mdbx_cursor_check_updating(mc); - return rc; + int err = page_split(mc, key, NULL, pgno, MDBX_SPLIT_REPLACE); + if (err == MDBX_SUCCESS && mdbx_audit_enabled()) + err = mdbx_cursor_check_updating(mc); + return err; } nkeys = page_numkeys(mp); @@ -18950,11 +18949,11 @@ static int mdbx_del0(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, * [in] newkey The key for the newly inserted node. * [in] newdata The data for the newly inserted node. * [in] newpgno The page number, if the new node is a branch node. - * [in] nflags The NODE_ADD_FLAGS for the new node. + * [in] naf The NODE_ADD_FLAGS for the new node. * Returns 0 on success, non-zero on failure. */ -static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, - MDBX_val *const newdata, pgno_t newpgno, - unsigned nflags) { +static int page_split(MDBX_cursor *mc, const MDBX_val *const newkey, + MDBX_val *const newdata, pgno_t newpgno, + const unsigned naf) { unsigned flags; int rc = MDBX_SUCCESS, foliage = 0; unsigned i, ptop; @@ -19046,7 +19045,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, unsigned split_indx = (newindx < nkeys) - ? /* split at the middle */ (nkeys + 1) / 2 + ? /* split at the middle */ (nkeys + 1) >> 1 : /* split at the end (i.e. like append-mode ) */ nkeys - minkeys + 1; mdbx_assert(env, split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); @@ -19054,7 +19053,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, /* It is reasonable and possible to split the page at the begin */ if (unlikely(newindx < minkeys)) { split_indx = minkeys; - if (newindx == 0 && foliage == 0 && !(nflags & MDBX_SPLIT_REPLACE)) { + if (newindx == 0 && foliage == 0 && !(naf & MDBX_SPLIT_REPLACE)) { split_indx = 0; /* Checking for ability of splitting by the left-side insertion * of a pure page with the new key */ @@ -19098,7 +19097,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, char *split, *ins; unsigned lsize, rsize, ksize; /* Move half of the keys to the right sibling */ - const int x = mc->mc_ki[mc->mc_top] - split_indx; + const int distance = mc->mc_ki[mc->mc_top] - split_indx; ksize = mc->mc_db->md_xsize; split = page_leaf2key(mp, split_indx, ksize); rsize = (nkeys - split_indx) * ksize; @@ -19113,7 +19112,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, sister->mp_upper -= (indx_t)(rsize - lsize); sepkey.iov_len = ksize; sepkey.iov_base = (newindx != split_indx) ? split : newkey->iov_base; - if (x < 0) { + if (distance < 0) { mdbx_cassert(mc, ksize >= sizeof(indx_t)); ins = page_leaf2key(mp, mc->mc_ki[mc->mc_top], ksize); memcpy(sister->mp_ptrs, split, rsize); @@ -19125,16 +19124,16 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, mdbx_cassert(mc, mp->mp_upper >= ksize - sizeof(indx_t)); mp->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); } else { - memcpy(sister->mp_ptrs, split, x * ksize); - ins = page_leaf2key(sister, x, ksize); + memcpy(sister->mp_ptrs, split, distance * ksize); + ins = page_leaf2key(sister, distance, ksize); memcpy(ins, newkey->iov_base, ksize); - memcpy(ins + ksize, split + x * ksize, rsize - x * ksize); + memcpy(ins + ksize, split + distance * ksize, rsize - distance * ksize); mdbx_cassert(mc, UINT16_MAX - sister->mp_lower >= (int)sizeof(indx_t)); sister->mp_lower += sizeof(indx_t); mdbx_cassert(mc, sister->mp_upper >= ksize - sizeof(indx_t)); sister->mp_upper -= (indx_t)(ksize - sizeof(indx_t)); - mdbx_cassert(mc, x <= (int)UINT16_MAX); - mc->mc_ki[mc->mc_top] = (indx_t)x; + mdbx_cassert(mc, distance <= (int)UINT16_MAX); + mc->mc_ki[mc->mc_top] = (indx_t)distance; } if (mdbx_audit_enabled()) { @@ -19158,11 +19157,11 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, : branch_size(env, newkey); /* prepare to insert */ - for (unsigned j = i = 0; i < nkeys; ++i, ++j) { - tmp_ki_copy->mp_ptrs[j] = 0; - j += (i == newindx); - tmp_ki_copy->mp_ptrs[j] = mp->mp_ptrs[i]; - } + for (i = 0; i < newindx; ++i) + tmp_ki_copy->mp_ptrs[i] = mp->mp_ptrs[i]; + tmp_ki_copy->mp_ptrs[i] = (indx_t)-1; + while (++i <= nkeys) + tmp_ki_copy->mp_ptrs[i] = mp->mp_ptrs[i - 1]; tmp_ki_copy->mp_pgno = mp->mp_pgno; tmp_ki_copy->mp_flags = mp->mp_flags; tmp_ki_copy->mp_txnid = INVALID_TXNID; @@ -19184,20 +19183,22 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, * будет в каждом ключе, в худшем случае кроме одного, который может быть * нулевого размера. */ - if (newindx == split_indx && split_indx + minkeys <= nkeys) - split_indx += 1; + if (newindx == split_indx && nkeys >= 5) { + STATIC_ASSERT(P_BRANCH == 1); + split_indx += mp->mp_flags & P_BRANCH; + } mdbx_assert(env, - split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); + split_indx >= minkeys && split_indx <= nkeys + 1 - minkeys); const unsigned dim_nodes = (newindx >= split_indx) ? split_indx : nkeys - split_indx; const unsigned dim_used = (sizeof(indx_t) + NODESIZE + 1) * dim_nodes; if (new_size >= dim_used) { - /* Find split point */ + /* Search for best acceptable split point */ i = (newindx < split_indx) ? 0 : nkeys; int dir = (newindx < split_indx) ? 1 : -1; size_t before = 0, after = new_size + page_used(env, mp); unsigned best_split = split_indx; - unsigned best_offset = INT_MAX; + unsigned best_shift = INT_MAX; mdbx_trace("seek separator from %u, step %i, default %u, new-idx %u, " "new-size %zu", @@ -19223,11 +19224,13 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, if (before <= max_space && after <= max_space) { const unsigned split = i + (dir > 0); if (split >= minkeys && split <= nkeys + 1 - minkeys) { - const unsigned offset = branchless_abs(split_indx - split); - if (offset >= best_offset) + const unsigned shift = branchless_abs(split_indx - split); + if (shift >= best_shift) break; - best_offset = offset; + best_shift = shift; best_split = split; + if (!best_shift) + break; } } i += dir; @@ -19237,10 +19240,9 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, mdbx_trace("chosen %u", split_indx); } mdbx_assert(env, - split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); + split_indx >= minkeys && split_indx <= nkeys + 1 - minkeys); - sepkey.iov_len = newkey->iov_len; - sepkey.iov_base = newkey->iov_base; + sepkey = *newkey; if (split_indx != newindx) { MDBX_node *node = (MDBX_node *)((char *)mp + tmp_ki_copy->mp_ptrs[split_indx] + @@ -19265,7 +19267,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, did_split_parent = true; /* We want other splits to find mn when doing fixups */ WITH_CURSOR_TRACKING( - mn, rc = mdbx_page_split(&mn, &sepkey, NULL, sister->mp_pgno, 0)); + mn, rc = page_split(&mn, &sepkey, NULL, sister->mp_pgno, 0)); if (unlikely(rc != MDBX_SUCCESS)) goto done; mdbx_cassert(mc, (int)mc->mc_snum - snum == mc->mc_db->md_depth - depth); @@ -19349,10 +19351,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, switch (PAGETYPE_WHOLE(sister)) { case P_LEAF: { mdbx_cassert(mc, newpgno == 0 || newpgno == P_INVALID); - rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, nflags); + rc = mdbx_node_add_leaf(mc, 0, newkey, newdata, naf); } break; case P_LEAF | P_LEAF2: { - mdbx_cassert(mc, (nflags & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0); + mdbx_cassert(mc, (naf & (F_BIGDATA | F_SUBDATA | F_DUPDATA)) == 0); mdbx_cassert(mc, newpgno == 0 || newpgno == P_INVALID); rc = mdbx_node_add_leaf2(mc, 0, newkey); } break; @@ -19389,19 +19391,18 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, mc->mc_pg[mc->mc_top] = sister; i = split_indx; unsigned n = 0; - pgno_t pgno = 0; do { mdbx_trace("i %u, nkeys %u => n %u, rp #%u", i, nkeys, n, sister->mp_pgno); + pgno_t pgno = 0; MDBX_val *rdata = NULL; if (i == newindx) { - rkey.iov_base = newkey->iov_base; - rkey.iov_len = newkey->iov_len; + rkey = *newkey; if (IS_LEAF(mp)) rdata = newdata; else pgno = newpgno; - flags = nflags; + flags = naf; /* Update index for the new key. */ mc->mc_ki[mc->mc_top] = (indx_t)n; } else { @@ -19506,14 +19507,14 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, m3->mc_ki[k + 1] = m3->mc_ki[k]; m3->mc_pg[k + 1] = m3->mc_pg[k]; } - m3->mc_ki[0] = (m3->mc_ki[0] >= nkeys) ? 1 : 0; + m3->mc_ki[0] = m3->mc_ki[0] >= nkeys; m3->mc_pg[0] = mc->mc_pg[0]; m3->mc_snum++; m3->mc_top++; } if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp && !pure_left) { - if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDBX_SPLIT_REPLACE)) + if (m3->mc_ki[mc->mc_top] >= newindx && !(naf & MDBX_SPLIT_REPLACE)) m3->mc_ki[mc->mc_top]++; if (m3->mc_ki[mc->mc_top] >= nkeys) { m3->mc_pg[mc->mc_top] = sister; @@ -19544,7 +19545,7 @@ done: else { if (mdbx_audit_enabled()) rc = mdbx_cursor_check_updating(mc); - if (unlikely(nflags & MDBX_RESERVE)) { + if (unlikely(naf & MDBX_RESERVE)) { MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!(node_flags(node) & F_BIGDATA)) newdata->iov_base = node_data(node);