mdbx: rework open/import/export of DBI-handles for robustness

Resolve https://github.com/erthink/libmdbx/issues/146

Change-Id: Idd18dc0d038eeba47668983ecf4ff46eabd16de5
This commit is contained in:
Leonid Yuriev 2020-12-17 01:21:48 +03:00
parent 166ed1c7d4
commit cda64ca663

View File

@ -6816,8 +6816,45 @@ int mdbx_txn_flags(const MDBX_txn *txn) {
return txn->mt_flags; return txn->mt_flags;
} }
/* Check for misused dbi handles */
#define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
static void dbi_import_locked(MDBX_txn *txn) {
MDBX_env *const env = txn->mt_env;
const unsigned n = env->me_numdbs;
for (unsigned i = CORE_DBS; i < n; ++i) {
if (i >= txn->mt_numdbs) {
txn->mt_dbistate[i] = 0;
if (!(txn->mt_flags & MDBX_TXN_RDONLY))
txn->tw.cursors[i] = NULL;
}
if ((env->me_dbflags[i] & DB_VALID) &&
!(txn->mt_dbistate[i] & DBI_USRVALID)) {
txn->mt_dbiseqs[i] = env->me_dbiseqs[i];
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & DB_PERSISTENT_FLAGS;
txn->mt_dbistate[i] = DBI_VALID | DBI_USRVALID | DBI_STALE;
mdbx_tassert(txn, txn->mt_dbxs[i].md_cmp != NULL);
}
}
txn->mt_numdbs = n;
}
/* Import DBI which opened after txn started into context */
static __cold bool dbi_import(MDBX_txn *txn, MDBX_dbi dbi) {
if (dbi < CORE_DBS || dbi >= txn->mt_env->me_numdbs)
return false;
mdbx_ensure(txn->mt_env, mdbx_fastmutex_acquire(&txn->mt_env->me_dbi_lock) ==
MDBX_SUCCESS);
dbi_import_locked(txn);
mdbx_ensure(txn->mt_env, mdbx_fastmutex_release(&txn->mt_env->me_dbi_lock) ==
MDBX_SUCCESS);
return txn->mt_dbistate[dbi] & DBI_USRVALID;
}
/* Export or close DBI handles opened in this txn. */ /* Export or close DBI handles opened in this txn. */
static void mdbx_dbis_update(MDBX_txn *txn, int keep) { static void dbi_update(MDBX_txn *txn, int keep) {
mdbx_tassert(txn, !txn->mt_parent && txn == txn->mt_env->me_txn0); mdbx_tassert(txn, !txn->mt_parent && txn == txn->mt_env->me_txn0);
MDBX_dbi n = txn->mt_numdbs; MDBX_dbi n = txn->mt_numdbs;
if (n) { if (n) {
@ -6931,7 +6968,7 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) {
if (txn == env->me_txn0) { if (txn == env->me_txn0) {
mdbx_assert(env, txn->mt_parent == NULL); mdbx_assert(env, txn->mt_parent == NULL);
/* Export or close DBI handles created in this txn */ /* Export or close DBI handles created in this txn */
mdbx_dbis_update(txn, mode & MDBX_END_UPDATE); dbi_update(txn, mode & MDBX_END_UPDATE);
mdbx_pnl_shrink(&txn->tw.retired_pages); mdbx_pnl_shrink(&txn->tw.retired_pages);
mdbx_pnl_shrink(&txn->tw.reclaimed_pglist); mdbx_pnl_shrink(&txn->tw.reclaimed_pglist);
/* The writer mutex was locked in mdbx_txn_begin. */ /* The writer mutex was locked in mdbx_txn_begin. */
@ -8032,42 +8069,13 @@ __hot static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) {
return MDBX_SUCCESS; return MDBX_SUCCESS;
} }
/* Check for misused dbi handles */
#define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
/* Import DBI which opened after txn started into context */
static __cold bool mdbx_txn_import_dbi(MDBX_txn *txn, MDBX_dbi dbi) {
MDBX_env *env = txn->mt_env;
if (dbi < CORE_DBS || dbi >= env->me_numdbs)
return false;
mdbx_ensure(env, mdbx_fastmutex_acquire(&env->me_dbi_lock) == MDBX_SUCCESS);
const unsigned snap_numdbs = env->me_numdbs;
mdbx_compiler_barrier();
for (unsigned i = CORE_DBS; i < snap_numdbs; ++i) {
if (i >= txn->mt_numdbs)
txn->mt_dbistate[i] = 0;
if (!(txn->mt_dbistate[i] & DBI_USRVALID) &&
(env->me_dbflags[i] & DB_VALID)) {
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & DB_PERSISTENT_FLAGS;
txn->mt_dbistate[i] = DBI_VALID | DBI_USRVALID | DBI_STALE;
mdbx_tassert(txn, txn->mt_dbxs[i].md_cmp != NULL);
}
}
txn->mt_numdbs = snap_numdbs;
mdbx_ensure(env, mdbx_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
return txn->mt_dbistate[dbi] & DBI_USRVALID;
}
/* Check txn and dbi arguments to a function */ /* Check txn and dbi arguments to a function */
static __always_inline bool mdbx_txn_dbi_exists(MDBX_txn *txn, MDBX_dbi dbi, static __always_inline bool mdbx_txn_dbi_exists(MDBX_txn *txn, MDBX_dbi dbi,
unsigned validity) { unsigned validity) {
if (likely(dbi < txn->mt_numdbs && (txn->mt_dbistate[dbi] & validity))) if (likely(dbi < txn->mt_numdbs && (txn->mt_dbistate[dbi] & validity)))
return true; return true;
return mdbx_txn_import_dbi(txn, dbi); return dbi_import(txn, dbi);
} }
int mdbx_txn_commit(MDBX_txn *txn) { return __inline_mdbx_txn_commit(txn); } int mdbx_txn_commit(MDBX_txn *txn) { return __inline_mdbx_txn_commit(txn); }
@ -17533,18 +17541,8 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
goto early_bailout; goto early_bailout;
} }
if (txn->mt_numdbs < env->me_numdbs) {
/* Import handles from env */ /* Import handles from env */
for (unsigned i = txn->mt_numdbs; i < env->me_numdbs; ++i) { dbi_import_locked(txn);
txn->mt_dbistate[i] = 0;
if (env->me_dbflags[i] & DB_VALID) {
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & DB_PERSISTENT_FLAGS;
txn->mt_dbistate[i] = DBI_VALID | DBI_USRVALID | DBI_STALE;
mdbx_tassert(txn, txn->mt_dbxs[i].md_cmp != NULL);
}
}
txn->mt_numdbs = env->me_numdbs;
}
/* Rescan after mutex acquisition & import handles */ /* Rescan after mutex acquisition & import handles */
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) { for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
@ -17604,18 +17602,16 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
txn->mt_dbistate[slot] = (uint8_t)dbiflags; txn->mt_dbistate[slot] = (uint8_t)dbiflags;
txn->mt_dbxs[slot].md_name.iov_base = namedup; txn->mt_dbxs[slot].md_name.iov_base = namedup;
txn->mt_dbxs[slot].md_name.iov_len = len; txn->mt_dbxs[slot].md_name.iov_len = len;
if ((txn->mt_flags & MDBX_TXN_RDONLY) == 0) txn->mt_dbiseqs[slot] = ++env->me_dbiseqs[slot];
txn->tw.cursors[slot] = NULL; if (!(dbiflags & DBI_CREAT))
txn->mt_numdbs += (slot == txn->mt_numdbs);
if ((dbiflags & DBI_CREAT) == 0) {
env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID; env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID;
if (txn->mt_numdbs == slot) {
mdbx_compiler_barrier(); mdbx_compiler_barrier();
if (env->me_numdbs <= slot) txn->mt_numdbs = env->me_numdbs = slot + 1;
env->me_numdbs = slot + 1; if (!(txn->mt_flags & MDBX_TXN_RDONLY))
} else { txn->tw.cursors[slot] = NULL;
env->me_dbiseqs[slot]++;
} }
txn->mt_dbiseqs[slot] = env->me_dbiseqs[slot]; mdbx_assert(env, env->me_numdbs > slot);
*dbi = slot; *dbi = slot;
} }