/// \copyright SPDX-License-Identifier: Apache-2.0 /// \note Please refer to the COPYRIGHT file for explanations license change, /// credits and acknowledgments. /// \author Леонид Юрьев aka Leonid Yuriev \date 2015-2024 #include "internals.h" __cold int cursor_check(const MDBX_cursor *mc) { if (!mc->txn->tw.dirtylist) { cASSERT(mc, (mc->txn->flags & MDBX_WRITEMAP) != 0 && !MDBX_AVOID_MSYNC); } else { cASSERT(mc, (mc->txn->flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC); cASSERT(mc, mc->txn->tw.dirtyroom + mc->txn->tw.dirtylist->length == (mc->txn->parent ? mc->txn->parent->tw.dirtyroom : mc->txn->env->options.dp_limit)); } cASSERT(mc, (mc->checking & z_updating) ? mc->top + 1 <= mc->tree->height : mc->top + 1 == mc->tree->height); if (unlikely((mc->checking & z_updating) ? mc->top + 1 > mc->tree->height : mc->top + 1 != mc->tree->height)) return MDBX_CURSOR_FULL; if (is_pointed(mc) && (mc->checking & z_updating) == 0) { const page_t *mp = mc->pg[mc->top]; const size_t nkeys = page_numkeys(mp); if (!is_hollow(mc)) { cASSERT(mc, mc->ki[mc->top] < nkeys); if (mc->ki[mc->top] >= nkeys) return MDBX_CURSOR_FULL; } if (inner_pointed(mc)) { cASSERT(mc, is_filled(mc)); if (!is_filled(mc)) return MDBX_CURSOR_FULL; } } for (intptr_t n = 0; n <= mc->top; ++n) { page_t *mp = mc->pg[n]; const size_t nkeys = page_numkeys(mp); const bool expect_branch = (n < mc->tree->height - 1) ? true : false; const bool expect_nested_leaf = (n + 1 == mc->tree->height - 1) ? true : false; const bool branch = is_branch(mp) ? true : false; cASSERT(mc, branch == expect_branch); if (unlikely(branch != expect_branch)) return MDBX_CURSOR_FULL; if ((mc->checking & z_updating) == 0) { cASSERT(mc, nkeys > mc->ki[n] || (!branch && nkeys == mc->ki[n] && (mc->flags & z_hollow) != 0)); if (unlikely(nkeys <= mc->ki[n] && !(!branch && nkeys == mc->ki[n] && (mc->flags & z_hollow) != 0))) return MDBX_CURSOR_FULL; } else { cASSERT(mc, nkeys + 1 >= mc->ki[n]); if (unlikely(nkeys + 1 < mc->ki[n])) return MDBX_CURSOR_FULL; } int err = page_check(mc, mp); if (unlikely(err != MDBX_SUCCESS)) return err; for (size_t i = 0; i < nkeys; ++i) { if (branch) { node_t *node = page_node(mp, i); cASSERT(mc, node_flags(node) == 0); if (unlikely(node_flags(node) != 0)) return MDBX_CURSOR_FULL; pgno_t pgno = node_pgno(node); page_t *np; err = page_get(mc, pgno, &np, mp->txnid); cASSERT(mc, err == MDBX_SUCCESS); if (unlikely(err != MDBX_SUCCESS)) return err; const bool nested_leaf = is_leaf(np) ? true : false; cASSERT(mc, nested_leaf == expect_nested_leaf); if (unlikely(nested_leaf != expect_nested_leaf)) return MDBX_CURSOR_FULL; err = page_check(mc, np); if (unlikely(err != MDBX_SUCCESS)) return err; } } } return MDBX_SUCCESS; } __cold int cursor_check_updating(MDBX_cursor *mc) { const uint8_t checking = mc->checking; mc->checking |= z_updating; const int rc = cursor_check(mc); mc->checking = checking; return rc; } bool cursor_is_tracked(const MDBX_cursor *mc) { for (MDBX_cursor *scan = mc->txn->cursors[cursor_dbi(mc)]; scan; scan = scan->next) if (mc == ((mc->flags & z_inner) ? 
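/* an inner (nested dupsort) cursor is tracked via the nested cursor embedded in each listed cursor */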
&scan->subcur->cursor : scan)) return true; return false; } /*----------------------------------------------------------------------------*/ static int touch_dbi(MDBX_cursor *mc) { cASSERT(mc, (mc->flags & z_inner) == 0); cASSERT(mc, (*cursor_dbi_state(mc) & DBI_DIRTY) == 0); *cursor_dbi_state(mc) |= DBI_DIRTY; mc->txn->flags |= MDBX_TXN_DIRTY; if (!cursor_is_core(mc)) { /* Touch DB record of named DB */ cursor_couple_t cx; int rc = dbi_check(mc->txn, MAIN_DBI); if (unlikely(rc != MDBX_SUCCESS)) return rc; rc = cursor_init(&cx.outer, mc->txn, MAIN_DBI); if (unlikely(rc != MDBX_SUCCESS)) return rc; mc->txn->dbi_state[MAIN_DBI] |= DBI_DIRTY; rc = tree_search(&cx.outer, &container_of(mc->clc, kvx_t, clc)->name, Z_MODIFY); if (unlikely(rc != MDBX_SUCCESS)) return rc; } return MDBX_SUCCESS; } __hot int cursor_touch(MDBX_cursor *const mc, const MDBX_val *key, const MDBX_val *data) { cASSERT(mc, (mc->txn->flags & MDBX_TXN_RDONLY) == 0); cASSERT(mc, is_pointed(mc) || mc->tree->height == 0); cASSERT(mc, cursor_is_tracked(mc)); cASSERT(mc, F_ISSET(dbi_state(mc->txn, FREE_DBI), DBI_LINDO | DBI_VALID)); cASSERT(mc, F_ISSET(dbi_state(mc->txn, MAIN_DBI), DBI_LINDO | DBI_VALID)); if ((mc->flags & z_inner) == 0) { MDBX_txn *const txn = mc->txn; dpl_lru_turn(txn); if (unlikely((*cursor_dbi_state(mc) & DBI_DIRTY) == 0)) { int err = touch_dbi(mc); if (unlikely(err != MDBX_SUCCESS)) return err; } /* Estimate how much space this operation will take: */ /* 1) Max b-tree height, reasonable enough with including dups' sub-tree */ size_t need = CURSOR_STACK_SIZE + 3; /* 2) GC/FreeDB for any payload */ if (!cursor_is_gc(mc)) { need += txn->dbs[FREE_DBI].height + (size_t)3; /* 3) Named DBs also dirty the main DB */ if (cursor_is_main(mc)) need += txn->dbs[MAIN_DBI].height + (size_t)3; } #if xMDBX_DEBUG_SPILLING != 2 /* production mode */ /* 4) Double the page chain estimation * for extensively splitting, rebalance and merging */ need += need; /* 5) Factor the key+data which to be put in */ need += bytes2pgno(txn->env, node_size(key, data)) + (size_t)1; #else /* debug mode */ (void)key; (void)data; txn->env->debug_dirtied_est = ++need; txn->env->debug_dirtied_act = 0; #endif /* xMDBX_DEBUG_SPILLING == 2 */ int err = txn_spill(txn, mc, need); if (unlikely(err != MDBX_SUCCESS)) return err; } if (likely(mc->top >= 0) && !is_modifable(mc->txn, mc->pg[mc->top])) { const int8_t top = mc->top; mc->top = 0; do { int err = page_touch(mc); if (unlikely(err != MDBX_SUCCESS)) return err; mc->top += 1; } while (mc->top <= top); mc->top = top; } return MDBX_SUCCESS; } /*----------------------------------------------------------------------------*/ int cursor_shadow(MDBX_cursor *parent_cursor, MDBX_txn *nested_txn, const size_t dbi) { tASSERT(nested_txn, dbi > FREE_DBI && dbi < nested_txn->n_dbi); const size_t size = parent_cursor->subcur ? sizeof(MDBX_cursor) + sizeof(subcur_t) : sizeof(MDBX_cursor); for (MDBX_cursor *bk; parent_cursor; parent_cursor = bk->next) { cASSERT(parent_cursor, parent_cursor != parent_cursor->next); bk = parent_cursor; if (parent_cursor->signature != cur_signature_live) continue; bk = osal_malloc(size); if (unlikely(!bk)) return MDBX_ENOMEM; #if MDBX_DEBUG memset(bk, 0xCD, size); VALGRIND_MAKE_MEM_UNDEFINED(bk, size); #endif /* MDBX_DEBUG */ *bk = *parent_cursor; parent_cursor->backup = bk; /* Kill pointers into src to reduce abuse: The * user may not use mc until dst ends. But we need a valid * txn pointer here for cursor fixups to keep working. 
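       * The backup copy `bk` keeps the parent-txn state of the cursor, so that
       * cursor_eot() can later either merge the cursor into the parent txn on
       * commit, or restore it from the backup when the nested txn is aborted.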
*/ parent_cursor->txn = nested_txn; parent_cursor->tree = &nested_txn->dbs[dbi]; parent_cursor->dbi_state = &nested_txn->dbi_state[dbi]; subcur_t *mx = parent_cursor->subcur; if (mx != nullptr) { *(subcur_t *)(bk + 1) = *mx; mx->cursor.txn = nested_txn; mx->cursor.dbi_state = parent_cursor->dbi_state; } parent_cursor->next = nested_txn->cursors[dbi]; nested_txn->cursors[dbi] = parent_cursor; } return MDBX_SUCCESS; } void cursor_eot(MDBX_cursor *mc, const bool merge) { const unsigned stage = mc->signature; MDBX_cursor *const bk = mc->backup; ENSURE(mc->txn->env, stage == cur_signature_live || (stage == cur_signature_wait4eot && bk)); if (bk) { subcur_t *mx = mc->subcur; cASSERT(mc, mc->txn->parent != nullptr); /* Zap: Using uninitialized memory '*mc->backup'. */ MDBX_SUPPRESS_GOOFY_MSVC_ANALYZER(6001); ENSURE(mc->txn->env, bk->signature == cur_signature_live); cASSERT(mc, mx == bk->subcur); if (merge) { /* Update pointers to parent txn */ mc->next = bk->next; mc->backup = bk->backup; mc->txn = bk->txn; mc->tree = bk->tree; mc->dbi_state = bk->dbi_state; if (mx) { mx->cursor.txn = mc->txn; mx->cursor.dbi_state = mc->dbi_state; } } else { /* Restore from backup, i.e. rollback/abort nested txn */ *mc = *bk; if (mx) *mx = *(subcur_t *)(bk + 1); } if (stage == cur_signature_wait4eot /* Cursor was closed by user */) mc->signature = stage /* Promote closed state to parent txn */; bk->signature = 0; osal_free(bk); } else { ENSURE(mc->txn->env, stage == cur_signature_live); mc->signature = cur_signature_ready4dispose /* Cursor may be reused */; mc->next = mc; } } /*----------------------------------------------------------------------------*/ static __always_inline int couple_init(cursor_couple_t *couple, const MDBX_txn *const txn, tree_t *const tree, kvx_t *const kvx, uint8_t *const dbi_state) { VALGRIND_MAKE_MEM_UNDEFINED(couple, sizeof(cursor_couple_t)); tASSERT(txn, F_ISSET(*dbi_state, DBI_VALID | DBI_LINDO)); couple->outer.signature = cur_signature_live; couple->outer.next = &couple->outer; couple->outer.backup = nullptr; couple->outer.txn = (MDBX_txn *)txn; couple->outer.tree = tree; couple->outer.clc = &kvx->clc; couple->outer.dbi_state = dbi_state; couple->outer.top_and_flags = z_fresh_mark; STATIC_ASSERT((int)z_branch == P_BRANCH && (int)z_leaf == P_LEAF && (int)z_largepage == P_LARGE && (int)z_dupfix == P_DUPFIX); couple->outer.checking = (AUDIT_ENABLED() || (txn->env->flags & MDBX_VALIDATION)) ? 
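/* audit/validation mode enables additional per-page checking for this cursor */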
z_pagecheck | z_leaf : z_leaf; couple->outer.subcur = nullptr; if (tree->flags & MDBX_DUPSORT) { couple->inner.cursor.signature = cur_signature_live; subcur_t *const mx = couple->outer.subcur = &couple->inner; mx->cursor.subcur = nullptr; mx->cursor.next = &mx->cursor; mx->cursor.txn = (MDBX_txn *)txn; mx->cursor.tree = &mx->nested_tree; mx->cursor.clc = ptr_disp(couple->outer.clc, sizeof(clc_t)); tASSERT(txn, &mx->cursor.clc->k == &kvx->clc.v); mx->cursor.dbi_state = dbi_state; mx->cursor.top_and_flags = z_fresh_mark | z_inner; STATIC_ASSERT(MDBX_DUPFIXED * 2 == P_DUPFIX); mx->cursor.checking = couple->outer.checking + ((tree->flags & MDBX_DUPFIXED) << 1); } if (unlikely(*dbi_state & DBI_STALE)) return tbl_fetch(couple->outer.txn, cursor_dbi(&couple->outer)); if (unlikely(kvx->clc.k.lmax == 0)) return tbl_setup(txn->env, kvx, tree); return MDBX_SUCCESS; } __cold int cursor_init4walk(cursor_couple_t *couple, const MDBX_txn *const txn, tree_t *const tree, kvx_t *const kvx) { return couple_init(couple, txn, tree, kvx, txn->dbi_state); } int cursor_init(MDBX_cursor *mc, const MDBX_txn *txn, size_t dbi) { STATIC_ASSERT(offsetof(cursor_couple_t, outer) == 0); int rc = dbi_check(txn, dbi); if (likely(rc == MDBX_SUCCESS)) rc = couple_init(container_of(mc, cursor_couple_t, outer), txn, &txn->dbs[dbi], &txn->env->kvs[dbi], &txn->dbi_state[dbi]); return rc; } __cold static int unexpected_dupsort(MDBX_cursor *mc) { ERROR("unexpected dupsort-page/node for non-dupsort db/cursor (dbi %zu)", cursor_dbi(mc)); mc->txn->flags |= MDBX_TXN_ERROR; be_poor(mc); return MDBX_CORRUPTED; } int cursor_dupsort_setup(MDBX_cursor *mc, const node_t *node, const page_t *mp) { cASSERT(mc, is_pointed(mc)); subcur_t *mx = mc->subcur; if (!MDBX_DISABLE_VALIDATION && unlikely(mx == nullptr)) return unexpected_dupsort(mc); const uint8_t flags = node_flags(node); switch (flags) { default: ERROR("invalid node flags %u", flags); goto bailout; case N_DUP | N_TREE: if (!MDBX_DISABLE_VALIDATION && unlikely(node_ds(node) != sizeof(tree_t))) { ERROR("invalid nested-db record size (%zu, expect %zu)", node_ds(node), sizeof(tree_t)); goto bailout; } memcpy(&mx->nested_tree, node_data(node), sizeof(tree_t)); const txnid_t pp_txnid = mp->txnid; if (!MDBX_DISABLE_VALIDATION && unlikely(mx->nested_tree.mod_txnid > pp_txnid)) { ERROR("nested-db.mod_txnid (%" PRIaTXN ") > page-txnid (%" PRIaTXN ")", mx->nested_tree.mod_txnid, pp_txnid); goto bailout; } mx->cursor.top_and_flags = z_fresh_mark | z_inner; break; case N_DUP: if (!MDBX_DISABLE_VALIDATION && unlikely(node_ds(node) <= PAGEHDRSZ)) { ERROR("invalid nested-page size %zu", node_ds(node)); goto bailout; } page_t *sp = node_data(node); mx->nested_tree.height = 1; mx->nested_tree.branch_pages = 0; mx->nested_tree.leaf_pages = 1; mx->nested_tree.large_pages = 0; mx->nested_tree.items = page_numkeys(sp); mx->nested_tree.root = 0; mx->nested_tree.mod_txnid = mp->txnid; mx->cursor.top_and_flags = z_inner; mx->cursor.pg[0] = sp; mx->cursor.ki[0] = 0; mx->nested_tree.flags = flags_db2sub(mc->tree->flags); mx->nested_tree.dupfix_size = (mc->tree->flags & MDBX_DUPFIXED) ? 
sp->dupfix_ksize : 0; break; } if (unlikely(mx->nested_tree.dupfix_size != mc->tree->dupfix_size)) { if (!MDBX_DISABLE_VALIDATION && unlikely(mc->tree->dupfix_size != 0)) { ERROR("cursor mismatched nested-db dupfix_size %u", mc->tree->dupfix_size); goto bailout; } if (!MDBX_DISABLE_VALIDATION && unlikely((mc->tree->flags & MDBX_DUPFIXED) == 0)) { ERROR("mismatched nested-db flags %u", mc->tree->flags); goto bailout; } if (!MDBX_DISABLE_VALIDATION && unlikely(mx->nested_tree.dupfix_size < mc->clc->v.lmin || mx->nested_tree.dupfix_size > mc->clc->v.lmax)) { ERROR("mismatched nested-db.dupfix_size (%u) <> min/max value-length " "(%zu/%zu)", mx->nested_tree.dupfix_size, mc->clc->v.lmin, mc->clc->v.lmax); goto bailout; } mc->tree->dupfix_size = mx->nested_tree.dupfix_size; mc->clc->v.lmin = mc->clc->v.lmax = mx->nested_tree.dupfix_size; } DEBUG("Sub-db dbi -%zu root page %" PRIaPGNO, cursor_dbi(&mx->cursor), mx->nested_tree.root); return MDBX_SUCCESS; bailout: mx->cursor.top_and_flags = z_poor_mark | z_inner; return MDBX_CORRUPTED; } /*----------------------------------------------------------------------------*/ MDBX_cursor *cursor_cpstk(const MDBX_cursor *csrc, MDBX_cursor *cdst) { cASSERT(cdst, cdst->txn == csrc->txn); cASSERT(cdst, cdst->tree == csrc->tree); cASSERT(cdst, cdst->clc == csrc->clc); cASSERT(cdst, cdst->dbi_state == csrc->dbi_state); cdst->top_and_flags = csrc->top_and_flags; for (intptr_t i = 0; i <= csrc->top; i++) { cdst->pg[i] = csrc->pg[i]; cdst->ki[i] = csrc->ki[i]; } return cdst; } static __always_inline int sibling(MDBX_cursor *mc, bool right) { if (mc->top < 1) { /* root has no siblings */ return MDBX_NOTFOUND; } cursor_pop(mc); DEBUG("parent page is page %" PRIaPGNO ", index %u", mc->pg[mc->top]->pgno, mc->ki[mc->top]); int err; if (right ? (mc->ki[mc->top] + (size_t)1 >= page_numkeys(mc->pg[mc->top])) : (mc->ki[mc->top] == 0)) { DEBUG("no more keys aside, moving to next %s sibling", right ? "right" : "left"); err = right ? cursor_sibling_right(mc) : cursor_sibling_left(mc); if (err != MDBX_SUCCESS) { if (likely(err == MDBX_NOTFOUND)) /* undo cursor_pop before returning */ mc->top += 1; return err; } } else { mc->ki[mc->top] += right ? 1 : -1; DEBUG("just moving to %s index key %u", right ? "right" : "left", mc->ki[mc->top]); } cASSERT(mc, is_branch(mc->pg[mc->top])); page_t *mp = mc->pg[mc->top]; const node_t *node = page_node(mp, mc->ki[mc->top]); err = page_get(mc, node_pgno(node), &mp, mp->txnid); if (likely(err == MDBX_SUCCESS)) { err = cursor_push(mc, mp, right ? 0 : (indx_t)page_numkeys(mp) - 1); if (likely(err == MDBX_SUCCESS)) return err; } be_poor(mc); return err; } __hot int cursor_sibling_left(MDBX_cursor *mc) { int err = sibling(mc, false); if (likely(err != MDBX_NOTFOUND)) return err; cASSERT(mc, mc->top >= 0); size_t nkeys = page_numkeys(mc->pg[mc->top]); cASSERT(mc, nkeys > 0); mc->ki[mc->top] = 0; return MDBX_NOTFOUND; } __hot int cursor_sibling_right(MDBX_cursor *mc) { int err = sibling(mc, true); if (likely(err != MDBX_NOTFOUND)) return err; cASSERT(mc, mc->top >= 0); size_t nkeys = page_numkeys(mc->pg[mc->top]); cASSERT(mc, nkeys > 0); mc->ki[mc->top] = (indx_t)nkeys - 1; mc->flags = z_eof_soft | z_eof_hard | (mc->flags & z_clear_mask); inner_gone(mc); return MDBX_NOTFOUND; } /*----------------------------------------------------------------------------*/ /* Функция-шаблон: Приземляет курсор на данные в текущей позиции. * В том числе, загружает данные во вложенный курсор при его наличии. 
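 * (In English: a template function that lands the cursor onto the data at the
 * current position, including loading the nested cursor when one is present.)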
*/ static __always_inline int cursor_bring(const bool inner, const bool tend2first, MDBX_cursor *__restrict mc, MDBX_val *__restrict key, MDBX_val *__restrict data, bool eof) { if (inner) { cASSERT(mc, !data && !mc->subcur && (mc->flags & z_inner) != 0); } else { cASSERT(mc, (mc->flags & z_inner) == 0); } const page_t *mp = mc->pg[mc->top]; if (!MDBX_DISABLE_VALIDATION && unlikely(!check_leaf_type(mc, mp))) { ERROR("unexpected leaf-page #%" PRIaPGNO " type 0x%x seen by cursor", mp->pgno, mp->flags); return MDBX_CORRUPTED; } const size_t nkeys = page_numkeys(mp); cASSERT(mc, nkeys > 0); const size_t ki = mc->ki[mc->top]; cASSERT(mc, nkeys > ki); cASSERT(mc, !eof || ki == nkeys - 1); if (inner && is_dupfix_leaf(mp)) { be_filled(mc); if (eof) mc->flags |= z_eof_soft; if (likely(key)) *key = page_dupfix_key(mp, ki, mc->tree->dupfix_size); return MDBX_SUCCESS; } const node_t *__restrict node = page_node(mp, ki); if (!inner && (node_flags(node) & N_DUP)) { int err = cursor_dupsort_setup(mc, node, mp); if (unlikely(err != MDBX_SUCCESS)) return err; MDBX_ANALYSIS_ASSUME(mc->subcur != nullptr); if (node_flags(node) & N_TREE) { err = tend2first ? inner_first(&mc->subcur->cursor, data) : inner_last(&mc->subcur->cursor, data); if (unlikely(err != MDBX_SUCCESS)) return err; } else { if (!tend2first) { mc->subcur->cursor.ki[0] = (indx_t)mc->subcur->nested_tree.items - 1; mc->subcur->cursor.flags |= z_eof_soft; } if (data) { const page_t *inner_mp = mc->subcur->cursor.pg[0]; cASSERT(mc, is_subpage(inner_mp) && is_leaf(inner_mp)); const size_t inner_ki = mc->subcur->cursor.ki[0]; if (is_dupfix_leaf(inner_mp)) *data = page_dupfix_key(inner_mp, inner_ki, mc->tree->dupfix_size); else *data = get_key(page_node(inner_mp, inner_ki)); } } be_filled(mc); } else { if (!inner) inner_gone(mc); if (data) { int err = node_read(mc, node, data, mp); if (unlikely(err != MDBX_SUCCESS)) return err; } be_filled(mc); if (eof) mc->flags |= z_eof_soft; } get_key_optional(node, key); return MDBX_SUCCESS; } /* Функция-шаблон: Устанавливает курсор в начало или конец. */ static __always_inline int cursor_brim(const bool inner, const bool tend2first, MDBX_cursor *__restrict mc, MDBX_val *__restrict key, MDBX_val *__restrict data) { if (mc->top != 0) { int err = tree_search(mc, nullptr, tend2first ? Z_FIRST : Z_LAST); if (unlikely(err != MDBX_SUCCESS)) return err; } const size_t nkeys = page_numkeys(mc->pg[mc->top]); cASSERT(mc, nkeys > 0); mc->ki[mc->top] = tend2first ? 0 : nkeys - 1; return cursor_bring(inner, tend2first, mc, key, data, !tend2first); } __hot int inner_first(MDBX_cursor *mc, MDBX_val *data) { return cursor_brim(true, true, mc, data, nullptr); } __hot int inner_last(MDBX_cursor *mc, MDBX_val *data) { return cursor_brim(true, false, mc, data, nullptr); } __hot int outer_first(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data) { return cursor_brim(false, true, mc, key, data); } __hot int outer_last(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data) { return cursor_brim(false, false, mc, key, data); } /*----------------------------------------------------------------------------*/ /* Функция-шаблон: Передвигает курсор на одну позицию. * При необходимости управляет вложенным курсором. 
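 * (In English: a template function that moves the cursor by one position,
 * driving the nested dupsort cursor when necessary. The inner_next/inner_prev
 * and outer_next/outer_prev helpers below are thin wrappers that instantiate
 * it for the inner and outer cursor respectively.)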
*/ static __always_inline int cursor_step(const bool inner, const bool forward, MDBX_cursor *__restrict mc, MDBX_val *__restrict key, MDBX_val *__restrict data, MDBX_cursor_op op) { if (forward) { if (inner) cASSERT(mc, op == MDBX_NEXT); else cASSERT(mc, op == MDBX_NEXT || op == MDBX_NEXT_DUP || op == MDBX_NEXT_NODUP); } else { if (inner) cASSERT(mc, op == MDBX_PREV); else cASSERT(mc, op == MDBX_PREV || op == MDBX_PREV_DUP || op == MDBX_PREV_NODUP); } if (inner) { cASSERT(mc, !data && !mc->subcur && (mc->flags & z_inner) != 0); } else { cASSERT(mc, (mc->flags & z_inner) == 0); } if (unlikely(is_poor(mc))) { int state = mc->flags; if (state & z_fresh) { if (forward) return inner ? inner_first(mc, key) : outer_first(mc, key, data); else return inner ? inner_last(mc, key) : outer_last(mc, key, data); } mc->flags = inner ? z_inner | z_poor_mark : z_poor_mark; return (state & z_after_delete) ? MDBX_NOTFOUND : MDBX_ENODATA; } const page_t *mp = mc->pg[mc->top]; const intptr_t nkeys = page_numkeys(mp); cASSERT(mc, nkeys > 0); intptr_t ki = mc->ki[mc->top]; const uint8_t state = mc->flags & (z_after_delete | z_hollow | z_eof_hard | z_eof_soft); if (likely(state == 0)) { cASSERT(mc, ki < nkeys); if (!inner && op != (forward ? MDBX_NEXT_NODUP : MDBX_PREV_NODUP)) { int err = MDBX_NOTFOUND; if (inner_pointed(mc)) { err = forward ? inner_next(&mc->subcur->cursor, data) : inner_prev(&mc->subcur->cursor, data); if (likely(err == MDBX_SUCCESS)) { get_key_optional(page_node(mp, ki), key); return MDBX_SUCCESS; } if (unlikely(err != MDBX_NOTFOUND && err != MDBX_ENODATA)) { cASSERT(mc, !inner_pointed(mc)); return err; } cASSERT(mc, !forward || (mc->subcur->cursor.flags & z_eof_soft)); } if (op == (forward ? MDBX_NEXT_DUP : MDBX_PREV_DUP)) return err; } if (!inner) inner_gone(mc); } else { if (mc->flags & z_hollow) { cASSERT(mc, !inner_pointed(mc)); return MDBX_ENODATA; } if (!inner && op == (forward ? MDBX_NEXT_DUP : MDBX_PREV_DUP)) return MDBX_NOTFOUND; if (forward) { if (state & z_after_delete) { if (ki < nkeys) goto bring; } else { cASSERT(mc, state & (z_eof_soft | z_eof_hard)); return MDBX_NOTFOUND; } } else if (state & z_eof_hard) { mc->ki[mc->top] = (indx_t)nkeys - 1; goto bring; } } DEBUG("turn-%s: top page was %" PRIaPGNO " in cursor %p, ki %zi of %zi", forward ? 
"next" : "prev", mp->pgno, __Wpedantic_format_voidptr(mc), ki, nkeys); if (forward) { if (likely(++ki < nkeys)) mc->ki[mc->top] = (indx_t)ki; else { DEBUG("%s", "=====> move to next sibling page"); int err = cursor_sibling_right(mc); if (unlikely(err != MDBX_SUCCESS)) return err; mp = mc->pg[mc->top]; DEBUG("next page is %" PRIaPGNO ", key index %u", mp->pgno, mc->ki[mc->top]); } } else { if (likely(--ki >= 0)) mc->ki[mc->top] = (indx_t)ki; else { DEBUG("%s", "=====> move to prev sibling page"); int err = cursor_sibling_left(mc); if (unlikely(err != MDBX_SUCCESS)) return err; mp = mc->pg[mc->top]; DEBUG("prev page is %" PRIaPGNO ", key index %u", mp->pgno, mc->ki[mc->top]); } } DEBUG("==> cursor points to page %" PRIaPGNO " with %zu keys, key index %u", mp->pgno, page_numkeys(mp), mc->ki[mc->top]); bring: return cursor_bring(inner, forward, mc, key, data, false); } __hot int inner_next(MDBX_cursor *mc, MDBX_val *data) { return cursor_step(true, true, mc, data, nullptr, MDBX_NEXT); } __hot int inner_prev(MDBX_cursor *mc, MDBX_val *data) { return cursor_step(true, false, mc, data, nullptr, MDBX_PREV); } __hot int outer_next(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, MDBX_cursor_op op) { return cursor_step(false, true, mc, key, data, op); } __hot int outer_prev(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, MDBX_cursor_op op) { return cursor_step(false, false, mc, key, data, op); } /*----------------------------------------------------------------------------*/ __hot int cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data, unsigned flags) { int err; DKBUF_DEBUG; MDBX_env *const env = mc->txn->env; if (LOG_ENABLED(MDBX_LOG_DEBUG) && (flags & MDBX_RESERVE)) data->iov_base = nullptr; DEBUG("==> put db %d key [%s], size %" PRIuPTR ", data [%s] size %" PRIuPTR, cursor_dbi_dbg(mc), DKEY_DEBUG(key), key->iov_len, DVAL_DEBUG(data), data->iov_len); if ((flags & MDBX_CURRENT) != 0 && (mc->flags & z_inner) == 0) { if (unlikely(flags & (MDBX_APPEND | MDBX_NOOVERWRITE))) return MDBX_EINVAL; /* Запрошено обновление текущей записи, на которой сейчас стоит курсор. * Проверяем что переданный ключ совпадает со значением в текущей позиции * курсора. Здесь проще вызвать cursor_ops(), так как для обслуживания * таблиц с MDBX_DUPSORT также требуется текущий размер данных. */ MDBX_val current_key, current_data; err = cursor_ops(mc, ¤t_key, ¤t_data, MDBX_GET_CURRENT); if (unlikely(err != MDBX_SUCCESS)) return err; if (mc->clc->k.cmp(key, ¤t_key) != 0) return MDBX_EKEYMISMATCH; if (unlikely((flags & MDBX_MULTIPLE))) goto drop_current; if (mc->subcur) { node_t *node = page_node(mc->pg[mc->top], mc->ki[mc->top]); if (node_flags(node) & N_DUP) { cASSERT(mc, inner_pointed(mc)); /* Если за ключом более одного значения, либо если размер данных * отличается, то вместо обновления требуется удаление и * последующая вставка. */ if (mc->subcur->nested_tree.items > 1 || current_data.iov_len != data->iov_len) { drop_current: err = cursor_del(mc, flags & MDBX_ALLDUPS); if (unlikely(err != MDBX_SUCCESS)) return err; flags -= MDBX_CURRENT; goto skip_check_samedata; } } else if (unlikely(node_size(key, data) > env->leaf_nodemax)) { /* Уже есть пара key-value хранящаяся в обычном узле. Новые данные * слишком большие для размещения в обычном узле вместе с ключом, но * могут быть размещены в вложенном дереве. Удаляем узел со старыми * данными, чтобы при помещении новых создать вложенное дерево. 
         */
        err = cursor_del(mc, 0);
        if (unlikely(err != MDBX_SUCCESS))
          return err;
        flags -= MDBX_CURRENT;
        goto skip_check_samedata;
      }
    }
    if (!(flags & MDBX_RESERVE) && unlikely(cmp_lenfast(&current_data, data) == 0))
      return MDBX_SUCCESS /* the same data, nothing to update */;
  skip_check_samedata:;
  }

  int rc = MDBX_SUCCESS;
  if (mc->tree->height == 0) {
    /* new database, cursor has nothing to point to */
    cASSERT(mc, is_poor(mc));
    rc = MDBX_NO_ROOT;
  } else if ((flags & MDBX_CURRENT) == 0) {
    bool exact = false;
    MDBX_val last_key, old_data;
    if ((flags & MDBX_APPEND) && mc->tree->items > 0) {
      old_data.iov_base = nullptr;
      old_data.iov_len = 0;
      rc = (mc->flags & z_inner) ? inner_last(mc, &last_key) : outer_last(mc, &last_key, &old_data);
      if (likely(rc == MDBX_SUCCESS)) {
        const int cmp = mc->clc->k.cmp(key, &last_key);
        if (likely(cmp > 0)) {
          mc->ki[mc->top]++; /* step forward for appending */
          rc = MDBX_NOTFOUND;
        } else if (unlikely(cmp != 0)) {
          /* new-key < last-key */
          return MDBX_EKEYMISMATCH;
        } else {
          rc = MDBX_SUCCESS;
          exact = true;
        }
      }
    } else {
      csr_t csr = /* olddata may not be updated in case DUPFIX-page of dupfix-table */
          cursor_seek(mc, (MDBX_val *)key, &old_data, MDBX_SET);
      rc = csr.err;
      exact = csr.exact;
    }
    if (likely(rc == MDBX_SUCCESS)) {
      if (exact) {
        if (unlikely(flags & MDBX_NOOVERWRITE)) {
          DEBUG("duplicate key [%s]", DKEY_DEBUG(key));
          *data = old_data;
          return MDBX_KEYEXIST;
        }
        if (unlikely(mc->flags & z_inner)) {
          /* nested subtree of DUPSORT-database with the same key,
           * nothing to update */
          eASSERT(env, data->iov_len == 0 &&
                           (old_data.iov_len == 0 ||
                            /* olddata may not be updated in case DUPFIX-page of dupfix-table */
                            (mc->tree->flags & MDBX_DUPFIXED)));
          return MDBX_SUCCESS;
        }
        if (unlikely(flags & MDBX_ALLDUPS) && inner_pointed(mc)) {
          err = cursor_del(mc, MDBX_ALLDUPS);
          if (unlikely(err != MDBX_SUCCESS))
            return err;
          flags -= MDBX_ALLDUPS;
          cASSERT(mc, mc->top + 1 == mc->tree->height);
          rc = (mc->top >= 0) ?
MDBX_NOTFOUND : MDBX_NO_ROOT; exact = false; } else if (!(flags & (MDBX_RESERVE | MDBX_MULTIPLE))) { /* checking for early exit without dirtying pages */ if (unlikely(eq_fast(data, &old_data))) { cASSERT(mc, mc->clc->v.cmp(data, &old_data) == 0); if (mc->subcur) { if (flags & MDBX_NODUPDATA) return MDBX_KEYEXIST; if (flags & MDBX_APPENDDUP) return MDBX_EKEYMISMATCH; } /* the same data, nothing to update */ return MDBX_SUCCESS; } cASSERT(mc, mc->clc->v.cmp(data, &old_data) != 0); } } } else if (unlikely(rc != MDBX_NOTFOUND)) return rc; } mc->flags &= ~z_after_delete; MDBX_val xdata, *ref_data = data; size_t *batch_dupfix_done = nullptr, batch_dupfix_given = 0; if (unlikely(flags & MDBX_MULTIPLE)) { batch_dupfix_given = data[1].iov_len; batch_dupfix_done = &data[1].iov_len; *batch_dupfix_done = 0; } /* Cursor is positioned, check for room in the dirty list */ err = cursor_touch(mc, key, ref_data); if (unlikely(err)) return err; if (unlikely(rc == MDBX_NO_ROOT)) { /* new database, write a root leaf page */ DEBUG("%s", "allocating new root leaf page"); pgr_t npr = page_new(mc, P_LEAF); if (unlikely(npr.err != MDBX_SUCCESS)) return npr.err; npr.err = cursor_push(mc, npr.page, 0); if (unlikely(npr.err != MDBX_SUCCESS)) return npr.err; mc->tree->root = npr.page->pgno; mc->tree->height++; if (mc->tree->flags & MDBX_INTEGERKEY) { assert(key->iov_len >= mc->clc->k.lmin && key->iov_len <= mc->clc->k.lmax); mc->clc->k.lmin = mc->clc->k.lmax = key->iov_len; } if (mc->tree->flags & (MDBX_INTEGERDUP | MDBX_DUPFIXED)) { assert(data->iov_len >= mc->clc->v.lmin && data->iov_len <= mc->clc->v.lmax); assert(mc->subcur != nullptr); mc->tree->dupfix_size = /* mc->subcur->nested_tree.dupfix_size = */ (unsigned)(mc->clc->v.lmin = mc->clc->v.lmax = data->iov_len); cASSERT(mc, mc->clc->v.lmin == mc->subcur->cursor.clc->k.lmin); cASSERT(mc, mc->clc->v.lmax == mc->subcur->cursor.clc->k.lmax); if (mc->flags & z_inner) npr.page->flags |= P_DUPFIX; } } MDBX_val old_singledup, old_data; tree_t nested_dupdb; page_t *sub_root = nullptr; bool insert_key, insert_data; uint16_t fp_flags = P_LEAF; page_t *fp = env->page_auxbuf; fp->txnid = mc->txn->front_txnid; insert_key = insert_data = (rc != MDBX_SUCCESS); old_singledup.iov_base = nullptr; old_singledup.iov_len = 0; if (insert_key) { /* The key does not exist */ DEBUG("inserting key at index %i", mc->ki[mc->top]); if ((mc->tree->flags & MDBX_DUPSORT) && node_size(key, data) > env->leaf_nodemax) { /* Too big for a node, insert in sub-DB. Set up an empty * "old sub-page" for convert_to_subtree to expand to a full page. */ fp->dupfix_ksize = (mc->tree->flags & MDBX_DUPFIXED) ? 
(uint16_t)data->iov_len : 0; fp->lower = fp->upper = 0; old_data.iov_len = PAGEHDRSZ; goto convert_to_subtree; } } else { /* there's only a key anyway, so this is a no-op */ if (is_dupfix_leaf(mc->pg[mc->top])) { size_t ksize = mc->tree->dupfix_size; if (unlikely(key->iov_len != ksize)) return MDBX_BAD_VALSIZE; void *ptr = page_dupfix_ptr(mc->pg[mc->top], mc->ki[mc->top], ksize); memcpy(ptr, key->iov_base, ksize); fix_parent: /* if overwriting slot 0 of leaf, need to * update branch key if there is a parent page */ if (mc->top && !mc->ki[mc->top]) { size_t dtop = 1; mc->top--; /* slot 0 is always an empty key, find real slot */ while (mc->top && !mc->ki[mc->top]) { mc->top--; dtop++; } err = MDBX_SUCCESS; if (mc->ki[mc->top]) err = tree_propagate_key(mc, key); cASSERT(mc, mc->top + dtop < UINT16_MAX); mc->top += (uint8_t)dtop; if (unlikely(err != MDBX_SUCCESS)) return err; } if (AUDIT_ENABLED()) { err = cursor_check(mc); if (unlikely(err != MDBX_SUCCESS)) return err; } return MDBX_SUCCESS; } more: if (AUDIT_ENABLED()) { err = cursor_check(mc); if (unlikely(err != MDBX_SUCCESS)) return err; } node_t *const node = page_node(mc->pg[mc->top], mc->ki[mc->top]); /* Large/Overflow page overwrites need special handling */ if (unlikely(node_flags(node) & N_BIG)) { const size_t dpages = (node_size(key, data) > env->leaf_nodemax) ? largechunk_npages(env, data->iov_len) : 0; const pgno_t pgno = node_largedata_pgno(node); pgr_t lp = page_get_large(mc, pgno, mc->pg[mc->top]->txnid); if (unlikely(lp.err != MDBX_SUCCESS)) return lp.err; cASSERT(mc, page_type(lp.page) == P_LARGE); /* Is the ov page from this txn (or a parent) and big enough? */ const size_t ovpages = lp.page->pages; const size_t extra_threshold = (mc->tree == &mc->txn->dbs[FREE_DBI]) ? 1 : /* LY: add configurable threshold to keep reserve space */ 0; if (!is_frozen(mc->txn, lp.page) && ovpages >= dpages && ovpages <= dpages + extra_threshold) { /* yes, overwrite it. */ if (!is_modifable(mc->txn, lp.page)) { if (is_spilled(mc->txn, lp.page)) { lp = /* TODO: avoid search and get txn & spill-index from page_result */ page_unspill(mc->txn, lp.page); if (unlikely(lp.err)) return lp.err; } else { if (unlikely(!mc->txn->parent)) { ERROR("Unexpected not frozen/modifiable/spilled but shadowed %s " "page %" PRIaPGNO " mod-txnid %" PRIaTXN "," " without parent transaction, current txn %" PRIaTXN " front %" PRIaTXN, "large/overflow", pgno, lp.page->txnid, mc->txn->txnid, mc->txn->front_txnid); return MDBX_PROBLEM; } /* It is writable only in a parent txn */ page_t *np = page_shadow_alloc(mc->txn, ovpages); if (unlikely(!np)) return MDBX_ENOMEM; memcpy(np, lp.page, PAGEHDRSZ); /* Copy header of page */ err = page_dirty(mc->txn, lp.page = np, ovpages); if (unlikely(err != MDBX_SUCCESS)) return err; #if MDBX_ENABLE_PGOP_STAT mc->txn->env->lck->pgops.clone.weak += ovpages; #endif /* MDBX_ENABLE_PGOP_STAT */ cASSERT(mc, dpl_check(mc->txn)); } } node_set_ds(node, data->iov_len); if (flags & MDBX_RESERVE) data->iov_base = page_data(lp.page); else memcpy(page_data(lp.page), data->iov_base, data->iov_len); if (AUDIT_ENABLED()) { err = cursor_check(mc); if (unlikely(err != MDBX_SUCCESS)) return err; } return MDBX_SUCCESS; } if ((err = page_retire(mc, lp.page)) != MDBX_SUCCESS) return err; } else { old_data.iov_len = node_ds(node); old_data.iov_base = node_data(node); cASSERT(mc, ptr_disp(old_data.iov_base, old_data.iov_len) <= ptr_disp(mc->pg[mc->top], env->ps)); /* DB has dups? 
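       * If so, the new value goes into the per-key duplicate store: either an
       * in-node sub-page, or a nested sub-tree once the duplicates no longer
       * fit into a regular node (see convert_to_subtree below).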
*/ if (mc->tree->flags & MDBX_DUPSORT) { /* Prepare (sub-)page/sub-DB to accept the new item, if needed. * fp: old sub-page or a header faking it. * mp: new (sub-)page. * xdata: node data with new sub-page or sub-DB. */ size_t growth = 0; /* growth in page size.*/ page_t *mp = fp = xdata.iov_base = env->page_auxbuf; mp->pgno = mc->pg[mc->top]->pgno; /* Was a single item before, must convert now */ if (!(node_flags(node) & N_DUP)) { /* does data match? */ if (flags & MDBX_APPENDDUP) { const int cmp = mc->clc->v.cmp(data, &old_data); cASSERT(mc, cmp != 0 || eq_fast(data, &old_data)); if (unlikely(cmp <= 0)) return MDBX_EKEYMISMATCH; } else if (eq_fast(data, &old_data)) { cASSERT(mc, mc->clc->v.cmp(data, &old_data) == 0); if (flags & MDBX_NODUPDATA) return MDBX_KEYEXIST; /* data is match exactly byte-to-byte, nothing to update */ rc = MDBX_SUCCESS; if (unlikely(batch_dupfix_done)) goto batch_dupfix_continue; return rc; } /* Just overwrite the current item */ if (flags & MDBX_CURRENT) { cASSERT(mc, node_size(key, data) <= env->leaf_nodemax); goto current; } /* Back up original data item */ memcpy(old_singledup.iov_base = fp + 1, old_data.iov_base, old_singledup.iov_len = old_data.iov_len); /* Make sub-page header for the dup items, with dummy body */ fp->flags = P_LEAF | P_SUBP; fp->lower = 0; xdata.iov_len = PAGEHDRSZ + old_data.iov_len + data->iov_len; if (mc->tree->flags & MDBX_DUPFIXED) { fp->flags |= P_DUPFIX; fp->dupfix_ksize = (uint16_t)data->iov_len; /* Будем создавать DUPFIX-страницу, как минимум с двумя элементами. * При коротких значениях и наличии свободного места можно сделать * некоторое резервирование места, чтобы при последующих добавлениях * не сразу расширять созданную под-страницу. * Резервирование в целом сомнительно (см ниже), но может сработать * в плюс (а если в минус то несущественный) при коротких ключах. */ xdata.iov_len += page_subleaf2_reserve( env, page_room(mc->pg[mc->top]) + old_data.iov_len, xdata.iov_len, data->iov_len); cASSERT(mc, (xdata.iov_len & 1) == 0); } else { xdata.iov_len += 2 * (sizeof(indx_t) + NODESIZE) + (old_data.iov_len & 1) + (data->iov_len & 1); } cASSERT(mc, (xdata.iov_len & 1) == 0); fp->upper = (uint16_t)(xdata.iov_len - PAGEHDRSZ); old_data.iov_len = xdata.iov_len; /* pretend olddata is fp */ } else if (node_flags(node) & N_TREE) { /* Data is on sub-DB, just store it */ flags |= N_DUP | N_TREE; goto dupsort_put; } else { /* Data is on sub-page */ fp = old_data.iov_base; switch (flags) { default: growth = is_dupfix_leaf(fp) ? fp->dupfix_ksize : (node_size(data, nullptr) + sizeof(indx_t)); if (page_room(fp) >= growth) { /* На текущей под-странице есть место для добавления элемента. * Оптимальнее продолжить использовать эту страницу, ибо * добавление вложенного дерева увеличит WAF на одну страницу. */ goto continue_subpage; } /* На текущей под-странице нет места для еще одного элемента. * Можно либо увеличить эту под-страницу, либо вынести куст * значений во вложенное дерево. * * Продолжать использовать текущую под-страницу возможно * только пока и если размер после добавления элемента будет * меньше leaf_nodemax. Соответственно, при превышении * просто сразу переходим на вложенное дерево. */ xdata.iov_len = old_data.iov_len + (growth += growth & 1); if (xdata.iov_len > env->subpage_limit) goto convert_to_subtree; /* Можно либо увеличить под-страницу, в том числе с некоторым * запасом, либо перейти на вложенное поддерево. 
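           * (In English, summarizing the above: while the current sub-page
           * still has room, keep using it, since moving to a nested tree would
           * cost an extra page of write amplification; once it is full, the
           * sub-page may only be grown while the resulting node stays below
           * leaf_nodemax, otherwise switch to a nested tree right away.)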
           *
           * Reserving space on the sub-page looks questionable:
           *  - a reserve makes pages sparser, which also raises the probability
           *    of splitting the host/nest page;
           *  - it is hard to predict a useful reserve size, especially for
           *    non-MDBX_DUPFIXED tables;
           *  - having a reserve only saves moving part of the host/nest page's
           *    elements on subsequent insertions into it. Moreover, after the
           *    first resize of the sub-page its body will adjoin the unused
           *    space of the host/nest page, so further sequential insertions
           *    will require only shifting entries[].
           *
           * Accordingly, the more important/decisive factor is a timely switch
           * to a nested tree, but there is a rather tricky conflict of
           * interests here:
           *  - with a bias towards nested trees, the DB ends up with a larger
           *    number of sparser pages overall. This increases WAF, and also
           *    RAF for sequential reads of a large DB. However, with short keys
           *    and many duplicates/multi-values, the key density in the leaf
           *    pages of the main tree is higher, so there are proportionally
           *    fewer branch pages. Therefore the main tree's pages are more
           *    likely to settle in (not be evicted from) the LRU cache, and to
           *    land in the write-back cache on writes;
           *  - conversely, with a bias towards sub-pages, the opposite effects
           *    are observed, plus some overhead for extra copying of sub-page
           *    data in scenarios with several updates of the same duplicate
           *    cluster within one transaction.
           *
           * All in all, the following tactic seems the most rational:
           *  - introduce three thresholds subpage_limit, subpage_room_threshold
           *    and subpage_reserve_prereq, which may be set/adjusted by the
           *    user in ‰ of leaf_nodemax;
           *  - use a sub-page while its size is below subpage_limit and the
           *    host/nest page has at least subpage_room_threshold free space;
           *  - reserve space only for 1-3 short dupfix items, growing the
           *    sub-page by the size of a CPU cache line, but only if the page
           *    has at least subpage_reserve_prereq free space;
           *  - by default set:
           *      subpage_limit = leaf_nodemax (1000‰);
           *      subpage_room_threshold = 0;
           *      subpage_reserve_prereq = leaf_nodemax (1000‰). */
          if (is_dupfix_leaf(fp))
            growth += page_subleaf2_reserve(env, page_room(mc->pg[mc->top]) + old_data.iov_len, xdata.iov_len,
                                            data->iov_len);
          else {
            /* TODO: if the user were able to specify min/max key/data sizes,
             * it would make sense to implement a reservation tactic here
             * similar to the dupfixed one.
*/ } break; case MDBX_CURRENT | MDBX_NODUPDATA: case MDBX_CURRENT: continue_subpage: fp->txnid = mc->txn->front_txnid; fp->pgno = mp->pgno; mc->subcur->cursor.pg[0] = fp; flags |= N_DUP; goto dupsort_put; } xdata.iov_len = old_data.iov_len + growth; cASSERT(mc, (xdata.iov_len & 1) == 0); } fp_flags = fp->flags; if (xdata.iov_len > env->subpage_limit || node_size_len(node_ks(node), xdata.iov_len) > env->leaf_nodemax || (env->subpage_room_threshold && page_room(mc->pg[mc->top]) + node_size_len(node_ks(node), old_data.iov_len) < env->subpage_room_threshold + node_size_len(node_ks(node), xdata.iov_len))) { /* Too big for a sub-page, convert to sub-DB */ convert_to_subtree: fp_flags &= ~P_SUBP; nested_dupdb.dupfix_size = 0; nested_dupdb.flags = flags_db2sub(mc->tree->flags); if (mc->tree->flags & MDBX_DUPFIXED) { fp_flags |= P_DUPFIX; nested_dupdb.dupfix_size = fp->dupfix_ksize; } nested_dupdb.height = 1; nested_dupdb.branch_pages = 0; nested_dupdb.leaf_pages = 1; nested_dupdb.large_pages = 0; nested_dupdb.items = page_numkeys(fp); xdata.iov_len = sizeof(nested_dupdb); xdata.iov_base = &nested_dupdb; const pgr_t par = gc_alloc_single(mc); mp = par.page; if (unlikely(par.err != MDBX_SUCCESS)) return par.err; mc->tree->leaf_pages += 1; cASSERT(mc, env->ps > old_data.iov_len); growth = env->ps - (unsigned)old_data.iov_len; cASSERT(mc, (growth & 1) == 0); flags |= N_DUP | N_TREE; nested_dupdb.root = mp->pgno; nested_dupdb.sequence = 0; nested_dupdb.mod_txnid = mc->txn->txnid; sub_root = mp; } if (mp != fp) { mp->flags = fp_flags; mp->txnid = mc->txn->front_txnid; mp->dupfix_ksize = fp->dupfix_ksize; mp->lower = fp->lower; cASSERT(mc, fp->upper + growth < UINT16_MAX); mp->upper = fp->upper + (indx_t)growth; if (unlikely(fp_flags & P_DUPFIX)) { memcpy(page_data(mp), page_data(fp), page_numkeys(fp) * fp->dupfix_ksize); cASSERT(mc, (((mp->dupfix_ksize & page_numkeys(mp)) ^ mp->upper) & 1) == 0); } else { cASSERT(mc, (mp->upper & 1) == 0); memcpy(ptr_disp(mp, mp->upper + PAGEHDRSZ), ptr_disp(fp, fp->upper + PAGEHDRSZ), old_data.iov_len - fp->upper - PAGEHDRSZ); memcpy(mp->entries, fp->entries, page_numkeys(fp) * sizeof(mp->entries[0])); for (size_t i = 0; i < page_numkeys(fp); i++) { cASSERT(mc, mp->entries[i] + growth <= UINT16_MAX); mp->entries[i] += (indx_t)growth; } } } if (!insert_key) node_del(mc, 0); ref_data = &xdata; flags |= N_DUP; goto insert_node; } /* MDBX passes N_TREE in 'flags' to write a DB record */ if (unlikely((node_flags(node) ^ flags) & N_TREE)) return MDBX_INCOMPATIBLE; current: if (data->iov_len == old_data.iov_len) { cASSERT(mc, EVEN_CEIL(key->iov_len) == EVEN_CEIL(node_ks(node))); /* same size, just replace it. Note that we could * also reuse this node if the new data is smaller, * but instead we opt to shrink the node in that case. 
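   * With MDBX_RESERVE the caller just receives a pointer into the page instead
   * of having the data copied.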
*/ if (flags & MDBX_RESERVE) data->iov_base = old_data.iov_base; else if (!(mc->flags & z_inner)) memcpy(old_data.iov_base, data->iov_base, data->iov_len); else { cASSERT(mc, page_numkeys(mc->pg[mc->top]) == 1); cASSERT(mc, page_type_compat(mc->pg[mc->top]) == P_LEAF); cASSERT(mc, node_ds(node) == 0); cASSERT(mc, node_flags(node) == 0); cASSERT(mc, key->iov_len < UINT16_MAX); node_set_ks(node, key->iov_len); memcpy(node_key(node), key->iov_base, key->iov_len); cASSERT(mc, ptr_disp(node_key(node), node_ds(node)) < ptr_disp(mc->pg[mc->top], env->ps)); goto fix_parent; } if (AUDIT_ENABLED()) { err = cursor_check(mc); if (unlikely(err != MDBX_SUCCESS)) return err; } return MDBX_SUCCESS; } } node_del(mc, 0); } ref_data = data; insert_node:; const unsigned naf = flags & NODE_ADD_FLAGS; size_t nsize = is_dupfix_leaf(mc->pg[mc->top]) ? key->iov_len : leaf_size(env, key, ref_data); if (page_room(mc->pg[mc->top]) < nsize) { rc = page_split(mc, key, ref_data, P_INVALID, insert_key ? naf : naf | MDBX_SPLIT_REPLACE); if (rc == MDBX_SUCCESS && AUDIT_ENABLED()) rc = insert_key ? cursor_check(mc) : cursor_check_updating(mc); } else { /* There is room already in this leaf page. */ if (is_dupfix_leaf(mc->pg[mc->top])) { cASSERT(mc, !(naf & (N_BIG | N_TREE | N_DUP)) && ref_data->iov_len == 0); rc = node_add_dupfix(mc, mc->ki[mc->top], key); } else rc = node_add_leaf(mc, mc->ki[mc->top], key, ref_data, naf); if (likely(rc == 0)) { /* Adjust other cursors pointing to mp */ page_t *const mp = mc->pg[mc->top]; const size_t dbi = cursor_dbi(mc); for (MDBX_cursor *m2 = mc->txn->cursors[dbi]; m2; m2 = m2->next) { MDBX_cursor *m3 = (mc->flags & z_inner) ? &m2->subcur->cursor : m2; if (!is_related(mc, m3) || m3->pg[mc->top] != mp) continue; if (m3->ki[mc->top] >= mc->ki[mc->top]) m3->ki[mc->top] += insert_key; if (inner_pointed(m3)) cursor_inner_refresh(m3, mp, m3->ki[mc->top]); } } } if (likely(rc == MDBX_SUCCESS)) { /* Now store the actual data in the child DB. Note that we're * storing the user data in the keys field, so there are strict * size limits on dupdata. The actual data fields of the child * DB are all zero size. 
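     * For example, putting (key, value) into a DUPSORT table ends up inserting
     * `value` as a key with an empty data part into the nested tree hanging
     * off `key`.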
*/ if (flags & N_DUP) { MDBX_val empty; dupsort_put: empty.iov_len = 0; empty.iov_base = nullptr; node_t *node = page_node(mc->pg[mc->top], mc->ki[mc->top]); #define SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE 1 STATIC_ASSERT( (MDBX_NODUPDATA >> SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE) == MDBX_NOOVERWRITE); unsigned inner_flags = MDBX_CURRENT | ((flags & MDBX_NODUPDATA) >> SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE); if ((flags & MDBX_CURRENT) == 0) { inner_flags -= MDBX_CURRENT; rc = cursor_dupsort_setup(mc, node, mc->pg[mc->top]); if (unlikely(rc != MDBX_SUCCESS)) goto dupsort_error; } subcur_t *const mx = mc->subcur; if (sub_root) { cASSERT(mc, mx->nested_tree.height == 1 && mx->nested_tree.root == sub_root->pgno); mx->cursor.flags = z_inner; mx->cursor.top = 0; mx->cursor.pg[0] = sub_root; mx->cursor.ki[0] = 0; } if (old_singledup.iov_base) { /* converted, write the original data first */ if (is_dupfix_leaf(mx->cursor.pg[0])) rc = node_add_dupfix(&mx->cursor, 0, &old_singledup); else rc = node_add_leaf(&mx->cursor, 0, &old_singledup, &empty, 0); if (unlikely(rc != MDBX_SUCCESS)) goto dupsort_error; mx->cursor.tree->items = 1; } if (!(node_flags(node) & N_TREE) || sub_root) { page_t *const mp = mc->pg[mc->top]; const intptr_t nkeys = page_numkeys(mp); const size_t dbi = cursor_dbi(mc); for (MDBX_cursor *m2 = mc->txn->cursors[dbi]; m2; m2 = m2->next) { if (!is_related(mc, m2) || m2->pg[mc->top] != mp) continue; if (/* пропускаем незаполненные курсоры, иначе получится что у такого курсора будет инициализирован вложенный, что антилогично и бесполезно. */ is_filled(m2) && m2->ki[mc->top] == mc->ki[mc->top]) { cASSERT(m2, m2->subcur->cursor.clc == mx->cursor.clc); m2->subcur->nested_tree = mx->nested_tree; m2->subcur->cursor.pg[0] = mx->cursor.pg[0]; if (old_singledup.iov_base) { m2->subcur->cursor.top_and_flags = z_inner; m2->subcur->cursor.ki[0] = 0; } DEBUG("Sub-dbi -%zu root page %" PRIaPGNO, cursor_dbi(&m2->subcur->cursor), m2->subcur->nested_tree.root); } else if (!insert_key && m2->ki[mc->top] < nkeys) cursor_inner_refresh(m2, mp, m2->ki[mc->top]); } } cASSERT(mc, mc->subcur->nested_tree.items < PTRDIFF_MAX); const size_t probe = (size_t)mc->subcur->nested_tree.items; #define SHIFT_MDBX_APPENDDUP_TO_MDBX_APPEND 1 STATIC_ASSERT((MDBX_APPENDDUP >> SHIFT_MDBX_APPENDDUP_TO_MDBX_APPEND) == MDBX_APPEND); inner_flags |= (flags & MDBX_APPENDDUP) >> SHIFT_MDBX_APPENDDUP_TO_MDBX_APPEND; rc = cursor_put(&mc->subcur->cursor, data, &empty, inner_flags); if (flags & N_TREE) { void *db = node_data(node); mc->subcur->nested_tree.mod_txnid = mc->txn->txnid; memcpy(db, &mc->subcur->nested_tree, sizeof(tree_t)); } insert_data = (probe != (size_t)mc->subcur->nested_tree.items); } /* Increment count unless we just replaced an existing item. */ if (insert_data) mc->tree->items++; if (insert_key) { if (unlikely(rc != MDBX_SUCCESS)) goto dupsort_error; /* If we succeeded and the key didn't exist before, * make sure the cursor is marked valid. 
*/ be_filled(mc); } if (likely(rc == MDBX_SUCCESS)) { cASSERT(mc, is_filled(mc)); if (unlikely(batch_dupfix_done)) { batch_dupfix_continue: /* let caller know how many succeeded, if any */ if ((*batch_dupfix_done += 1) < batch_dupfix_given) { data[0].iov_base = ptr_disp(data[0].iov_base, data[0].iov_len); insert_key = insert_data = false; old_singledup.iov_base = nullptr; goto more; } } if (AUDIT_ENABLED()) rc = cursor_check(mc); } return rc; dupsort_error: if (unlikely(rc == MDBX_KEYEXIST)) { /* should not happen, we deleted that item */ ERROR("Unexpected %i error while put to nested dupsort's hive", rc); rc = MDBX_PROBLEM; } } mc->txn->flags |= MDBX_TXN_ERROR; return rc; } __hot int cursor_put_checklen(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data, unsigned flags) { cASSERT(mc, (mc->flags & z_inner) == 0); if (unlikely(key->iov_len > mc->clc->k.lmax || key->iov_len < mc->clc->k.lmin)) { cASSERT(mc, !"Invalid key-size"); return MDBX_BAD_VALSIZE; } if (unlikely(data->iov_len > mc->clc->v.lmax || data->iov_len < mc->clc->v.lmin)) { cASSERT(mc, !"Invalid data-size"); return MDBX_BAD_VALSIZE; } uint64_t aligned_keybytes, aligned_databytes; MDBX_val aligned_key, aligned_data; if (mc->tree->flags & MDBX_INTEGERKEY) { if (key->iov_len == 8) { if (unlikely(7 & (uintptr_t)key->iov_base)) { /* copy instead of return error to avoid break compatibility */ aligned_key.iov_base = bcopy_8(&aligned_keybytes, key->iov_base); aligned_key.iov_len = key->iov_len; key = &aligned_key; } } else if (key->iov_len == 4) { if (unlikely(3 & (uintptr_t)key->iov_base)) { /* copy instead of return error to avoid break compatibility */ aligned_key.iov_base = bcopy_4(&aligned_keybytes, key->iov_base); aligned_key.iov_len = key->iov_len; key = &aligned_key; } } else { cASSERT(mc, !"key-size is invalid for MDBX_INTEGERKEY"); return MDBX_BAD_VALSIZE; } } if (mc->tree->flags & MDBX_INTEGERDUP) { if (data->iov_len == 8) { if (unlikely(7 & (uintptr_t)data->iov_base)) { if (unlikely(flags & MDBX_MULTIPLE)) { /* LY: использование alignof(uint64_t) тут не подходил из-за ошибок * MSVC и некоторых других компиляторов, когда для элементов * массивов/векторов обеспечивает выравнивание только на 4-х байтовых * границу и одновременно alignof(uint64_t) == 8. 
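         * (In English: alignof(uint64_t) is not suitable here because MSVC and
         * some other compilers guarantee only 4-byte alignment for elements of
         * arrays/vectors even though alignof(uint64_t) == 8.)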
*/ if (MDBX_WORDBITS > 32 || (3 & (uintptr_t)data->iov_base) != 0) return MDBX_BAD_VALSIZE; } else { /* copy instead of return error to avoid break compatibility */ aligned_data.iov_base = bcopy_8(&aligned_databytes, data->iov_base); aligned_data.iov_len = data->iov_len; data = &aligned_data; } } } else if (data->iov_len == 4) { if (unlikely(3 & (uintptr_t)data->iov_base)) { if (unlikely(flags & MDBX_MULTIPLE)) return MDBX_BAD_VALSIZE; /* copy instead of return error to avoid break compatibility */ aligned_data.iov_base = bcopy_4(&aligned_databytes, data->iov_base); aligned_data.iov_len = data->iov_len; data = &aligned_data; } } else { cASSERT(mc, !"data-size is invalid for MDBX_INTEGERKEY"); return MDBX_BAD_VALSIZE; } } return cursor_put(mc, key, data, flags); } __hot int cursor_del(MDBX_cursor *mc, unsigned flags) { if (unlikely(!is_filled(mc))) return MDBX_ENODATA; int rc = cursor_touch(mc, nullptr, nullptr); if (unlikely(rc != MDBX_SUCCESS)) return rc; page_t *mp = mc->pg[mc->top]; cASSERT(mc, is_modifable(mc->txn, mp)); if (!MDBX_DISABLE_VALIDATION && unlikely(!check_leaf_type(mc, mp))) { ERROR("unexpected leaf-page #%" PRIaPGNO " type 0x%x seen by cursor", mp->pgno, mp->flags); return MDBX_CORRUPTED; } if (is_dupfix_leaf(mp)) goto del_key; node_t *node = page_node(mp, mc->ki[mc->top]); if (node_flags(node) & N_DUP) { if (flags & (MDBX_ALLDUPS | /* for compatibility */ MDBX_NODUPDATA)) { /* will subtract the final entry later */ mc->tree->items -= mc->subcur->nested_tree.items - 1; } else { if (!(node_flags(node) & N_TREE)) { page_t *sp = node_data(node); cASSERT(mc, is_subpage(sp)); sp->txnid = mp->txnid; mc->subcur->cursor.pg[0] = sp; } rc = cursor_del(&mc->subcur->cursor, 0); if (unlikely(rc != MDBX_SUCCESS)) return rc; /* If sub-DB still has entries, we're done */ if (mc->subcur->nested_tree.items) { if (node_flags(node) & N_TREE) { /* update table info */ mc->subcur->nested_tree.mod_txnid = mc->txn->txnid; memcpy(node_data(node), &mc->subcur->nested_tree, sizeof(tree_t)); } else { /* shrink sub-page */ node = node_shrink(mp, mc->ki[mc->top], node); mc->subcur->cursor.pg[0] = node_data(node); /* fix other sub-DB cursors pointed at sub-pages on this page */ for (MDBX_cursor *m2 = mc->txn->cursors[cursor_dbi(mc)]; m2; m2 = m2->next) { if (!is_related(mc, m2) || m2->pg[mc->top] != mp) continue; const node_t *inner = node; if (unlikely(m2->ki[mc->top] >= page_numkeys(mp))) { m2->flags = z_poor_mark; m2->subcur->nested_tree.root = 0; m2->subcur->cursor.top_and_flags = z_inner | z_poor_mark; continue; } if (m2->ki[mc->top] != mc->ki[mc->top]) { inner = page_node(mp, m2->ki[mc->top]); if (node_flags(inner) & N_TREE) continue; } m2->subcur->cursor.pg[0] = node_data(inner); } } mc->tree->items -= 1; cASSERT(mc, mc->tree->items > 0 && mc->tree->height > 0 && mc->tree->root != P_INVALID); return rc; } /* otherwise fall thru and delete the sub-DB */ } if ((node_flags(node) & N_TREE) && mc->subcur->cursor.tree->height) { /* add all the child DB's pages to the free list */ rc = tree_drop(&mc->subcur->cursor, false); if (unlikely(rc != MDBX_SUCCESS)) goto fail; } inner_gone(mc); } else { cASSERT(mc, !inner_pointed(mc)); /* MDBX passes N_TREE in 'flags' to delete a DB record */ if (unlikely((node_flags(node) ^ flags) & N_TREE)) return MDBX_INCOMPATIBLE; } /* add large/overflow pages to free list */ if (node_flags(node) & N_BIG) { pgr_t lp = page_get_large(mc, node_largedata_pgno(node), mp->txnid); if (unlikely((rc = lp.err) || (rc = page_retire(mc, lp.page)))) goto fail; } del_key: 
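  /* Remove the node (or the DUPFIX slot) itself, fix up the other cursors
   * pointing at this page, then let tree_rebalance() restore the invariants. */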
mc->tree->items -= 1; const MDBX_dbi dbi = cursor_dbi(mc); indx_t ki = mc->ki[mc->top]; mp = mc->pg[mc->top]; cASSERT(mc, is_leaf(mp)); node_del(mc, mc->tree->dupfix_size); /* Adjust other cursors pointing to mp */ for (MDBX_cursor *m2 = mc->txn->cursors[dbi]; m2; m2 = m2->next) { MDBX_cursor *m3 = (mc->flags & z_inner) ? &m2->subcur->cursor : m2; if (!is_related(mc, m3) || m3->pg[mc->top] != mp) continue; if (m3->ki[mc->top] == ki) { m3->flags |= z_after_delete; inner_gone(m3); } else { m3->ki[mc->top] -= m3->ki[mc->top] > ki; if (inner_pointed(m3)) cursor_inner_refresh(m3, m3->pg[mc->top], m3->ki[mc->top]); } } rc = tree_rebalance(mc); if (unlikely(rc != MDBX_SUCCESS)) goto fail; mc->flags |= z_after_delete; inner_gone(mc); if (unlikely(mc->top < 0)) { /* DB is totally empty now, just bail out. * Other cursors adjustments were already done * by rebalance and aren't needed here. */ cASSERT(mc, mc->tree->items == 0 && (mc->tree->root == P_INVALID || (is_inner(mc) && !mc->tree->root)) && mc->flags < 0); return MDBX_SUCCESS; } ki = mc->ki[mc->top]; mp = mc->pg[mc->top]; cASSERT(mc, is_leaf(mc->pg[mc->top])); size_t nkeys = page_numkeys(mp); cASSERT(mc, (mc->tree->items > 0 && nkeys > 0) || ((mc->flags & z_inner) && mc->tree->items == 0 && nkeys == 0)); /* Adjust this and other cursors pointing to mp */ const intptr_t top = /* может быть сброшен в -1 */ mc->top; for (MDBX_cursor *m2 = mc->txn->cursors[dbi]; m2; m2 = m2->next) { MDBX_cursor *m3 = (mc->flags & z_inner) ? &m2->subcur->cursor : m2; if (top > m3->top || m3->pg[top] != mp) continue; /* if m3 points past last node in page, find next sibling */ if (m3->ki[top] >= nkeys) { rc = cursor_sibling_right(m3); if (rc == MDBX_NOTFOUND) { rc = MDBX_SUCCESS; continue; } if (unlikely(rc != MDBX_SUCCESS)) goto fail; } if (/* пропускаем незаполненные курсоры, иначе получится что у такого курсора будет инициализирован вложенный, что антилогично и бесполезно. */ is_filled(m3) && m3->subcur && (m3->ki[top] >= ki || /* уже переместились вправо */ m3->pg[top] != mp)) { node = page_node(m3->pg[m3->top], m3->ki[m3->top]); /* Если это dupsort-узел, то должен быть валидный вложенный курсор. */ if (node_flags(node) & N_DUP) { /* Тут три варианта событий: * 1) Вложенный курсор уже инициализирован, у узла есть флаг N_TREE, * соответственно дубликаты вынесены в отдельное дерево с корнем * в отдельной странице = ничего корректировать не требуется. * 2) Вложенный курсор уже инициализирован, у узла нет флага N_TREE, * соответственно дубликаты размещены на вложенной sub-странице. * 3) Курсор стоял на удалённом элементе, который имел одно значение, * а после удаления переместился на следующий элемент с дубликатами. * В этом случае вложенный курсор не инициализирован и тепеь его * нужно установить на первый дубликат. 
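         * (In English: three cases are possible here: 1) the nested cursor is
         * already initialized and the node carries N_TREE, i.e. the duplicates
         * live in a separate tree rooted in its own page, so nothing needs to
         * be corrected; 2) the nested cursor is initialized and there is no
         * N_TREE, so the duplicates reside on a nested sub-page; 3) the cursor
         * stood on a deleted single-value entry and after the deletion moved
         * onto a following entry with duplicates, in which case the nested
         * cursor is not initialized and must now be set to the first
         * duplicate.)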
*/ if (is_pointed(&m3->subcur->cursor)) { if ((node_flags(node) & N_TREE) == 0) { cASSERT(m3, m3->subcur->cursor.top == 0 && m3->subcur->nested_tree.height == 1); m3->subcur->cursor.pg[0] = node_data(node); } } else { rc = cursor_dupsort_setup(m3, node, m3->pg[m3->top]); if (unlikely(rc != MDBX_SUCCESS)) goto fail; if (node_flags(node) & N_TREE) { rc = inner_first(&m3->subcur->cursor, nullptr); if (unlikely(rc != MDBX_SUCCESS)) goto fail; } } } else inner_gone(m3); } } cASSERT(mc, rc == MDBX_SUCCESS); if (AUDIT_ENABLED()) rc = cursor_check(mc); return rc; fail: mc->txn->flags |= MDBX_TXN_ERROR; return rc; } /*----------------------------------------------------------------------------*/ __hot csr_t cursor_seek(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, MDBX_cursor_op op) { DKBUF_DEBUG; csr_t ret; ret.exact = false; if (unlikely(key->iov_len < mc->clc->k.lmin || key->iov_len > mc->clc->k.lmax)) { cASSERT(mc, !"Invalid key-size"); ret.err = MDBX_BAD_VALSIZE; return ret; } MDBX_val aligned_key = *key; uint64_t aligned_key_buf; if (mc->tree->flags & MDBX_INTEGERKEY) { if (aligned_key.iov_len == 8) { if (unlikely(7 & (uintptr_t)aligned_key.iov_base)) /* copy instead of return error to avoid break compatibility */ aligned_key.iov_base = bcopy_8(&aligned_key_buf, aligned_key.iov_base); } else if (aligned_key.iov_len == 4) { if (unlikely(3 & (uintptr_t)aligned_key.iov_base)) /* copy instead of return error to avoid break compatibility */ aligned_key.iov_base = bcopy_4(&aligned_key_buf, aligned_key.iov_base); } else { cASSERT(mc, !"key-size is invalid for MDBX_INTEGERKEY"); ret.err = MDBX_BAD_VALSIZE; return ret; } } page_t *mp; node_t *node = nullptr; /* See if we're already on the right page */ if (is_pointed(mc)) { mp = mc->pg[mc->top]; cASSERT(mc, is_leaf(mp)); const size_t nkeys = page_numkeys(mp); if (unlikely(nkeys == 0)) { /* при создании первой листовой страницы */ cASSERT(mc, mc->top == 0 && mc->tree->height == 1 && mc->tree->branch_pages == 0 && mc->tree->leaf_pages == 1 && mc->ki[0] == 0); /* Логически верно, но нет смысла, ибо это мимолетная/временная * ситуация до добавления элемента выше по стеку вызовов: mc->flags |= z_eof_soft | z_hollow; */ ret.err = MDBX_NOTFOUND; return ret; } MDBX_val nodekey; if (is_dupfix_leaf(mp)) nodekey = page_dupfix_key(mp, 0, mc->tree->dupfix_size); else { node = page_node(mp, 0); nodekey = get_key(node); inner_gone(mc); } int cmp = mc->clc->k.cmp(&aligned_key, &nodekey); if (unlikely(cmp == 0)) { /* Probably happens rarely, but first node on the page * was the one we wanted. */ mc->ki[mc->top] = 0; ret.exact = true; goto got_node; } if (cmp > 0) { /* Искомый ключ больше первого на этой странице, * целевая позиция на этой странице либо правее (ближе к концу). */ if (likely(nkeys > 1)) { if (is_dupfix_leaf(mp)) { nodekey.iov_base = page_dupfix_ptr(mp, nkeys - 1, nodekey.iov_len); } else { node = page_node(mp, nkeys - 1); nodekey = get_key(node); } cmp = mc->clc->k.cmp(&aligned_key, &nodekey); if (cmp == 0) { /* last node was the one we wanted */ mc->ki[mc->top] = (indx_t)(nkeys - 1); ret.exact = true; goto got_node; } if (cmp < 0) { /* Искомый ключ между первым и последним на этой страницы, * поэтому пропускаем поиск по дереву и продолжаем только на текущей * странице. */ /* Сравниваем с текущей позицией, ибо частным сценарием является такое * совпадение, но не делаем проверку если текущая позиция является * первой/последний и соответственно такое сравнение было выше. 
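       * (In English: also compare against the current cursor position, since
       * an exact hit there is a common case; the check is skipped when the
       * current position is the first or last entry, because those comparisons
       * were already done above.)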
__hot csr_t cursor_seek(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, MDBX_cursor_op op) {
  DKBUF_DEBUG;

  csr_t ret;
  ret.exact = false;
  if (unlikely(key->iov_len < mc->clc->k.lmin || key->iov_len > mc->clc->k.lmax)) {
    cASSERT(mc, !"Invalid key-size");
    ret.err = MDBX_BAD_VALSIZE;
    return ret;
  }

  MDBX_val aligned_key = *key;
  uint64_t aligned_key_buf;
  if (mc->tree->flags & MDBX_INTEGERKEY) {
    if (aligned_key.iov_len == 8) {
      if (unlikely(7 & (uintptr_t)aligned_key.iov_base))
        /* copy instead of returning an error to avoid breaking compatibility */
        aligned_key.iov_base = bcopy_8(&aligned_key_buf, aligned_key.iov_base);
    } else if (aligned_key.iov_len == 4) {
      if (unlikely(3 & (uintptr_t)aligned_key.iov_base))
        /* copy instead of returning an error to avoid breaking compatibility */
        aligned_key.iov_base = bcopy_4(&aligned_key_buf, aligned_key.iov_base);
    } else {
      cASSERT(mc, !"key-size is invalid for MDBX_INTEGERKEY");
      ret.err = MDBX_BAD_VALSIZE;
      return ret;
    }
  }

  page_t *mp;
  node_t *node = nullptr;
  /* See if we're already on the right page */
  if (is_pointed(mc)) {
    mp = mc->pg[mc->top];
    cASSERT(mc, is_leaf(mp));
    const size_t nkeys = page_numkeys(mp);
    if (unlikely(nkeys == 0)) {
      /* happens while the very first leaf page is being created */
      cASSERT(mc, mc->top == 0 && mc->tree->height == 1 && mc->tree->branch_pages == 0 && mc->tree->leaf_pages == 1 &&
                      mc->ki[0] == 0);
      /* Logically correct, but pointless, since this is a transient situation
       * until an element is added further up the call stack:
       * mc->flags |= z_eof_soft | z_hollow; */
      ret.err = MDBX_NOTFOUND;
      return ret;
    }

    MDBX_val nodekey;
    if (is_dupfix_leaf(mp))
      nodekey = page_dupfix_key(mp, 0, mc->tree->dupfix_size);
    else {
      node = page_node(mp, 0);
      nodekey = get_key(node);
      inner_gone(mc);
    }
    int cmp = mc->clc->k.cmp(&aligned_key, &nodekey);
    if (unlikely(cmp == 0)) {
      /* Probably happens rarely, but first node on the page
       * was the one we wanted. */
      mc->ki[mc->top] = 0;
      ret.exact = true;
      goto got_node;
    }
    if (cmp > 0) {
      /* The sought key is greater than the first one on this page, so the
       * target position is either on this page or further to the right
       * (closer to the end). */
      if (likely(nkeys > 1)) {
        if (is_dupfix_leaf(mp)) {
          nodekey.iov_base = page_dupfix_ptr(mp, nkeys - 1, nodekey.iov_len);
        } else {
          node = page_node(mp, nkeys - 1);
          nodekey = get_key(node);
        }
        cmp = mc->clc->k.cmp(&aligned_key, &nodekey);
        if (cmp == 0) {
          /* last node was the one we wanted */
          mc->ki[mc->top] = (indx_t)(nkeys - 1);
          ret.exact = true;
          goto got_node;
        }
        if (cmp < 0) {
          /* The sought key lies between the first and the last one on this
           * page, so skip the tree search and continue only within the
           * current page. */
          /* Compare against the current position, since an exact match with
           * it is a common special case; skip the check when the current
           * position is the first/last one, as that comparison was already
           * done above. */
          if (mc->ki[mc->top] > 0 && mc->ki[mc->top] < nkeys - 1) {
            if (is_dupfix_leaf(mp)) {
              nodekey.iov_base = page_dupfix_ptr(mp, mc->ki[mc->top], nodekey.iov_len);
            } else {
              node = page_node(mp, mc->ki[mc->top]);
              nodekey = get_key(node);
            }
            cmp = mc->clc->k.cmp(&aligned_key, &nodekey);
            if (cmp == 0) {
              /* current node was the one we wanted */
              ret.exact = true;
              goto got_node;
            }
          }
          goto search_node;
        }
      }

      /* If the cursor stack has pages to the right, continue the search there. */
      cASSERT(mc, mc->tree->height > mc->top);
      for (intptr_t i = 0; i < mc->top; i++)
        if ((size_t)mc->ki[i] + 1 < page_numkeys(mc->pg[i]))
          goto continue_other_pages;

      /* The key is greater than the last one. */
      mc->ki[mc->top] = (indx_t)nkeys;
      if (op < MDBX_SET_RANGE) {
      target_not_found:
        cASSERT(mc, op == MDBX_SET || op == MDBX_SET_KEY || op == MDBX_GET_BOTH || op == MDBX_GET_BOTH_RANGE);
        /* The operation implies a search for a specific key, which was not
         * found. So put the cursor into an unset state, but without resetting
         * top, which lets the fastpath work on a subsequent search through
         * the page tree. */
        mc->flags = z_hollow | (mc->flags & z_clear_mask);
        inner_gone(mc);
        ret.err = MDBX_NOTFOUND;
        return ret;
      }
      cASSERT(mc, op == MDBX_SET_RANGE);
      mc->flags = z_eof_soft | z_eof_hard | (mc->flags & z_clear_mask);
      ret.err = MDBX_NOTFOUND;
      return ret;
    }

    if (mc->top == 0) {
      /* There are no other pages */
      mc->ki[mc->top] = 0;
      if (op >= MDBX_SET_RANGE)
        goto got_node;
      else
        goto target_not_found;
    }
  }
  cASSERT(mc, !inner_pointed(mc));

continue_other_pages:
  ret.err = tree_search(mc, &aligned_key, 0);
  if (unlikely(ret.err != MDBX_SUCCESS))
    return ret;
  cASSERT(mc, is_pointed(mc) && !inner_pointed(mc));

  mp = mc->pg[mc->top];
  MDBX_ANALYSIS_ASSUME(mp != nullptr);
  cASSERT(mc, is_leaf(mp));

search_node:
  cASSERT(mc, is_pointed(mc) && !inner_pointed(mc));
  struct node_search_result nsr = node_search(mc, &aligned_key);
  node = nsr.node;
  ret.exact = nsr.exact;
  if (!ret.exact) {
    if (op < MDBX_SET_RANGE)
      goto target_not_found;

    if (node == nullptr) {
      DEBUG("%s", "===> inexact leaf not found, goto sibling");
      ret.err = cursor_sibling_right(mc);
      if (unlikely(ret.err != MDBX_SUCCESS))
        return ret; /* no entries matched */
      mp = mc->pg[mc->top];
      cASSERT(mc, is_leaf(mp));
      if (!is_dupfix_leaf(mp))
        node = page_node(mp, 0);
    }
  }

got_node:
  cASSERT(mc, is_pointed(mc) && !inner_pointed(mc));
  cASSERT(mc, mc->ki[mc->top] < page_numkeys(mc->pg[mc->top]));
  if (!MDBX_DISABLE_VALIDATION && unlikely(!check_leaf_type(mc, mp))) {
    ERROR("unexpected leaf-page #%" PRIaPGNO " type 0x%x seen by cursor", mp->pgno, mp->flags);
    ret.err = MDBX_CORRUPTED;
    return ret;
  }

  if (is_dupfix_leaf(mp)) {
    if (op >= MDBX_SET_KEY)
      *key = page_dupfix_key(mp, mc->ki[mc->top], mc->tree->dupfix_size);
    be_filled(mc);
    ret.err = MDBX_SUCCESS;
    return ret;
  }
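  /* A dupsort node: attach/refresh the nested cursor below. For the MDBX_SET
   * family the data argument (if any) receives the first duplicate, while for
   * MDBX_GET_BOTH/MDBX_GET_BOTH_RANGE the requested value is looked up among
   * the duplicates via the nested cursor. */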
  if (node_flags(node) & N_DUP) {
    ret.err = cursor_dupsort_setup(mc, node, mp);
    if (unlikely(ret.err != MDBX_SUCCESS))
      return ret;
    if (op >= MDBX_SET) {
      MDBX_ANALYSIS_ASSUME(mc->subcur != nullptr);
      if (node_flags(node) & N_TREE) {
        ret.err = inner_first(&mc->subcur->cursor, data);
        if (unlikely(ret.err != MDBX_SUCCESS))
          return ret;
      } else if (data) {
        const page_t *inner_mp = mc->subcur->cursor.pg[0];
        cASSERT(mc, is_subpage(inner_mp) && is_leaf(inner_mp));
        const size_t inner_ki = mc->subcur->cursor.ki[0];
        if (is_dupfix_leaf(inner_mp))
          *data = page_dupfix_key(inner_mp, inner_ki, mc->tree->dupfix_size);
        else
          *data = get_key(page_node(inner_mp, inner_ki));
      }
    } else {
      MDBX_ANALYSIS_ASSUME(mc->subcur != nullptr);
      ret = cursor_seek(&mc->subcur->cursor, data, nullptr, MDBX_SET_RANGE);
      if (unlikely(ret.err != MDBX_SUCCESS)) {
        if (ret.err == MDBX_NOTFOUND && op < MDBX_SET_RANGE)
          goto target_not_found;
        return ret;
      }
      if (op == MDBX_GET_BOTH && !ret.exact)
        goto target_not_found;
    }
  } else if (likely(data)) {
    if (op <= MDBX_GET_BOTH_RANGE) {
      if (unlikely(data->iov_len < mc->clc->v.lmin || data->iov_len > mc->clc->v.lmax)) {
        cASSERT(mc, !"Invalid data-size");
        ret.err = MDBX_BAD_VALSIZE;
        return ret;
      }
      MDBX_val aligned_data = *data;
      uint64_t aligned_databytes;
      if (mc->tree->flags & MDBX_INTEGERDUP) {
        if (aligned_data.iov_len == 8) {
          if (unlikely(7 & (uintptr_t)aligned_data.iov_base))
            /* copy instead of returning an error to avoid breaking compatibility */
            aligned_data.iov_base = bcopy_8(&aligned_databytes, aligned_data.iov_base);
        } else if (aligned_data.iov_len == 4) {
          if (unlikely(3 & (uintptr_t)aligned_data.iov_base))
            /* copy instead of returning an error to avoid breaking compatibility */
            aligned_data.iov_base = bcopy_4(&aligned_databytes, aligned_data.iov_base);
        } else {
          cASSERT(mc, !"data-size is invalid for MDBX_INTEGERDUP");
          ret.err = MDBX_BAD_VALSIZE;
          return ret;
        }
      }
      MDBX_val actual_data;
      ret.err = node_read(mc, node, &actual_data, mc->pg[mc->top]);
      if (unlikely(ret.err != MDBX_SUCCESS))
        return ret;
      const int cmp = mc->clc->v.cmp(&aligned_data, &actual_data);
      if (cmp) {
        if (op != MDBX_GET_BOTH_RANGE) {
          cASSERT(mc, op == MDBX_GET_BOTH);
          goto target_not_found;
        }
        if (cmp > 0) {
          ret.err = MDBX_NOTFOUND;
          return ret;
        }
      }
      *data = actual_data;
    } else {
      ret.err = node_read(mc, node, data, mc->pg[mc->top]);
      if (unlikely(ret.err != MDBX_SUCCESS))
        return ret;
    }
  }

  /* The key already matches in all other cases */
  if (op >= MDBX_SET_KEY)
    get_key_optional(node, key);

  DEBUG("==> cursor placed on key [%s], data [%s]", DKEY_DEBUG(key), DVAL_DEBUG(data));
  ret.err = MDBX_SUCCESS;
  be_filled(mc);
  return ret;
}
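#if 0
/* Illustrative sketch (not compiled): how the MDBX_SET family behaves at the
 * public-API level, as implemented by cursor_seek() above. The key contents
 * and the `txn`/`dbi` handles are assumptions for the example only. */
static void example_positioning(MDBX_txn *txn, MDBX_dbi dbi) {
  MDBX_cursor *cursor = nullptr;
  if (mdbx_cursor_open(txn, dbi, &cursor) != MDBX_SUCCESS)
    return;

  MDBX_val key, data;
  key.iov_base = "apple";
  key.iov_len = 5;
  data.iov_base = nullptr;
  data.iov_len = 0;

  /* MDBX_SET: requires an exact key match, the key itself is not rewritten */
  int err = mdbx_cursor_get(cursor, &key, &data, MDBX_SET);
  /* MDBX_SET_RANGE: positions at the first key >= "apple",
   * returning MDBX_NOTFOUND only when no such key exists */
  err = mdbx_cursor_get(cursor, &key, &data, MDBX_SET_RANGE);
  (void)err;
  mdbx_cursor_close(cursor);
}
#endif /* illustrative sketch */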
__hot int cursor_ops(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, const MDBX_cursor_op op) {
  if (op != MDBX_GET_CURRENT)
    DEBUG(">> cursor %p(0x%x), ops %u, key %p, value %p", __Wpedantic_format_voidptr(mc), mc->flags, op,
          __Wpedantic_format_voidptr(key), __Wpedantic_format_voidptr(data));
  int rc;
  switch (op) {
  case MDBX_GET_CURRENT:
    cASSERT(mc, (mc->flags & z_inner) == 0);
    if (unlikely(!is_filled(mc))) {
      if (is_hollow(mc))
        return MDBX_ENODATA;
      if (mc->ki[mc->top] >= page_numkeys(mc->pg[mc->top]))
        return MDBX_NOTFOUND;
    }
    if (mc->flags & z_after_delete)
      return outer_next(mc, key, data, MDBX_NEXT_NODUP);
    else if (inner_pointed(mc) && (mc->subcur->cursor.flags & z_after_delete))
      return outer_next(mc, key, data, MDBX_NEXT_DUP);
    else {
      const page_t *mp = mc->pg[mc->top];
      const node_t *node = page_node(mp, mc->ki[mc->top]);
      get_key_optional(node, key);
      if (!data)
        return MDBX_SUCCESS;
      if (node_flags(node) & N_DUP) {
        if (!MDBX_DISABLE_VALIDATION && unlikely(!mc->subcur))
          return unexpected_dupsort(mc);
        mc = &mc->subcur->cursor;
        if (unlikely(!is_filled(mc))) {
          if (is_hollow(mc))
            return MDBX_ENODATA;
          if (mc->ki[mc->top] >= page_numkeys(mc->pg[mc->top]))
            return MDBX_NOTFOUND;
        }
        mp = mc->pg[mc->top];
        if (is_dupfix_leaf(mp))
          *data = page_dupfix_key(mp, mc->ki[mc->top], mc->tree->dupfix_size);
        else
          *data = get_key(page_node(mp, mc->ki[mc->top]));
        return MDBX_SUCCESS;
      } else {
        cASSERT(mc, !inner_pointed(mc));
        return node_read(mc, node, data, mc->pg[mc->top]);
      }
    }

  case MDBX_GET_BOTH:
  case MDBX_GET_BOTH_RANGE:
    if (unlikely(data == nullptr))
      return MDBX_EINVAL;
    if (unlikely(mc->subcur == nullptr))
      return MDBX_INCOMPATIBLE;
    /* fall through */
    __fallthrough;
  case MDBX_SET:
  case MDBX_SET_KEY:
  case MDBX_SET_RANGE:
    if (unlikely(key == nullptr))
      return MDBX_EINVAL;
    rc = cursor_seek(mc, key, data, op).err;
    if (rc == MDBX_SUCCESS)
      cASSERT(mc, is_filled(mc));
    else if (rc == MDBX_NOTFOUND && mc->tree->items) {
      cASSERT(mc, is_pointed(mc));
      cASSERT(mc, op == MDBX_SET_RANGE || op == MDBX_GET_BOTH_RANGE || is_hollow(mc));
      cASSERT(mc, op == MDBX_GET_BOTH_RANGE || inner_hollow(mc));
    } else
      cASSERT(mc, is_poor(mc) && !is_filled(mc));
    return rc;

  case MDBX_GET_MULTIPLE:
    if (unlikely(!data))
      return MDBX_EINVAL;
    if (unlikely((mc->tree->flags & MDBX_DUPFIXED) == 0))
      return MDBX_INCOMPATIBLE;
    if (unlikely(!is_pointed(mc))) {
      if (unlikely(!key))
        return MDBX_EINVAL;
      if (unlikely((mc->flags & z_fresh) == 0))
        return MDBX_ENODATA;
      rc = cursor_seek(mc, key, data, MDBX_SET).err;
      if (unlikely(rc != MDBX_SUCCESS))
        return rc;
    } else {
      if (unlikely(is_eof(mc) || !inner_filled(mc)))
        return MDBX_ENODATA;
      cASSERT(mc, is_filled(mc));
      if (key) {
        const page_t *mp = mc->pg[mc->top];
        const node_t *node = page_node(mp, mc->ki[mc->top]);
        *key = get_key(node);
      }
    }
    goto fetch_multiple;

  case MDBX_NEXT_MULTIPLE:
    if (unlikely(!data))
      return MDBX_EINVAL;
    if (unlikely(mc->subcur == nullptr))
      return MDBX_INCOMPATIBLE;
    rc = outer_next(mc, key, data, MDBX_NEXT_DUP);
    if (unlikely(rc != MDBX_SUCCESS))
      return rc;
    else {
    fetch_multiple:
      cASSERT(mc, is_filled(mc) && inner_filled(mc));
      MDBX_cursor *mx = &mc->subcur->cursor;
      data->iov_len = page_numkeys(mx->pg[mx->top]) * mx->tree->dupfix_size;
      data->iov_base = page_data(mx->pg[mx->top]);
      mx->ki[mx->top] = (indx_t)page_numkeys(mx->pg[mx->top]) - 1;
      return MDBX_SUCCESS;
    }
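  /* The fetch_multiple block above returns a whole leaf page of the nested
   * dupfix tree at once: iov_base points to the packed fixed-size items and
   * iov_len is their total size, so the caller obtains the item count as
   * data.iov_len divided by the fixed item size of the MDBX_DUPFIXED table. */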
  case MDBX_PREV_MULTIPLE:
    if (unlikely(!data))
      return MDBX_EINVAL;
    if (unlikely(mc->subcur == nullptr))
      return MDBX_INCOMPATIBLE;
    if (unlikely(!is_pointed(mc))) {
      if (unlikely((mc->flags & z_fresh) == 0))
        return MDBX_ENODATA;
      rc = outer_last(mc, key, data);
      if (unlikely(rc != MDBX_SUCCESS))
        return rc;
      mc->subcur->cursor.ki[mc->subcur->cursor.top] = 0;
      goto fetch_multiple;
    }
    if (unlikely(!is_filled(mc) || !inner_filled(mc)))
      return MDBX_ENODATA;
    rc = cursor_sibling_left(&mc->subcur->cursor);
    if (likely(rc == MDBX_SUCCESS))
      goto fetch_multiple;
    return rc;

  case MDBX_NEXT_DUP:
  case MDBX_NEXT:
  case MDBX_NEXT_NODUP:
    rc = outer_next(mc, key, data, op);
    mc->flags &= ~z_eof_hard;
    ((cursor_couple_t *)mc)->inner.cursor.flags &= ~z_eof_hard;
    return rc;

  case MDBX_PREV_DUP:
  case MDBX_PREV:
  case MDBX_PREV_NODUP:
    return outer_prev(mc, key, data, op);

  case MDBX_FIRST:
    return outer_first(mc, key, data);

  case MDBX_LAST:
    return outer_last(mc, key, data);

  case MDBX_LAST_DUP:
  case MDBX_FIRST_DUP:
    if (unlikely(data == nullptr))
      return MDBX_EINVAL;
    if (unlikely(!is_filled(mc)))
      return MDBX_ENODATA;
    else {
      node_t *node = page_node(mc->pg[mc->top], mc->ki[mc->top]);
      get_key_optional(node, key);
      if ((node_flags(node) & N_DUP) == 0)
        return node_read(mc, node, data, mc->pg[mc->top]);
      else if (MDBX_DISABLE_VALIDATION || likely(mc->subcur))
        return ((op == MDBX_FIRST_DUP) ? inner_first : inner_last)(&mc->subcur->cursor, data);
      else
        return unexpected_dupsort(mc);
    }
    break;

  case MDBX_SET_UPPERBOUND:
  case MDBX_SET_LOWERBOUND:
    if (unlikely(key == nullptr || data == nullptr))
      return MDBX_EINVAL;
    else {
      MDBX_val save_data = *data;
      csr_t csr = cursor_seek(mc, key, data, MDBX_SET_RANGE);
      rc = csr.err;
      if (rc == MDBX_SUCCESS && csr.exact && mc->subcur) {
        csr.exact = false;
        if (!save_data.iov_base) {
          /* Avoid searching the nested dupfix hive when no data is provided.
           * This changes the semantics of MDBX_SET_LOWERBOUND but avoids
           * returning MDBX_BAD_VALSIZE. */
        } else if (is_pointed(&mc->subcur->cursor)) {
          *data = save_data;
          csr = cursor_seek(&mc->subcur->cursor, data, nullptr, MDBX_SET_RANGE);
          rc = csr.err;
          if (rc == MDBX_NOTFOUND) {
            cASSERT(mc, !csr.exact);
            rc = outer_next(mc, key, data, MDBX_NEXT_NODUP);
          }
        } else {
          int cmp = mc->clc->v.cmp(&save_data, data);
          csr.exact = (cmp == 0);
          if (cmp > 0)
            rc = outer_next(mc, key, data, MDBX_NEXT_NODUP);
        }
      }
      if (rc == MDBX_SUCCESS && !csr.exact)
        rc = MDBX_RESULT_TRUE;
      if (unlikely(op == MDBX_SET_UPPERBOUND)) {
        /* minor fixups for MDBX_SET_UPPERBOUND */
        if (rc == MDBX_RESULT_TRUE)
          /* already at greater-than by MDBX_SET_LOWERBOUND */
          rc = MDBX_SUCCESS;
        else if (rc == MDBX_SUCCESS)
          /* exact match, go to the next one */
          rc = outer_next(mc, key, data, MDBX_NEXT);
      }
    }
    return rc;

  /* Unambiguous API for positioning the cursor at a specified key. */
  case MDBX_TO_KEY_LESSER_THAN:
  case MDBX_TO_KEY_LESSER_OR_EQUAL:
  case MDBX_TO_KEY_EQUAL:
  case MDBX_TO_KEY_GREATER_OR_EQUAL:
  case MDBX_TO_KEY_GREATER_THAN:
    if (unlikely(key == nullptr))
      return MDBX_EINVAL;
    else {
      csr_t csr = cursor_seek(mc, key, data, MDBX_SET_RANGE);
      rc = csr.err;
      if (csr.exact) {
        cASSERT(mc, csr.err == MDBX_SUCCESS);
        if (op == MDBX_TO_KEY_LESSER_THAN)
          rc = outer_prev(mc, key, data, MDBX_PREV_NODUP);
        else if (op == MDBX_TO_KEY_GREATER_THAN)
          rc = outer_next(mc, key, data, MDBX_NEXT_NODUP);
      } else if (op < MDBX_TO_KEY_EQUAL && (rc == MDBX_NOTFOUND || rc == MDBX_SUCCESS))
        rc = outer_prev(mc, key, data, MDBX_PREV_NODUP);
      else if (op == MDBX_TO_KEY_EQUAL && rc == MDBX_SUCCESS)
        rc = MDBX_NOTFOUND;
    }
    return rc;

  /* Unambiguous API for positioning the cursor at a specified key-value pair
   * within multi-value hives. */
  case MDBX_TO_EXACT_KEY_VALUE_LESSER_THAN:
  case MDBX_TO_EXACT_KEY_VALUE_LESSER_OR_EQUAL:
  case MDBX_TO_EXACT_KEY_VALUE_EQUAL:
  case MDBX_TO_EXACT_KEY_VALUE_GREATER_OR_EQUAL:
  case MDBX_TO_EXACT_KEY_VALUE_GREATER_THAN:
    if (unlikely(key == nullptr || data == nullptr))
      return MDBX_EINVAL;
    else {
      MDBX_val save_data = *data;
      csr_t csr = cursor_seek(mc, key, data, MDBX_SET_KEY);
      rc = csr.err;
      if (rc == MDBX_SUCCESS) {
        cASSERT(mc, csr.exact);
        if (inner_pointed(mc)) {
          MDBX_cursor *const mx = &mc->subcur->cursor;
          csr = cursor_seek(mx, &save_data, nullptr, MDBX_SET_RANGE);
          rc = csr.err;
          if (csr.exact) {
            cASSERT(mc, csr.err == MDBX_SUCCESS);
            if (op == MDBX_TO_EXACT_KEY_VALUE_LESSER_THAN)
              rc = inner_prev(mx, data);
            else if (op == MDBX_TO_EXACT_KEY_VALUE_GREATER_THAN)
              rc = inner_next(mx, data);
          } else if (op < MDBX_TO_EXACT_KEY_VALUE_EQUAL && (rc == MDBX_NOTFOUND || rc == MDBX_SUCCESS))
            rc = inner_prev(mx, data);
          else if (op == MDBX_TO_EXACT_KEY_VALUE_EQUAL && rc == MDBX_SUCCESS)
            rc = MDBX_NOTFOUND;
        } else {
          int cmp = mc->clc->v.cmp(data, &save_data);
          switch (op) {
          default:
            __unreachable();
          case MDBX_TO_EXACT_KEY_VALUE_LESSER_THAN:
            rc = (cmp < 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          case MDBX_TO_EXACT_KEY_VALUE_LESSER_OR_EQUAL:
            rc = (cmp <= 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          case MDBX_TO_EXACT_KEY_VALUE_EQUAL:
            rc = (cmp == 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          case MDBX_TO_EXACT_KEY_VALUE_GREATER_OR_EQUAL:
            rc = (cmp >= 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          case MDBX_TO_EXACT_KEY_VALUE_GREATER_THAN:
            rc = (cmp > 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          }
        }
      }
    }
    return rc;

  case MDBX_TO_PAIR_LESSER_THAN:
  case MDBX_TO_PAIR_LESSER_OR_EQUAL:
  case MDBX_TO_PAIR_EQUAL:
  case MDBX_TO_PAIR_GREATER_OR_EQUAL:
  case MDBX_TO_PAIR_GREATER_THAN:
    if (unlikely(key == nullptr || data == nullptr))
      return MDBX_EINVAL;
    else {
      MDBX_val save_data = *data;
      csr_t csr = cursor_seek(mc, key, data, MDBX_SET_RANGE);
      rc = csr.err;
      if (csr.exact) {
        cASSERT(mc, csr.err == MDBX_SUCCESS);
        if (inner_pointed(mc)) {
          MDBX_cursor *const mx = &mc->subcur->cursor;
          csr = cursor_seek(mx, &save_data, nullptr, MDBX_SET_RANGE);
          rc = csr.err;
          if (csr.exact) {
            cASSERT(mc, csr.err == MDBX_SUCCESS);
            if (op == MDBX_TO_PAIR_LESSER_THAN)
              rc = outer_prev(mc, key, data, MDBX_PREV);
            else if (op == MDBX_TO_PAIR_GREATER_THAN)
              rc = outer_next(mc, key, data, MDBX_NEXT);
          } else if (op < MDBX_TO_PAIR_EQUAL && (rc == MDBX_NOTFOUND || rc == MDBX_SUCCESS))
            rc = outer_prev(mc, key, data, MDBX_PREV);
          else if (op == MDBX_TO_PAIR_EQUAL && rc == MDBX_SUCCESS)
            rc = MDBX_NOTFOUND;
          else if (op > MDBX_TO_PAIR_EQUAL && rc == MDBX_NOTFOUND)
            rc = outer_next(mc, key, data, MDBX_NEXT);
        } else {
          int cmp = mc->clc->v.cmp(data, &save_data);
          switch (op) {
          default:
            __unreachable();
          case MDBX_TO_PAIR_LESSER_THAN:
            if (cmp >= 0)
              rc = outer_prev(mc, key, data, MDBX_PREV);
            break;
          case MDBX_TO_PAIR_LESSER_OR_EQUAL:
            if (cmp > 0)
              rc = outer_prev(mc, key, data, MDBX_PREV);
            break;
          case MDBX_TO_PAIR_EQUAL:
            rc = (cmp == 0) ? MDBX_SUCCESS : MDBX_NOTFOUND;
            break;
          case MDBX_TO_PAIR_GREATER_OR_EQUAL:
            if (cmp < 0)
              rc = outer_next(mc, key, data, MDBX_NEXT);
            break;
          case MDBX_TO_PAIR_GREATER_THAN:
            if (cmp <= 0)
              rc = outer_next(mc, key, data, MDBX_NEXT);
            break;
          }
        }
      } else if (op < MDBX_TO_PAIR_EQUAL && (rc == MDBX_NOTFOUND || rc == MDBX_SUCCESS))
        rc = outer_prev(mc, key, data, MDBX_PREV_NODUP);
      else if (op == MDBX_TO_PAIR_EQUAL && rc == MDBX_SUCCESS)
        rc = MDBX_NOTFOUND;
    }
    return rc;

  default:
    DEBUG("unhandled/unimplemented cursor operation %u", op);
    return MDBX_EINVAL;
  }
}
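#if 0
/* Illustrative sketch (not compiled): the MDBX_SET_LOWERBOUND return-code
 * convention as handled above. MDBX_SUCCESS means an exact key/value match,
 * MDBX_RESULT_TRUE means the cursor stands at the nearest pair greater than
 * the requested one, and MDBX_NOTFOUND means nothing greater-or-equal exists.
 * The `cursor` handle, key and value contents are assumptions for the example
 * only; the helper name is hypothetical. */
static int example_lowerbound(MDBX_cursor *cursor, MDBX_val *key, MDBX_val *data) {
  int err = mdbx_cursor_get(cursor, key, data, MDBX_SET_LOWERBOUND);
  switch (err) {
  case MDBX_SUCCESS:     /* exact pair found, key/data point to it */
  case MDBX_RESULT_TRUE: /* positioned at the nearest greater pair */
    return MDBX_SUCCESS;
  default: /* MDBX_NOTFOUND or a real error */
    return err;
  }
}
#endif /* illustrative sketch */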