mdbx: рефакторинг затенения и завершения курсоров, с удалением TXN_END_EOTDONE и добавлением txn_may_have_cursors.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2025-01-06 20:53:16 +03:00
parent b681b59434
commit 81e2623a54
9 changed files with 116 additions and 92 deletions

View File

@ -88,6 +88,7 @@ int mdbx_cursor_bind(const MDBX_txn *txn, MDBX_cursor *mc, MDBX_dbi dbi) {
mc->next = txn->cursors[dbi];
txn->cursors[dbi] = mc;
((MDBX_txn *)txn)->flags |= txn_may_have_cursors;
return MDBX_SUCCESS;
}

View File

@ -344,7 +344,7 @@ int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, M
(txn->parent ? txn->parent->tw.dirtyroom : txn->env->options.dp_limit));
env->txn = txn;
tASSERT(parent, parent->cursors[FREE_DBI] == nullptr);
rc = parent->cursors[MAIN_DBI] ? cursor_shadow(parent->cursors[MAIN_DBI], txn, MAIN_DBI) : MDBX_SUCCESS;
rc = txn_shadow_cursors(parent, MAIN_DBI);
if (AUDIT_ENABLED() && ASSERT_ENABLED()) {
txn->signature = txn_signature;
tASSERT(txn, audit_ex(txn, 0, false) == 0);
@ -370,8 +370,8 @@ int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, M
eASSERT(env, (txn->flags & ~(MDBX_NOSTICKYTHREADS | MDBX_TXN_RDONLY | MDBX_WRITEMAP |
/* Win32: SRWL flag */ txn_shrink_allowed)) == 0);
else {
eASSERT(env, (txn->flags & ~(MDBX_NOSTICKYTHREADS | MDBX_WRITEMAP | txn_shrink_allowed | MDBX_NOMETASYNC |
MDBX_SAFE_NOSYNC | MDBX_TXN_SPILLS)) == 0);
eASSERT(env, (txn->flags & ~(MDBX_NOSTICKYTHREADS | MDBX_WRITEMAP | txn_shrink_allowed | txn_may_have_cursors |
MDBX_NOMETASYNC | MDBX_SAFE_NOSYNC | MDBX_TXN_SPILLS)) == 0);
assert(!txn->tw.spilled.list && !txn->tw.spilled.least_removed);
}
txn->signature = txn_signature;
@ -521,9 +521,9 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
parent->tw.loose_count = txn->tw.loose_count;
parent->tw.loose_pages = txn->tw.loose_pages;
if (txn->flags & txn_may_have_cursors)
/* Merge our cursors into parent's and close them */
txn_done_cursors(txn, true);
end_mode |= TXN_END_EOTDONE;
txn_done_cursors(txn);
/* Update parent's DBs array */
eASSERT(env, parent->n_dbi == txn->n_dbi);
@ -581,8 +581,8 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
tASSERT(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length ==
(txn->parent ? txn->parent->tw.dirtyroom : env->options.dp_limit));
}
txn_done_cursors(txn, false);
end_mode |= TXN_END_EOTDONE;
if (txn->flags & txn_may_have_cursors)
txn_done_cursors(txn);
if ((!txn->tw.dirtylist || txn->tw.dirtylist->length == 0) &&
(txn->flags & (MDBX_TXN_DIRTY | MDBX_TXN_SPILLS)) == 0) {
@ -766,7 +766,7 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
goto fail;
}
end_mode = TXN_END_COMMITTED | TXN_END_UPDATE | TXN_END_EOTDONE;
end_mode = TXN_END_COMMITTED | TXN_END_UPDATE;
done:
if (latency)

View File

@ -8,7 +8,7 @@ N | MASK | ENV | TXN | DB | PUT | DBI | NOD
5 |0000 0020| |TXN_PARKED |INTEGERDUP|NODUPDATA | | |P_DUPFIX | |
6 |0000 0040| |TXN_AUTOUNPARK|REVERSEDUP|CURRENT |DBI_OLDEN | |P_SUBP | |
7 |0000 0080| |TXN_DRAINED_GC|DB_VALID |ALLDUPS |DBI_LINDO | | | |
8 |0000 0100| _MAY_MOVE | | | | | | | <= |
8 |0000 0100| _MAY_MOVE |TXN_CURSORS | | | | | | <= |
9 |0000 0200| _MAY_UNMAP| | | | | | | <= |
10|0000 0400| | | | | | | | |
11|0000 0800| | | | | | | | |

View File

@ -184,78 +184,73 @@ __hot int cursor_touch(MDBX_cursor *const mc, const MDBX_val *key, const MDBX_va
/*----------------------------------------------------------------------------*/
int cursor_shadow(MDBX_cursor *parent_cursor, MDBX_txn *nested_txn, const size_t dbi) {
int cursor_shadow(MDBX_cursor *cursor, MDBX_txn *nested_txn, const size_t dbi) {
tASSERT(nested_txn, cursor->signature == cur_signature_live);
tASSERT(nested_txn, cursor->txn != nested_txn);
cASSERT(cursor, cursor->txn->flags & txn_may_have_cursors);
cASSERT(cursor, dbi == cursor_dbi(cursor));
tASSERT(nested_txn, dbi > FREE_DBI && dbi < nested_txn->n_dbi);
const size_t size = parent_cursor->subcur ? sizeof(MDBX_cursor) + sizeof(subcur_t) : sizeof(MDBX_cursor);
for (MDBX_cursor *bk; parent_cursor; parent_cursor = bk->next) {
cASSERT(parent_cursor, parent_cursor != parent_cursor->next);
bk = parent_cursor;
if (parent_cursor->signature != cur_signature_live)
continue;
bk = osal_malloc(size);
if (unlikely(!bk))
const size_t size = cursor->subcur ? sizeof(MDBX_cursor) + sizeof(subcur_t) : sizeof(MDBX_cursor);
MDBX_cursor *const shadow = osal_malloc(size);
if (unlikely(!shadow))
return MDBX_ENOMEM;
#if MDBX_DEBUG
memset(bk, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(bk, size);
memset(shadow, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(shadow, size);
#endif /* MDBX_DEBUG */
*bk = *parent_cursor;
parent_cursor->backup = bk;
/* Kill pointers into src to reduce abuse: The
* user may not use mc until dst ends. But we need a valid
* txn pointer here for cursor fixups to keep working. */
parent_cursor->txn = nested_txn;
parent_cursor->tree = &nested_txn->dbs[dbi];
parent_cursor->dbi_state = &nested_txn->dbi_state[dbi];
subcur_t *mx = parent_cursor->subcur;
if (mx != nullptr) {
*(subcur_t *)(bk + 1) = *mx;
mx->cursor.txn = nested_txn;
mx->cursor.dbi_state = parent_cursor->dbi_state;
}
parent_cursor->next = nested_txn->cursors[dbi];
nested_txn->cursors[dbi] = parent_cursor;
*shadow = *cursor;
cursor->backup = shadow;
cursor->txn = nested_txn;
cursor->tree = &nested_txn->dbs[dbi];
cursor->dbi_state = &nested_txn->dbi_state[dbi];
subcur_t *subcur = cursor->subcur;
if (subcur) {
*(subcur_t *)(shadow + 1) = *subcur;
subcur->cursor.txn = nested_txn;
subcur->cursor.dbi_state = cursor->dbi_state;
}
return MDBX_SUCCESS;
}
void cursor_eot(MDBX_cursor *mc, const bool merge) {
const unsigned stage = mc->signature;
MDBX_cursor *const bk = mc->backup;
ENSURE(mc->txn->env, stage == cur_signature_live || (stage == cur_signature_wait4eot && bk));
if (bk) {
subcur_t *mx = mc->subcur;
cASSERT(mc, mc->txn->parent != nullptr);
/* Zap: Using uninitialized memory '*mc->backup'. */
void cursor_eot(MDBX_cursor *cursor) {
const unsigned stage = cursor->signature;
MDBX_cursor *const shadow = cursor->backup;
ENSURE(cursor->txn->env, stage == cur_signature_live || (stage == cur_signature_wait4eot && shadow));
if (shadow) {
subcur_t *subcur = cursor->subcur;
cASSERT(cursor, cursor->txn->parent != nullptr);
/* Zap: Using uninitialized memory '*cursor->backup'. */
MDBX_SUPPRESS_GOOFY_MSVC_ANALYZER(6001);
ENSURE(mc->txn->env, bk->signature == cur_signature_live);
cASSERT(mc, mx == bk->subcur);
if (merge) {
ENSURE(cursor->txn->env, shadow->signature == cur_signature_live);
cASSERT(cursor, subcur == shadow->subcur);
if (((cursor->txn->flags | cursor->txn->parent->flags) & MDBX_TXN_ERROR) == 0) {
/* Update pointers to parent txn */
mc->next = bk->next;
mc->backup = bk->backup;
mc->txn = bk->txn;
mc->tree = bk->tree;
mc->dbi_state = bk->dbi_state;
if (mx) {
mx->cursor.txn = mc->txn;
mx->cursor.dbi_state = mc->dbi_state;
cursor->next = shadow->next;
cursor->backup = shadow->backup;
cursor->txn = shadow->txn;
cursor->tree = shadow->tree;
cursor->dbi_state = shadow->dbi_state;
if (subcur) {
subcur->cursor.txn = cursor->txn;
subcur->cursor.dbi_state = cursor->dbi_state;
}
} else {
/* Restore from backup, i.e. rollback/abort nested txn */
*mc = *bk;
if (mx)
*mx = *(subcur_t *)(bk + 1);
*cursor = *shadow;
if (subcur)
*subcur = *(subcur_t *)(shadow + 1);
}
if (stage == cur_signature_wait4eot /* Cursor was closed by user */)
mc->signature = stage /* Promote closed state to parent txn */;
bk->signature = 0;
osal_free(bk);
cursor->signature = stage /* Promote closed state to parent txn */;
shadow->signature = 0;
osal_free(shadow);
} else {
ENSURE(mc->txn->env, stage == cur_signature_live);
mc->signature = cur_signature_ready4dispose /* Cursor may be reused */;
mc->next = mc;
ENSURE(cursor->txn->env, stage == cur_signature_live);
be_poor(cursor);
cursor->signature = cur_signature_ready4dispose /* Cursor may be reused */;
cursor->next = cursor;
}
}

View File

@ -292,8 +292,8 @@ MDBX_NOTHROW_PURE_FUNCTION static inline bool check_leaf_type(const MDBX_cursor
return (((page_type(mp) ^ mc->checking) & (z_branch | z_leaf | z_largepage | z_dupfix)) == 0);
}
MDBX_INTERNAL void cursor_eot(MDBX_cursor *mc, const bool merge);
MDBX_INTERNAL int cursor_shadow(MDBX_cursor *parent_cursor, MDBX_txn *nested_txn, const size_t dbi);
MDBX_INTERNAL void cursor_eot(MDBX_cursor *cursor);
MDBX_INTERNAL int cursor_shadow(MDBX_cursor *cursor, MDBX_txn *nested_txn, const size_t dbi);
MDBX_INTERNAL MDBX_cursor *cursor_cpstk(const MDBX_cursor *csrc, MDBX_cursor *cdst);

View File

@ -87,19 +87,12 @@ __noinline int dbi_import(MDBX_txn *txn, const size_t dbi) {
if (parent) {
/* вложенная пишущая транзакция */
int rc = dbi_check(parent, dbi);
/* копируем состояние table очищая new-флаги. */
/* копируем состояние dbi-хендла очищая new-флаги. */
eASSERT(env, txn->dbi_seqs == parent->dbi_seqs);
txn->dbi_state[dbi] = parent->dbi_state[dbi] & ~(DBI_FRESH | DBI_CREAT | DBI_DIRTY);
if (likely(rc == MDBX_SUCCESS)) {
txn->dbs[dbi] = parent->dbs[dbi];
if (parent->cursors[dbi]) {
rc = cursor_shadow(parent->cursors[dbi], txn, dbi);
if (unlikely(rc != MDBX_SUCCESS)) {
/* не получилось забекапить курсоры */
txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO | DBI_STALE;
txn->flags |= MDBX_TXN_ERROR;
}
}
rc = txn_shadow_cursors(parent, dbi);
}
return rc;
}

View File

@ -155,7 +155,8 @@ enum txn_flags {
txn_rw_begin_flags = MDBX_TXN_NOMETASYNC | MDBX_TXN_NOSYNC | MDBX_TXN_TRY,
txn_shrink_allowed = UINT32_C(0x40000000),
txn_parked = MDBX_TXN_PARKED,
txn_gc_drained = 0x40 /* GC was depleted up to oldest reader */,
txn_gc_drained = 0x80 /* GC was depleted up to oldest reader */,
txn_may_have_cursors = 0x100,
txn_state_flags = MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_DIRTY | MDBX_TXN_SPILLS | MDBX_TXN_HAS_CHILD |
MDBX_TXN_INVALID | txn_gc_drained
};

View File

@ -46,7 +46,8 @@ MDBX_INTERNAL int txn_renew(MDBX_txn *txn, unsigned flags);
MDBX_INTERNAL int txn_park(MDBX_txn *txn, bool autounpark);
MDBX_INTERNAL int txn_unpark(MDBX_txn *txn);
MDBX_INTERNAL int txn_check_badbits_parked(const MDBX_txn *txn, int bad_bits);
MDBX_INTERNAL void txn_done_cursors(MDBX_txn *txn, const bool merge);
MDBX_INTERNAL void txn_done_cursors(MDBX_txn *txn);
MDBX_INTERNAL int txn_shadow_cursors(const MDBX_txn *parent, const size_t dbi);
#define TXN_END_NAMES \
{"committed", "empty-commit", "abort", "reset", "fail-begin", "fail-beginchild", "ousted", nullptr}
@ -63,8 +64,7 @@ enum {
TXN_END_OPMASK = 0x07 /* mask for txn_end() operation number */,
TXN_END_UPDATE = 0x10 /* update env state (DBIs) */,
TXN_END_FREE = 0x20 /* free txn unless it is env.basal_txn */,
TXN_END_EOTDONE = 0x40 /* txn's cursors already closed */,
TXN_END_SLOT = 0x80 /* release any reader slot if NOSTICKYTHREADS */
TXN_END_SLOT = 0x40 /* release any reader slot if NOSTICKYTHREADS */
};
MDBX_INTERNAL int txn_end(MDBX_txn *txn, unsigned mode);
MDBX_INTERNAL int txn_write(MDBX_txn *txn, iov_ctx_t *ctx);

View File

@ -7,19 +7,51 @@ __hot txnid_t txn_snapshot_oldest(const MDBX_txn *const txn) {
return mvcc_shapshot_oldest(txn->env, txn->tw.troika.txnid[txn->tw.troika.prefer_steady]);
}
void txn_done_cursors(MDBX_txn *txn, const bool merge) {
void txn_done_cursors(MDBX_txn *txn) {
tASSERT(txn, txn->flags & txn_may_have_cursors);
tASSERT(txn, txn->cursors[FREE_DBI] == nullptr);
TXN_FOREACH_DBI_FROM(txn, i, /* skip FREE_DBI */ 1) {
MDBX_cursor *mc = txn->cursors[i];
if (mc) {
MDBX_cursor *cursor = txn->cursors[i];
if (cursor) {
txn->cursors[i] = nullptr;
do {
MDBX_cursor *const next = mc->next;
cursor_eot(mc, merge);
mc = next;
} while (mc);
MDBX_cursor *const next = cursor->next;
cursor_eot(cursor);
cursor = next;
} while (cursor);
}
}
txn->flags &= ~txn_may_have_cursors;
}
/* Shadows the cursors that `parent` tracks for `dbi` into the nested
 * transaction `parent->nested`.
 *
 * Walks the parent->cursors[dbi] list. Every cursor whose signature is
 * cur_signature_live is passed to cursor_shadow() and then pushed onto the
 * head of the nested transaction's cursors[dbi] list; the nested transaction
 * is flagged with txn_may_have_cursors. Cursors with any other signature are
 * skipped and left untouched.
 *
 * Returns MDBX_SUCCESS when the list is empty or every live cursor has been
 * shadowed. If cursor_shadow() fails, the nested transaction's dbi_state[dbi]
 * is set to DBI_OLDEN | DBI_LINDO | DBI_STALE, MDBX_TXN_ERROR is raised on the
 * nested transaction and the error code is returned immediately; cursors
 * shadowed before the failure stay linked to the nested transaction. */
int txn_shadow_cursors(const MDBX_txn *parent, const size_t dbi) {
  tASSERT(parent, dbi > FREE_DBI && dbi < parent->n_dbi);
  MDBX_cursor *const head = parent->cursors[dbi];
  if (!head)
    return MDBX_SUCCESS;

  MDBX_txn *const nested = parent->nested;
  tASSERT(parent, parent->flags & txn_may_have_cursors);

  for (MDBX_cursor *scan = head, *follow = nullptr; scan != nullptr; scan = follow) {
    /* Remember the successor first: relinking below overwrites scan->next. */
    follow = scan->next;
    if (scan->signature == cur_signature_live) {
      tASSERT(parent, scan->txn == parent && dbi == cursor_dbi(scan));

      const int rc = cursor_shadow(scan, nested, dbi);
      if (unlikely(rc != MDBX_SUCCESS)) {
        /* failed to back up the cursors */
        nested->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO | DBI_STALE;
        nested->flags |= MDBX_TXN_ERROR;
        return rc;
      }

      /* Push the shadowed cursor onto the nested transaction's list. */
      scan->next = nested->cursors[dbi];
      nested->cursors[dbi] = scan;
      nested->flags |= txn_may_have_cursors;
    }
  }
  return MDBX_SUCCESS;
}
int txn_write(MDBX_txn *txn, iov_ctx_t *ctx) {
@ -847,7 +879,7 @@ int txn_renew(MDBX_txn *txn, unsigned flags) {
}
bailout:
tASSERT(txn, rc != MDBX_SUCCESS);
txn_end(txn, TXN_END_SLOT | TXN_END_EOTDONE | TXN_END_FAIL_BEGIN);
txn_end(txn, TXN_END_SLOT | TXN_END_FAIL_BEGIN);
return rc;
}
@ -859,8 +891,10 @@ int txn_end(MDBX_txn *txn, unsigned mode) {
txn->txnid, (txn->flags & MDBX_TXN_RDONLY) ? 'r' : 'w', txn->flags, (void *)txn, (void *)env,
txn->dbs[MAIN_DBI].root, txn->dbs[FREE_DBI].root);
if (!(mode & TXN_END_EOTDONE)) /* !(already closed cursors) */
txn_done_cursors(txn, false);
if (txn->flags & txn_may_have_cursors) {
txn->flags |= /* avoid merge cursors' state */ MDBX_TXN_ERROR;
txn_done_cursors(txn);
}
int rc = MDBX_SUCCESS;
if (txn->flags & MDBX_TXN_RDONLY) {