mdbx: immediately share dbi-handles for present TBLs (major).

Change-Id: I75c998e06b3ccc25bd22ef389d119052d524d70b
This commit is contained in:
Leo Yuriev 2018-02-04 12:57:36 +03:00
parent e8ae506773
commit 6a0ff097ee
3 changed files with 132 additions and 51 deletions

3
mdbx.h
View File

@ -1155,7 +1155,8 @@ LIBMDBX_API int mdbx_dbi_stat(MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *stat,
* Returns A non-zero error value on failure and 0 on success. */
#define MDBX_TBL_DIRTY 0x01 /* DB was written in this txn */
#define MDBX_TBL_STALE 0x02 /* Named-DB record is older than txnID */
#define MDBX_TBL_NEW 0x04 /* Named-DB handle opened in this txn */
#define MDBX_TBL_FRESH 0x04 /* Named-DB handle opened in this txn */
#define MDBX_TBL_CREAT 0x08 /* Named-DB handle created in this txn */
LIBMDBX_API int mdbx_dbi_flags_ex(MDBX_txn *txn, MDBX_dbi dbi, unsigned *flags,
unsigned *state);
LIBMDBX_API int mdbx_dbi_flags(MDBX_txn *txn, MDBX_dbi dbi, unsigned *flags);

View File

@ -539,10 +539,11 @@ struct MDBX_txn {
/* Transaction DB Flags */
#define DB_DIRTY MDBX_TBL_DIRTY /* DB was written in this txn */
#define DB_STALE MDBX_TBL_STALE /* Named-DB record is older than txnID */
#define DB_NEW MDBX_TBL_NEW /* Named-DB handle opened in this txn */
#define DB_VALID 0x08 /* DB handle is valid, see also MDBX_VALID */
#define DB_USRVALID 0x10 /* As DB_VALID, but not set for FREE_DBI */
#define DB_DUPDATA 0x20 /* DB is MDBX_DUPSORT data */
#define DB_FRESH MDBX_TBL_FRESH /* Named-DB handle opened in this txn */
#define DB_CREAT MDBX_TBL_CREAT /* Named-DB handle created in this txn */
#define DB_VALID 0x10 /* DB handle is valid, see also MDBX_VALID */
#define DB_USRVALID 0x20 /* As DB_VALID, but not set for FREE_DBI */
#define DB_DUPDATA 0x40 /* DB is MDBX_DUPSORT data */
/* In write txns, array of cursors for each DB */
MDBX_cursor **mt_cursors;
/* Array of flags for each DB */
@ -1179,14 +1180,6 @@ static __inline void SETDSZ(MDBX_node *node, size_t size) {
#define MDBX_COMMIT_PAGES IOV_MAX
#endif
/* Check txn and dbi arguments to a function */
#define TXN_DBI_EXIST(txn, dbi, validity) \
((dbi) < (txn)->mt_numdbs && ((txn)->mt_dbflags[dbi] & (validity)))
/* Check for misused dbi handles */
#define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
/* LY: fast enough on most systems
*
* /

View File

@ -2671,6 +2671,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
/* Setup db info */
txn->mt_numdbs = env->me_numdbs;
mdbx_compiler_barrier();
for (unsigned i = CORE_DBS; i < txn->mt_numdbs; i++) {
unsigned x = env->me_dbflags[i];
txn->mt_dbs[i].md_flags = x & PERSISTENT_FLAGS;
@ -2823,7 +2824,7 @@ int mdbx_txn_begin(MDBX_env *env, MDBX_txn *parent, unsigned flags,
memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDBX_db));
/* Copy parent's mt_dbflags, but clear DB_NEW */
for (i = 0; i < txn->mt_numdbs; i++)
txn->mt_dbflags[i] = parent->mt_dbflags[i] & ~DB_NEW;
txn->mt_dbflags[i] = parent->mt_dbflags[i] & ~(DB_FRESH | DB_CREAT);
rc = 0;
ntxn = (MDBX_ntxn *)txn;
ntxn->mnt_pgstate =
@ -2879,27 +2880,39 @@ uint64_t mdbx_txn_id(MDBX_txn *txn) {
static void mdbx_dbis_update(MDBX_txn *txn, int keep) {
MDBX_dbi n = txn->mt_numdbs;
if (n) {
bool locked = false;
MDBX_env *env = txn->mt_env;
uint8_t *tdbflags = txn->mt_dbflags;
for (unsigned i = n; --i >= CORE_DBS;) {
if (tdbflags[i] & DB_NEW) {
if (likely((tdbflags[i] & DB_CREAT) == 0))
continue;
if (!locked) {
mdbx_ensure(env,
mdbx_fastmutex_acquire(&env->me_dbi_lock) == MDBX_SUCCESS);
locked = true;
}
if (keep) {
env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDBX_VALID;
mdbx_compiler_barrier();
if (env->me_numdbs <= i)
env->me_numdbs = i + 1;
} else {
char *ptr = env->me_dbxs[i].md_name.iov_base;
if (ptr) {
env->me_dbxs[i].md_name.iov_base = NULL;
env->me_dbxs[i].md_name.iov_len = 0;
env->me_dbflags[i] = 0;
mdbx_compiler_barrier();
assert(env->me_dbflags[i] == 0);
env->me_dbiseqs[i]++;
env->me_dbxs[i].md_name.iov_base = NULL;
free(ptr);
}
}
}
}
if (keep && env->me_numdbs < n)
env->me_numdbs = n;
if (unlikely(locked))
mdbx_ensure(env,
mdbx_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
}
}
@ -2916,9 +2929,6 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) {
return MDBX_PANIC;
}
/* Export or close DBI handles opened in this txn */
mdbx_dbis_update(txn, mode & MDBX_END_UPDATE);
mdbx_debug("%s txn %" PRIaTXN "%c %p on mdbenv %p, root page %" PRIaPGNO
"/%" PRIaPGNO,
names[mode & MDBX_END_OPMASK], txn->mt_txnid,
@ -2941,6 +2951,9 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) {
txn->mt_flags = MDBX_TXN_RDONLY | MDBX_TXN_FINISHED;
txn->mt_owner = 0;
} else if (!F_ISSET(txn->mt_flags, MDBX_TXN_FINISHED)) {
/* Export or close DBI handles created in this txn */
mdbx_dbis_update(txn, mode & MDBX_END_UPDATE);
pgno_t *pghead = env->me_reclaimed_pglist;
if (!(mode & MDBX_END_EOTDONE)) /* !(already closed cursors) */
@ -3564,6 +3577,44 @@ done:
return MDBX_SUCCESS;
}
/* Check for misused dbi handles */
#define TXN_DBI_CHANGED(txn, dbi) \
((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
/* Import DBI which opened after txn started into context */
static __cold bool mdbx_txn_import_dbi(MDBX_txn *txn, MDBX_dbi dbi) {
MDBX_env *env = txn->mt_env;
if (dbi < CORE_DBS || dbi >= env->me_numdbs)
return false;
mdbx_ensure(env, mdbx_fastmutex_acquire(&env->me_dbi_lock) == MDBX_SUCCESS);
const unsigned snap_numdbs = env->me_numdbs;
mdbx_compiler_barrier();
for (unsigned i = CORE_DBS; i < snap_numdbs; ++i) {
if (i >= txn->mt_numdbs)
txn->mt_dbflags[i] = 0;
if (!(txn->mt_dbflags[i] & DB_USRVALID) &&
(env->me_dbflags[i] & MDBX_VALID)) {
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & PERSISTENT_FLAGS;
txn->mt_dbflags[i] = DB_VALID | DB_USRVALID | DB_STALE;
assert(txn->mt_dbxs[i].md_cmp != NULL);
}
}
txn->mt_numdbs = snap_numdbs;
mdbx_ensure(env, mdbx_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
return txn->mt_dbflags[dbi] & DB_USRVALID;
}
/* Check txn and dbi arguments to a function */
static __inline bool TXN_DBI_EXIST(MDBX_txn *txn, MDBX_dbi dbi,
unsigned validity) {
if (likely(dbi < txn->mt_numdbs && (txn->mt_dbflags[dbi] & validity)))
return true;
return mdbx_txn_import_dbi(txn, dbi);
}
int mdbx_txn_commit(MDBX_txn *txn) {
int rc;
@ -3646,7 +3697,7 @@ int mdbx_txn_commit(MDBX_txn *txn) {
for (i = CORE_DBS; i < txn->mt_numdbs; i++) {
/* preserve parent's DB_NEW status */
parent->mt_dbflags[i] =
txn->mt_dbflags[i] | (parent->mt_dbflags[i] & DB_NEW);
txn->mt_dbflags[i] | (parent->mt_dbflags[i] & (DB_CREAT | DB_FRESH));
}
dst = parent->mt_rw_dirtylist;
@ -10216,8 +10267,8 @@ int mdbx_dbi_open_ex(MDBX_txn *txn, const char *table_name, unsigned user_flags,
/* Is the DB already open? */
size_t len = strlen(table_name);
MDBX_dbi scan, slot = txn->mt_numdbs;
for (scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
MDBX_dbi scan, slot;
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.iov_len) {
/* Remember this free slot */
slot = scan;
@ -10272,7 +10323,38 @@ int mdbx_dbi_open_ex(MDBX_txn *txn, const char *table_name, unsigned user_flags,
return err;
}
unsigned dbflag = DB_NEW | DB_VALID | DB_USRVALID;
if (txn->mt_numdbs < env->me_numdbs) {
for (unsigned i = txn->mt_numdbs; i < env->me_numdbs; ++i) {
txn->mt_dbflags[i] = 0;
if (env->me_dbflags[i] & MDBX_VALID) {
txn->mt_dbs[i].md_flags = env->me_dbflags[i] & PERSISTENT_FLAGS;
txn->mt_dbflags[i] = DB_VALID | DB_USRVALID | DB_STALE;
assert(txn->mt_dbxs[i].md_cmp != NULL);
}
}
txn->mt_numdbs = env->me_numdbs;
}
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.iov_len) {
/* Remember this free slot */
slot = scan;
continue;
}
if (len == txn->mt_dbxs[scan].md_name.iov_len &&
!strncmp(table_name, txn->mt_dbxs[scan].md_name.iov_base, len)) {
*dbi = scan;
rc = mdbx_dbi_bind(txn, scan, user_flags, keycmp, datacmp);
goto unlock_return_rc;
}
}
if (unlikely(slot >= env->me_maxdbs)) {
rc = MDBX_DBS_FULL;
goto unlock_return_rc;
}
unsigned dbflag = DB_FRESH | DB_VALID | DB_USRVALID;
if (unlikely(rc)) {
/* MDBX_NOTFOUND and MDBX_CREATE: Create new DB */
assert(rc == MDBX_NOTFOUND);
@ -10289,33 +10371,37 @@ int mdbx_dbi_open_ex(MDBX_txn *txn, const char *table_name, unsigned user_flags,
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
dbflag |= DB_DIRTY;
dbflag |= DB_DIRTY | DB_CREAT;
}
/* Got info, register DBI in this txn */
txn->mt_dbxs[slot].md_name.iov_base = namedup;
txn->mt_dbxs[slot].md_name.iov_len = len;
txn->mt_dbxs[slot].md_cmp = nullptr;
txn->mt_dbxs[slot].md_dcmp = nullptr;
txn->mt_dbflags[slot] = (uint8_t)dbflag;
txn->mt_dbiseqs[slot] = (env->me_dbiseqs[slot] += 1);
txn->mt_dbs[slot] = *(MDBX_db *)data.iov_base;
env->me_dbflags[slot] = 0;
rc = mdbx_dbi_bind(txn, slot, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS)) {
assert((dbflag & DB_DIRTY) == 0);
/* cleanup slot */
txn->mt_dbxs[slot].md_name.iov_base = NULL;
txn->mt_dbxs[slot].md_name.iov_len = 0;
txn->mt_dbflags[slot] = 0;
assert((dbflag & DB_CREAT) == 0);
bailout:
free(namedup);
} else {
*dbi = slot;
txn->mt_dbiseqs[slot] = (env->me_dbiseqs[slot] += 1);
txn->mt_dbflags[slot] = (uint8_t)dbflag;
txn->mt_dbxs[slot].md_name.iov_base = namedup;
mdbx_compiler_barrier();
txn->mt_dbxs[slot].md_name.iov_len = len;
if (slot == txn->mt_numdbs)
txn->mt_numdbs++;
if ((dbflag & DB_CREAT) == 0) {
env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | MDBX_VALID;
mdbx_compiler_barrier();
if (env->me_numdbs <= slot)
env->me_numdbs = slot + 1;
}
*dbi = slot;
}
unlock_return_rc:
mdbx_ensure(env, mdbx_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
return rc;
}
@ -10363,10 +10449,11 @@ static int mdbx_dbi_close_locked(MDBX_env *env, MDBX_dbi dbi) {
if (unlikely(!ptr))
return MDBX_BAD_DBI;
env->me_dbxs[dbi].md_name.iov_base = NULL;
env->me_dbxs[dbi].md_name.iov_len = 0;
env->me_dbiseqs[dbi]++;
env->me_dbflags[dbi] = 0;
env->me_dbxs[dbi].md_name.iov_len = 0;
mdbx_compiler_barrier();
env->me_dbiseqs[dbi]++;
env->me_dbxs[dbi].md_name.iov_base = NULL;
free(ptr);
return MDBX_SUCCESS;
}
@ -10398,7 +10485,7 @@ int mdbx_dbi_flags_ex(MDBX_txn *txn, MDBX_dbi dbi, unsigned *flags,
return MDBX_EINVAL;
*flags = txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS;
*state = txn->mt_dbflags[dbi] & (DB_NEW | DB_DIRTY | DB_STALE);
*state = txn->mt_dbflags[dbi] & (DB_FRESH | DB_CREAT | DB_DIRTY | DB_STALE);
return MDBX_SUCCESS;
}