mdbx: rework internal DBI-handles serials, validation and import into a txn (squashed).

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-05-26 18:37:32 +03:00
parent f91dbc6864
commit c7bde8be8d

View File

@ -8461,29 +8461,56 @@ int mdbx_txn_flags(const MDBX_txn *txn) {
#define TXN_DBI_CHANGED(txn, dbi) \ #define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi]) ((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
static __inline unsigned dbi_seq(const MDBX_env *const env, unsigned slot) {
unsigned v = env->me_dbiseqs[slot] + 1;
return v + (v == 0);
}
static void dbi_import_locked(MDBX_txn *txn) { static void dbi_import_locked(MDBX_txn *txn) {
MDBX_env *const env = txn->mt_env; const MDBX_env *const env = txn->mt_env;
const unsigned n = env->me_numdbs; unsigned n = env->me_numdbs;
for (unsigned i = CORE_DBS; i < n; ++i) { for (unsigned i = CORE_DBS; i < n; ++i) {
if (i >= txn->mt_numdbs) { if (i >= txn->mt_numdbs) {
txn->mt_dbistate[i] = 0;
txn->mt_cursors[i] = NULL; txn->mt_cursors[i] = NULL;
if (txn->mt_dbiseqs != env->me_dbiseqs)
txn->mt_dbiseqs[i] = 0;
txn->mt_dbistate[i] = 0;
} }
if ((env->me_dbflags[i] & DB_VALID) && if ((TXN_DBI_CHANGED(txn, i) &&
!(txn->mt_dbistate[i] & DBI_USRVALID)) { (txn->mt_dbistate[i] & (DBI_CREAT | DBI_DIRTY | DBI_FRESH)) == 0) ||
((env->me_dbflags[i] & DB_VALID) &&
!(txn->mt_dbistate[i] & DBI_VALID))) {
mdbx_tassert(txn, (txn->mt_dbistate[i] &
(DBI_CREAT | DBI_DIRTY | DBI_FRESH)) == 0);
txn->mt_dbiseqs[i] = env->me_dbiseqs[i]; txn->mt_dbiseqs[i] = env->me_dbiseqs[i];
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & DB_PERSISTENT_FLAGS; txn->mt_dbs[i].md_flags = env->me_dbflags[i] & DB_PERSISTENT_FLAGS;
txn->mt_dbistate[i] = 0;
if (env->me_dbflags[i] & DB_VALID) {
txn->mt_dbistate[i] = DBI_VALID | DBI_USRVALID | DBI_STALE; txn->mt_dbistate[i] = DBI_VALID | DBI_USRVALID | DBI_STALE;
mdbx_tassert(txn, txn->mt_dbxs[i].md_cmp != NULL); mdbx_tassert(txn, txn->mt_dbxs[i].md_cmp != NULL);
mdbx_tassert(txn, txn->mt_dbxs[i].md_name.iov_base != NULL); mdbx_tassert(txn, txn->mt_dbxs[i].md_name.iov_base != NULL);
} }
} }
}
while (unlikely(n < txn->mt_numdbs))
if (txn->mt_cursors[txn->mt_numdbs - 1] == NULL &&
(txn->mt_dbistate[txn->mt_numdbs - 1] & DBI_USRVALID) == 0)
txn->mt_numdbs -= 1;
else {
if ((txn->mt_dbistate[n] & DBI_USRVALID) == 0) {
if (txn->mt_dbiseqs != env->me_dbiseqs)
txn->mt_dbiseqs[n] = 0;
txn->mt_dbistate[n] = 0;
}
++n;
}
txn->mt_numdbs = n; txn->mt_numdbs = n;
} }
/* Import DBI which opened after txn started into context */ /* Import DBI which opened after txn started into context */
__cold static bool dbi_import(MDBX_txn *txn, MDBX_dbi dbi) { __cold static bool dbi_import(MDBX_txn *txn, MDBX_dbi dbi) {
if (dbi < CORE_DBS || dbi >= txn->mt_env->me_numdbs) if (dbi < CORE_DBS ||
(dbi >= txn->mt_numdbs && dbi >= txn->mt_env->me_numdbs))
return false; return false;
mdbx_ensure(txn->mt_env, mdbx_fastmutex_acquire(&txn->mt_env->me_dbi_lock) == mdbx_ensure(txn->mt_env, mdbx_fastmutex_acquire(&txn->mt_env->me_dbi_lock) ==
@ -8520,7 +8547,7 @@ static void dbi_update(MDBX_txn *txn, int keep) {
env->me_dbxs[i].md_name.iov_len = 0; env->me_dbxs[i].md_name.iov_len = 0;
mdbx_memory_fence(mo_AcquireRelease, true); mdbx_memory_fence(mo_AcquireRelease, true);
mdbx_assert(env, env->me_dbflags[i] == 0); mdbx_assert(env, env->me_dbflags[i] == 0);
env->me_dbiseqs[i]++; env->me_dbiseqs[i] = dbi_seq(env, i);
env->me_dbxs[i].md_name.iov_base = NULL; env->me_dbxs[i].md_name.iov_base = NULL;
mdbx_free(ptr); mdbx_free(ptr);
} }
@ -9844,11 +9871,16 @@ static int mdbx_txn_write(MDBX_txn *txn, struct mdbx_iov_ctx *ctx) {
/* Check txn and dbi arguments to a function */ /* Check txn and dbi arguments to a function */
static __always_inline bool check_dbi(MDBX_txn *txn, MDBX_dbi dbi, static __always_inline bool check_dbi(MDBX_txn *txn, MDBX_dbi dbi,
unsigned validity) { unsigned validity) {
if (likely(dbi < txn->mt_numdbs)) if (likely(dbi < txn->mt_numdbs)) {
return likely((txn->mt_dbistate[dbi] & validity) && mdbx_memory_fence(mo_AcquireRelease, false);
!TXN_DBI_CHANGED(txn, dbi) && if (likely(!TXN_DBI_CHANGED(txn, dbi))) {
(txn->mt_dbxs[dbi].md_name.iov_base || dbi < CORE_DBS)); if (likely(txn->mt_dbistate[dbi] & validity))
return true;
if (likely(dbi < CORE_DBS ||
(txn->mt_env->me_dbflags[dbi] & DB_VALID) == 0))
return false;
}
}
return dbi_import(txn, dbi); return dbi_import(txn, dbi);
} }
@ -10422,10 +10454,6 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
goto fail; goto fail;
for (MDBX_dbi i = CORE_DBS; i < txn->mt_numdbs; i++) { for (MDBX_dbi i = CORE_DBS; i < txn->mt_numdbs; i++) {
if (txn->mt_dbistate[i] & DBI_DIRTY) { if (txn->mt_dbistate[i] & DBI_DIRTY) {
if (unlikely(TXN_DBI_CHANGED(txn, i))) {
rc = MDBX_BAD_DBI;
goto fail;
}
MDBX_db *db = &txn->mt_dbs[i]; MDBX_db *db = &txn->mt_dbs[i];
mdbx_debug("update main's entry for sub-db %u, mod_txnid %" PRIaTXN mdbx_debug("update main's entry for sub-db %u, mod_txnid %" PRIaTXN
" -> %" PRIaTXN, " -> %" PRIaTXN,
@ -16643,9 +16671,6 @@ static __inline int mdbx_couple_init(MDBX_cursor_couple *couple,
/* Initialize a cursor for a given transaction and database. */ /* Initialize a cursor for a given transaction and database. */
static int mdbx_cursor_init(MDBX_cursor *mc, MDBX_txn *txn, MDBX_dbi dbi) { static int mdbx_cursor_init(MDBX_cursor *mc, MDBX_txn *txn, MDBX_dbi dbi) {
STATIC_ASSERT(offsetof(MDBX_cursor_couple, outer) == 0); STATIC_ASSERT(offsetof(MDBX_cursor_couple, outer) == 0);
if (unlikely(TXN_DBI_CHANGED(txn, dbi)))
return MDBX_BAD_DBI;
return mdbx_couple_init(container_of(mc, MDBX_cursor_couple, outer), dbi, txn, return mdbx_couple_init(container_of(mc, MDBX_cursor_couple, outer), dbi, txn,
&txn->mt_dbs[dbi], &txn->mt_dbxs[dbi], &txn->mt_dbs[dbi], &txn->mt_dbxs[dbi],
&txn->mt_dbistate[dbi]); &txn->mt_dbistate[dbi]);
@ -20441,15 +20466,16 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
txn->mt_dbistate[slot] = (uint8_t)dbiflags; txn->mt_dbistate[slot] = (uint8_t)dbiflags;
txn->mt_dbxs[slot].md_name.iov_base = namedup; txn->mt_dbxs[slot].md_name.iov_base = namedup;
txn->mt_dbxs[slot].md_name.iov_len = len; txn->mt_dbxs[slot].md_name.iov_len = len;
txn->mt_dbiseqs[slot] = ++env->me_dbiseqs[slot]; txn->mt_dbiseqs[slot] = env->me_dbiseqs[slot] = dbi_seq(env, slot);
if (!(dbiflags & DBI_CREAT)) if (!(dbiflags & DBI_CREAT))
env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID; env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID;
if (txn->mt_numdbs == slot) { if (txn->mt_numdbs == slot) {
mdbx_compiler_barrier(); mdbx_compiler_barrier();
txn->mt_numdbs = env->me_numdbs = slot + 1; txn->mt_numdbs = slot + 1;
txn->mt_cursors[slot] = NULL; txn->mt_cursors[slot] = NULL;
} }
mdbx_assert(env, env->me_numdbs > slot); if (env->me_numdbs <= slot)
env->me_numdbs = slot + 1;
*dbi = slot; *dbi = slot;
} }
@ -20509,7 +20535,6 @@ static int mdbx_dbi_close_locked(MDBX_env *env, MDBX_dbi dbi) {
return MDBX_BAD_DBI; return MDBX_BAD_DBI;
env->me_dbflags[dbi] = 0; env->me_dbflags[dbi] = 0;
env->me_dbiseqs[dbi]++;
env->me_dbxs[dbi].md_name.iov_len = 0; env->me_dbxs[dbi].md_name.iov_len = 0;
mdbx_memory_fence(mo_AcquireRelease, true); mdbx_memory_fence(mo_AcquireRelease, true);
env->me_dbxs[dbi].md_name.iov_base = NULL; env->me_dbxs[dbi].md_name.iov_base = NULL;
@ -22821,7 +22846,7 @@ int mdbx_set_attr(MDBX_txn *txn, MDBX_dbi dbi, MDBX_val *key, MDBX_val *data,
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE)) if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDBX_EBADSIGN; return MDBX_EBADSIGN;
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID))) if (unlikely(!check_dbi(txn, dbi, DB_USRVALID)))
return MDBX_BAD_DBI; return MDBX_BAD_DBI;
if (unlikely(txn->mt_flags & (MDBX_TXN_RDONLY | MDBX_TXN_BLOCKED))) if (unlikely(txn->mt_flags & (MDBX_TXN_RDONLY | MDBX_TXN_BLOCKED)))