diff --git a/src/core.c b/src/core.c index 2a123239..8276f143 100644 --- a/src/core.c +++ b/src/core.c @@ -660,6 +660,7 @@ page_room(const MDBX_page *mp) { return mp->mp_upper - mp->mp_lower; } +/* Maximum free space in an empty page */ MDBX_NOTHROW_PURE_FUNCTION static __always_inline unsigned page_space(const MDBX_env *env) { STATIC_ASSERT(PAGEHDRSZ % 2 == 0); @@ -17309,6 +17310,7 @@ static int mdbx_node_move(MDBX_cursor *csrc, MDBX_cursor *cdst, bool fromleft) { } break; default: + assert(false); goto bailout; } @@ -18627,6 +18629,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, (newindx < nkeys) ? /* split at the middle */ (nkeys + 1) / 2 : /* split at the end (i.e. like append-mode ) */ nkeys - minkeys + 1; + mdbx_assert(env, split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); mdbx_cassert(mc, !IS_BRANCH(mp) || newindx > 0); /* It is reasonable and possible to split the page at the begin */ @@ -18724,11 +18727,6 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, goto done; } } else { - /* Maximum free space in an empty page */ - const unsigned max_space = page_space(env); - const size_t new_size = IS_LEAF(mp) ? leaf_size(env, newkey, newdata) - : branch_size(env, newkey); - /* grab a page to hold a temporary copy */ tmp_ki_copy = mdbx_page_malloc(mc->mc_txn, 1); if (unlikely(tmp_ki_copy == NULL)) { @@ -18736,6 +18734,10 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, goto done; } + const unsigned max_space = page_space(env); + const size_t new_size = IS_LEAF(mp) ? leaf_size(env, newkey, newdata) + : branch_size(env, newkey); + /* prepare to insert */ for (unsigned j = i = 0; i < nkeys; ++i, ++j) { tmp_ki_copy->mp_ptrs[j] = 0; @@ -18748,34 +18750,35 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, tmp_ki_copy->mp_lower = 0; tmp_ki_copy->mp_upper = (indx_t)max_space; - /* When items are relatively large the split point needs - * to be checked, because being off-by-one will make the - * difference between success or failure in mdbx_node_add. + /* Добавляемый узел может не поместиться в страницу-половину вместе + * с количественной половиной узлов из исходной страницы. В худшем случае, + * в страницу-половину с добавляемым узлом могут попасть самые больше узлы + * из исходной страницы, а другую половину только узлы с самыми короткими + * ключами и с пустыми данными. Поэтому, чтобы найти подходящую границу + * разреза требуется итерировать узлы и считая их объем. * - * It's also relevant if a page happens to be laid out - * such that one half of its nodes are all "small" and - * the other half of its nodes are "large". If the new - * item is also "large" and falls on the half with - * "large" nodes, it also may not fit. - * - * As a final tweak, if the new item goes on the last - * spot on the page (and thus, onto the new page), bias - * the split so the new page is emptier than the old page. - * This yields better packing during sequential inserts. */ + * Однако, при простом количественном делении (без учета размера ключей + * и данных) на страницах-половинах будет примерно вдвое меньше узлов. + * Поэтому добавляемый узел точно поместится, если его размер не больше + * чем место "освобождающееся" от заголовков узлов, которые переедут + * в другую страницу-половину. Кроме этого, как минимум по одному байту + * будет в каждом ключе, в худшем случае кроме одного, который может быть + * нулевого размера. */ - if (nkeys < 32 || new_size > max_space / 16) { + if (newindx == split_indx && split_indx + minkeys <= nkeys) + split_indx += 1; + mdbx_assert(env, + split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); + const unsigned dim_nodes = + (newindx >= split_indx) ? split_indx : nkeys - split_indx; + const unsigned dim_used = (sizeof(indx_t) + NODESIZE + 1) * dim_nodes; + if (new_size >= dim_used) { /* Find split point */ - int dir; - if (newindx <= split_indx) { - i = 0; - dir = 1; - } else { - i = nkeys; - dir = -1; - } + i = (newindx < split_indx) ? 0 : nkeys; + int dir = (newindx < split_indx) ? 1 : -1; size_t before = 0, after = new_size + page_used(env, mp); - int best = split_indx; - int best_offset = nkeys + 1; + unsigned best_split = split_indx; + unsigned best_offset = INT_MAX; mdbx_trace("seek separator from %u, step %i, default %u, new-idx %u, " "new-size %zu", @@ -18788,8 +18791,8 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, (MDBX_node *)((char *)mp + tmp_ki_copy->mp_ptrs[i] + PAGEHDRSZ); size = NODESIZE + node_ks(node) + sizeof(indx_t); if (IS_LEAF(mp)) - size += F_ISSET(node_flags(node), F_BIGDATA) ? sizeof(pgno_t) - : node_ds(node); + size += (node_flags(node) & F_BIGDATA) ? sizeof(pgno_t) + : node_ds(node); size = EVEN(size); } @@ -18799,21 +18802,23 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *const newkey, size, before, after, max_space); if (before <= max_space && after <= max_space) { - int offset = branchless_abs(split_indx - i); - if (offset >= best_offset) - break; - best_offset = offset; - best = i; + const unsigned split = i + (dir > 0); + if (split >= minkeys && split <= nkeys + 1 - minkeys) { + const unsigned offset = branchless_abs(split_indx - split); + if (offset >= best_offset) + break; + best_offset = offset; + best_split = split; + } } i += dir; } while (i < nkeys); - split_indx = best + (dir > 0); - split_indx = (split_indx <= nkeys - minkeys + 1) ? split_indx - : nkeys - minkeys + 1; - split_indx = (split_indx >= minkeys) ? split_indx : minkeys; + split_indx = best_split; mdbx_trace("chosen %u", split_indx); } + mdbx_assert(env, + split_indx >= minkeys && split_indx <= nkeys - minkeys + 1); sepkey.iov_len = newkey->iov_len; sepkey.iov_base = newkey->iov_base;