mdbx: выделение cursor_del() для уменьшения кол-ва проверок.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-12-27 15:03:44 +03:00
parent 66a5704949
commit df63ff0e7e

View File

@ -3338,7 +3338,8 @@ static int __must_check_result cursor_put_nochecklen(MDBX_cursor *mc,
MDBX_val *data,
unsigned flags);
static int __must_check_result cursor_check_updating(MDBX_cursor *mc);
static int __must_check_result cursor_del(MDBX_cursor *mc);
static int __must_check_result cursor_del(MDBX_cursor *mc,
MDBX_put_flags_t flags);
static int __must_check_result delete(MDBX_txn *txn, MDBX_dbi dbi,
const MDBX_val *key, const MDBX_val *data,
unsigned flags);
@ -10014,7 +10015,7 @@ static int gcu_clean_stored_retired(MDBX_txn *txn, gcu_context_t *ctx) {
const struct cursor_set_result csr = cursor_set(gc, &key, &val, MDBX_SET);
if (csr.err == MDBX_SUCCESS && csr.exact) {
ctx->retired_stored = 0;
err = mdbx_cursor_del(gc, 0);
err = cursor_del(gc, 0);
TRACE("== clear-4linear, backlog %zu, err %d", gcu_backlog_size(txn),
err);
}
@ -10217,7 +10218,7 @@ retry:
TRACE("%s: cleanup-reclaimed-id [%zu]%" PRIaTXN, dbg_prefix_mode,
ctx->cleaned_slot, ctx->cleaned_id);
tASSERT(txn, *txn->mt_cursors == &ctx->cursor);
rc = mdbx_cursor_del(&ctx->cursor, 0);
rc = cursor_del(&ctx->cursor, 0);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
} while (ctx->cleaned_slot < MDBX_PNL_GETSIZE(txn->tw.lifo_reclaimed));
@ -10252,7 +10253,7 @@ retry:
TRACE("%s: cleanup-reclaimed-id %" PRIaTXN, dbg_prefix_mode,
ctx->cleaned_id);
tASSERT(txn, *txn->mt_cursors == &ctx->cursor);
rc = mdbx_cursor_del(&ctx->cursor, 0);
rc = cursor_del(&ctx->cursor, 0);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
}
@ -17019,14 +17020,14 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
if (mc->mc_xcursor->mx_db.md_entries > 1 ||
current_data.iov_len != data->iov_len) {
drop_current:
err = mdbx_cursor_del(mc, flags & MDBX_ALLDUPS);
err = cursor_del(mc, flags & MDBX_ALLDUPS);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_CURRENT;
goto skip_check_samedata;
}
} else if (unlikely(node_size(key, data) > env->me_leaf_nodemax)) {
err = mdbx_cursor_del(mc, 0);
err = cursor_del(mc, 0);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_CURRENT;
@ -17089,7 +17090,7 @@ static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
}
if (unlikely(flags & MDBX_ALLDUPS) && mc->mc_xcursor &&
(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
err = mdbx_cursor_del(mc, MDBX_ALLDUPS);
err = cursor_del(mc, MDBX_ALLDUPS);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_ALLDUPS;
@ -17771,7 +17772,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
return cursor_put_checklen(mc, key, data, flags);
}
__hot int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
if (unlikely(!mc))
return MDBX_EINVAL;
@ -17792,7 +17793,14 @@ __hot int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
if (unlikely(mc->mc_ki[mc->mc_top] >= page_numkeys(mc->mc_pg[mc->mc_top])))
return MDBX_NOTFOUND;
rc = cursor_touch(mc, nullptr, nullptr);
return cursor_del(mc, flags);
}
static __hot int cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
cASSERT(mc, mc->mc_flags & C_INITIALIZED);
cASSERT(mc, mc->mc_ki[mc->mc_top] < page_numkeys(mc->mc_pg[mc->mc_top]));
int rc = cursor_touch(mc, nullptr, nullptr);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
@ -17808,22 +17816,21 @@ __hot int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (node_flags(node) & F_DUPDATA) {
if (flags & (MDBX_ALLDUPS | /* for compatibility */ MDBX_NODUPDATA)) {
/* cursor_del() will subtract the final entry */
/* will subtract the final entry later */
mc->mc_db->md_entries -= mc->mc_xcursor->mx_db.md_entries - 1;
mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
} else {
if (!(node_flags(node) & F_SUBDATA))
mc->mc_xcursor->mx_cursor.mc_pg[0] = node_data(node);
rc = mdbx_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
rc = cursor_del(&mc->mc_xcursor->mx_cursor, 0);
if (unlikely(rc))
return rc;
/* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_entries) {
if (node_flags(node) & F_SUBDATA) {
/* update subDB info */
void *db = node_data(node);
mc->mc_xcursor->mx_db.md_mod_txnid = mc->mc_txn->mt_txnid;
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDBX_db));
memcpy(node_data(node), &mc->mc_xcursor->mx_db, sizeof(MDBX_db));
} else {
/* shrink fake page */
node_shrink(mp, mc->mc_ki[mc->mc_top]);
@ -17878,7 +17885,109 @@ __hot int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
}
del_key:
return cursor_del(mc);
mc->mc_db->md_entries--;
const MDBX_dbi dbi = mc->mc_dbi;
indx_t ki = mc->mc_ki[mc->mc_top];
mp = mc->mc_pg[mc->mc_top];
cASSERT(mc, IS_LEAF(mp));
node_del(mc, mc->mc_db->md_xsize);
/* Adjust other cursors pointing to mp */
for (MDBX_cursor *m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
MDBX_cursor *m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (m3 == mc || !(m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_snum < mc->mc_snum)
continue;
if (m3->mc_pg[mc->mc_top] == mp) {
if (m3->mc_ki[mc->mc_top] == ki) {
m3->mc_flags |= C_DEL;
if (mc->mc_db->md_flags & MDBX_DUPSORT) {
/* Sub-cursor referred into dataset which is gone */
m3->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
}
continue;
} else if (m3->mc_ki[mc->mc_top] > ki) {
m3->mc_ki[mc->mc_top]--;
}
if (XCURSOR_INITED(m3))
XCURSOR_REFRESH(m3, m3->mc_pg[mc->mc_top], m3->mc_ki[mc->mc_top]);
}
}
rc = rebalance(mc);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
if (unlikely(!mc->mc_snum)) {
/* DB is totally empty now, just bail out.
* Other cursors adjustments were already done
* by rebalance and aren't needed here. */
cASSERT(mc, mc->mc_db->md_entries == 0 && mc->mc_db->md_depth == 0 &&
mc->mc_db->md_root == P_INVALID);
mc->mc_flags |= C_EOF;
return MDBX_SUCCESS;
}
ki = mc->mc_ki[mc->mc_top];
mp = mc->mc_pg[mc->mc_top];
cASSERT(mc, IS_LEAF(mc->mc_pg[mc->mc_top]));
size_t nkeys = page_numkeys(mp);
cASSERT(mc, (mc->mc_db->md_entries > 0 && nkeys > 0) ||
((mc->mc_flags & C_SUB) && mc->mc_db->md_entries == 0 &&
nkeys == 0));
/* Adjust this and other cursors pointing to mp */
for (MDBX_cursor *m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
MDBX_cursor *m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (!(m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_snum < mc->mc_snum)
continue;
if (m3->mc_pg[mc->mc_top] == mp) {
/* if m3 points past last node in page, find next sibling */
if (m3->mc_ki[mc->mc_top] >= nkeys) {
rc = cursor_sibling(m3, SIBLING_RIGHT);
if (rc == MDBX_NOTFOUND) {
m3->mc_flags |= C_EOF;
rc = MDBX_SUCCESS;
continue;
}
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
}
if (m3->mc_ki[mc->mc_top] >= ki ||
/* moved to right sibling */ m3->mc_pg[mc->mc_top] != mp) {
if (m3->mc_xcursor && !(m3->mc_flags & C_EOF)) {
node = page_node(m3->mc_pg[m3->mc_top], m3->mc_ki[m3->mc_top]);
/* If this node has dupdata, it may need to be reinited
* because its data has moved.
* If the xcursor was not inited it must be reinited.
* Else if node points to a subDB, nothing is needed. */
if (node_flags(node) & F_DUPDATA) {
if (m3->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) {
if (!(node_flags(node) & F_SUBDATA))
m3->mc_xcursor->mx_cursor.mc_pg[0] = node_data(node);
} else {
rc = cursor_xinit1(m3, node, m3->mc_pg[m3->mc_top]);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
rc = cursor_first(&m3->mc_xcursor->mx_cursor, NULL, NULL);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
}
}
m3->mc_xcursor->mx_cursor.mc_flags |= C_DEL;
}
m3->mc_flags |= C_DEL;
}
}
}
cASSERT(mc, rc == MDBX_SUCCESS);
if (AUDIT_ENABLED())
rc = cursor_check(mc);
return rc;
fail:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
@ -20058,124 +20167,6 @@ __cold static int cursor_check_updating(MDBX_cursor *mc) {
return rc;
}
/* Complete a delete operation started by mdbx_cursor_del(). */
static int cursor_del(MDBX_cursor *mc) {
int rc;
MDBX_page *mp;
indx_t ki;
size_t nkeys;
MDBX_dbi dbi = mc->mc_dbi;
cASSERT(mc, cursor_is_tracked(mc));
cASSERT(mc, IS_LEAF(mc->mc_pg[mc->mc_top]));
ki = mc->mc_ki[mc->mc_top];
mp = mc->mc_pg[mc->mc_top];
node_del(mc, mc->mc_db->md_xsize);
mc->mc_db->md_entries--;
/* Adjust other cursors pointing to mp */
for (MDBX_cursor *m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
MDBX_cursor *m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (m3 == mc || !(m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_snum < mc->mc_snum)
continue;
if (m3->mc_pg[mc->mc_top] == mp) {
if (m3->mc_ki[mc->mc_top] == ki) {
m3->mc_flags |= C_DEL;
if (mc->mc_db->md_flags & MDBX_DUPSORT) {
/* Sub-cursor referred into dataset which is gone */
m3->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED | C_EOF);
}
continue;
} else if (m3->mc_ki[mc->mc_top] > ki) {
m3->mc_ki[mc->mc_top]--;
}
if (XCURSOR_INITED(m3))
XCURSOR_REFRESH(m3, m3->mc_pg[mc->mc_top], m3->mc_ki[mc->mc_top]);
}
}
rc = rebalance(mc);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
if (unlikely(!mc->mc_snum)) {
/* DB is totally empty now, just bail out.
* Other cursors adjustments were already done
* by rebalance and aren't needed here. */
cASSERT(mc, mc->mc_db->md_entries == 0 && mc->mc_db->md_depth == 0 &&
mc->mc_db->md_root == P_INVALID);
mc->mc_flags |= C_EOF;
return MDBX_SUCCESS;
}
ki = mc->mc_ki[mc->mc_top];
mp = mc->mc_pg[mc->mc_top];
cASSERT(mc, IS_LEAF(mc->mc_pg[mc->mc_top]));
nkeys = page_numkeys(mp);
cASSERT(mc, (mc->mc_db->md_entries > 0 && nkeys > 0) ||
((mc->mc_flags & C_SUB) && mc->mc_db->md_entries == 0 &&
nkeys == 0));
/* Adjust this and other cursors pointing to mp */
for (MDBX_cursor *m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
MDBX_cursor *m3 = (mc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
if (!(m2->mc_flags & m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_snum < mc->mc_snum)
continue;
if (m3->mc_pg[mc->mc_top] == mp) {
/* if m3 points past last node in page, find next sibling */
if (m3->mc_ki[mc->mc_top] >= nkeys) {
rc = cursor_sibling(m3, SIBLING_RIGHT);
if (rc == MDBX_NOTFOUND) {
m3->mc_flags |= C_EOF;
rc = MDBX_SUCCESS;
continue;
}
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
}
if (m3->mc_ki[mc->mc_top] >= ki ||
/* moved to right sibling */ m3->mc_pg[mc->mc_top] != mp) {
if (m3->mc_xcursor && !(m3->mc_flags & C_EOF)) {
MDBX_node *node =
page_node(m3->mc_pg[m3->mc_top], m3->mc_ki[m3->mc_top]);
/* If this node has dupdata, it may need to be reinited
* because its data has moved.
* If the xcursor was not inited it must be reinited.
* Else if node points to a subDB, nothing is needed. */
if (node_flags(node) & F_DUPDATA) {
if (m3->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) {
if (!(node_flags(node) & F_SUBDATA))
m3->mc_xcursor->mx_cursor.mc_pg[0] = node_data(node);
} else {
rc = cursor_xinit1(m3, node, m3->mc_pg[m3->mc_top]);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
rc = cursor_first(&m3->mc_xcursor->mx_cursor, NULL, NULL);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
}
}
m3->mc_xcursor->mx_cursor.mc_flags |= C_DEL;
}
m3->mc_flags |= C_DEL;
}
}
}
cASSERT(mc, rc == MDBX_SUCCESS);
if (AUDIT_ENABLED())
rc = cursor_check(mc);
return rc;
bailout:
mc->mc_txn->mt_flags |= MDBX_TXN_ERROR;
return rc;
}
int mdbx_del(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
const MDBX_val *data) {
int rc = check_txn_rw(txn, MDBX_TXN_BLOCKED);
@ -20228,7 +20219,7 @@ static int delete(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
* cursor to be consistent until the end of the rebalance. */
cx.outer.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &cx.outer;
rc = mdbx_cursor_del(&cx.outer, flags);
rc = cursor_del(&cx.outer, flags);
txn->mt_cursors[dbi] = cx.outer.mc_next;
}
return rc;
@ -24100,7 +24091,7 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
if (likely(new_data))
rc = cursor_put_checklen(&cx.outer, key, new_data, flags);
else
rc = mdbx_cursor_del(&cx.outer, flags & MDBX_ALLDUPS);
rc = cursor_del(&cx.outer, flags & MDBX_ALLDUPS);
bailout:
txn->mt_cursors[dbi] = cx.outer.mc_next;