mdbx: rework mdbx_dbi_open().

This commit is contained in:
Leo Yuriev 2017-05-15 21:18:52 +03:00
parent 5fdad46cb9
commit ed46246931
2 changed files with 116 additions and 150 deletions

52
mdbx.h
View File

@ -1137,58 +1137,6 @@ LIBMDBX_API int mdbx_dbi_close(MDB_env *env, MDB_dbi dbi);
*/
LIBMDBX_API int mdbx_drop(MDB_txn *txn, MDB_dbi dbi, int del);
/* Set a custom key comparison function for a database.
*
* The comparison function is called whenever it is necessary to compare a
* key specified by the application with a key currently stored in the
*database.
* If no comparison function is specified, and no special key flags were
*specified
* with mdbx_dbi_open(), the keys are compared lexically, with shorter keys
*collating
* before longer keys.
* Warning: This function must be called before any data access functions are
*used,
* otherwise data corruption may occur. The same comparison function must be
*used by every
* program accessing the database, every time the database is used.
* [in] txn A transaction handle returned by mdbx_txn_begin()
* [in] dbi A database handle returned by mdbx_dbi_open()
* [in] cmp A MDB_cmp_func function
* Returns A non-zero error value on failure and 0 on success. Some possible
* errors are:
* - EINVAL - an invalid parameter was specified.
*/
LIBMDBX_API int mdbx_set_compare(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp);
/* Set a custom data comparison function for a MDB_DUPSORT database.
*
* This comparison function is called whenever it is necessary to compare a
*data
* item specified by the application with a data item currently stored in the
*database.
* This function only takes effect if the database was opened with the
*MDB_DUPSORT
* flag.
* If no comparison function is specified, and no special key flags were
*specified
* with mdbx_dbi_open(), the data items are compared lexically, with shorter
*items collating
* before longer items.
* Warning: This function must be called before any data access functions are
*used,
* otherwise data corruption may occur. The same comparison function must be
*used by every
* program accessing the database, every time the database is used.
* [in] txn A transaction handle returned by mdbx_txn_begin()
* [in] dbi A database handle returned by mdbx_dbi_open()
* [in] cmp A MDB_cmp_func function
* Returns A non-zero error value on failure and 0 on success. Some possible
* errors are:
* - EINVAL - an invalid parameter was specified.
*/
LIBMDBX_API int mdbx_set_dupsort(MDB_txn *txn, MDB_dbi dbi, MDB_cmp_func *cmp);
/* Get items from a database.
*
* This function retrieves key/data pairs from the database. The address

View File

@ -2435,12 +2435,11 @@ size_t mdbx_txn_id(MDB_txn *txn) {
/** Export or close DBI handles opened in this txn. */
static void mdbx_dbis_update(MDB_txn *txn, int keep) {
int i;
MDB_dbi n = txn->mt_numdbs;
MDB_env *env = txn->mt_env;
unsigned char *tdbflags = txn->mt_dbflags;
for (i = n; --i >= CORE_DBS;) {
for (unsigned i = n; --i >= CORE_DBS;) {
if (tdbflags[i] & DB_NEW) {
if (keep) {
env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID;
@ -8706,144 +8705,177 @@ static MDB_cmp_func *mdbx_default_datacmp(unsigned flags) {
: mdbx_cmp_memn));
}
/** Set the default comparison functions for a database.
* Called immediately after a database is opened to set the defaults.
* The user can then override them with #mdbx_set_compare() or
* #mdbx_set_dupsort().
* @param[in] txn A transaction handle returned by #mdbx_txn_begin()
* @param[in] dbi A database handle returned by #mdbx_dbi_open()
*/
static void mdbx_default_cmp(MDB_txn *txn, MDB_dbi dbi) {
unsigned flags = txn->mt_dbs[dbi].md_flags;
txn->mt_dbxs[dbi].md_cmp = mdbx_default_keycmp(flags);
txn->mt_dbxs[dbi].md_dcmp = mdbx_default_datacmp(flags);
static int mdbx_dbi_bind(MDB_txn *txn, const MDB_dbi dbi, unsigned user_flags,
MDB_cmp_func *keycmp, MDB_cmp_func *datacmp) {
/* LY: so, accepting only three cases for the table's flags:
* 1) user_flags and both comparators are zero
* = assume that a by-default mode/flags is requested for reading;
* 2) user_flags exactly the same
* = assume that the target mode/flags are requested properly;
* 3) user_flags differs, but table is empty and MDB_CREATE is provided
* = assume that a properly create request with custom flags;
*/
if ((user_flags ^ txn->mt_dbs[dbi].md_flags) & PERSISTENT_FLAGS) {
/* flags ara differs, check other conditions */
if (!user_flags && (!keycmp || keycmp == txn->mt_dbxs[dbi].md_cmp) &&
(!datacmp || datacmp == txn->mt_dbxs[dbi].md_dcmp)) {
/* no comparators were provided and flags are zero,
* seems that is case #1 above */
user_flags = txn->mt_dbs[dbi].md_flags;
} else if ((user_flags & MDB_CREATE) && txn->mt_dbs[dbi].md_entries == 0) {
if (txn->mt_flags & MDB_TXN_RDONLY)
return /* FIXME: return extended info */ MDBX_EACCESS;
/* make sure flags changes get committed */
txn->mt_dbs[dbi].md_flags = user_flags & PERSISTENT_FLAGS;
txn->mt_flags |= MDB_TXN_DIRTY;
} else {
return /* FIXME: return extended info */ MDB_INCOMPATIBLE;
}
}
if (!txn->mt_dbxs[dbi].md_cmp || MDB_DEBUG) {
if (!keycmp)
keycmp = mdbx_default_keycmp(user_flags);
assert(!txn->mt_dbxs[dbi].md_cmp || txn->mt_dbxs[dbi].md_cmp == keycmp);
txn->mt_dbxs[dbi].md_cmp = keycmp;
}
if (!txn->mt_dbxs[dbi].md_dcmp || MDB_DEBUG) {
if (!datacmp)
datacmp = mdbx_default_datacmp(user_flags);
assert(!txn->mt_dbxs[dbi].md_dcmp || txn->mt_dbxs[dbi].md_dcmp == datacmp);
txn->mt_dbxs[dbi].md_dcmp = datacmp;
}
return MDB_SUCCESS;
}
int mdbx_dbi_open(MDB_txn *txn, const char *name, unsigned flags,
MDB_dbi *dbi) {
MDB_val key, data;
MDB_dbi i;
MDB_cursor mc;
int rc, dbflag, exact;
unsigned unused = 0, seq;
char *namedup;
size_t len;
if (unlikely(!txn || !dbi))
int mdbx_dbi_open_ex(MDB_txn *txn, const char *table_name, unsigned user_flags,
MDB_dbi *dbi, MDB_cmp_func *keycmp,
MDB_cmp_func *datacmp) {
if (unlikely(!txn || !dbi || (user_flags & ~VALID_FLAGS) != 0))
return MDBX_EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDBX_EBADSIGN;
if (unlikely(flags & ~VALID_FLAGS))
return MDBX_EINVAL;
if (unlikely(txn->mt_flags & MDB_TXN_BLOCKED))
return MDB_BAD_TXN;
/* main DB? */
if (!name) {
/* main table? */
if (!table_name) {
*dbi = MAIN_DBI;
if (flags & PERSISTENT_FLAGS) {
uint16_t f2 = flags & PERSISTENT_FLAGS;
/* make sure flag changes get committed */
if ((txn->mt_dbs[MAIN_DBI].md_flags | f2) !=
txn->mt_dbs[MAIN_DBI].md_flags) {
txn->mt_dbs[MAIN_DBI].md_flags |= f2;
txn->mt_flags |= MDB_TXN_DIRTY;
}
}
mdbx_default_cmp(txn, MAIN_DBI);
return MDB_SUCCESS;
return mdbx_dbi_bind(txn, MAIN_DBI, user_flags, keycmp, datacmp);
}
if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) {
mdbx_default_cmp(txn, MAIN_DBI);
txn->mt_dbxs[MAIN_DBI].md_cmp =
mdbx_default_keycmp(txn->mt_dbs[MAIN_DBI].md_flags);
txn->mt_dbxs[MAIN_DBI].md_dcmp =
mdbx_default_datacmp(txn->mt_dbs[MAIN_DBI].md_flags);
}
/* Is the DB already open? */
len = strlen(name);
for (i = CORE_DBS; i < txn->mt_numdbs; i++) {
if (!txn->mt_dbxs[i].md_name.mv_size) {
size_t len = strlen(table_name);
MDB_dbi scan, slot = txn->mt_numdbs;
for (scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.mv_size) {
/* Remember this free slot */
if (!unused)
unused = i;
slot = scan;
continue;
}
if (len == txn->mt_dbxs[i].md_name.mv_size &&
!strncmp(name, txn->mt_dbxs[i].md_name.mv_data, len)) {
*dbi = i;
return MDB_SUCCESS;
if (len == txn->mt_dbxs[scan].md_name.mv_size &&
!strncmp(table_name, txn->mt_dbxs[scan].md_name.mv_data, len)) {
*dbi = scan;
return mdbx_dbi_bind(txn, scan, user_flags, keycmp, datacmp);
}
}
/* If no free slot and max hit, fail */
if (!unused && unlikely(txn->mt_numdbs >= txn->mt_env->me_maxdbs))
/* Fail, if no free slot and max hit */
if (unlikely(slot >= txn->mt_env->me_maxdbs))
return MDB_DBS_FULL;
/* Cannot mix named databases with some mainDB flags */
/* Cannot mix named table with some main-table flags */
if (unlikely(txn->mt_dbs[MAIN_DBI].md_flags & (MDB_DUPSORT | MDB_INTEGERKEY)))
return (flags & MDB_CREATE) ? MDB_INCOMPATIBLE : MDB_NOTFOUND;
return (user_flags & MDB_CREATE) ? MDB_INCOMPATIBLE : MDB_NOTFOUND;
/* Find the DB info */
dbflag = DB_NEW | DB_VALID | DB_USRVALID;
exact = 0;
int exact = 0;
MDB_val key, data;
key.mv_size = len;
key.mv_data = (void *)name;
key.mv_data = (void *)table_name;
MDB_cursor mc;
mdbx_cursor_init(&mc, txn, MAIN_DBI, NULL);
rc = mdbx_cursor_set(&mc, &key, &data, MDB_SET, &exact);
if (likely(rc == MDB_SUCCESS)) {
/* make sure this is actually a DB */
int rc = mdbx_cursor_set(&mc, &key, &data, MDB_SET, &exact);
if (unlikely(rc != MDB_SUCCESS)) {
if (rc != MDB_NOTFOUND || !(user_flags & MDB_CREATE))
return rc;
} else {
/* make sure this is actually a table */
MDB_node *node = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]);
if (unlikely((node->mn_flags & (F_DUPDATA | F_SUBDATA)) != F_SUBDATA))
return MDB_INCOMPATIBLE;
} else if (!(rc == MDB_NOTFOUND && (flags & MDB_CREATE))) {
return rc;
}
/* FIXME: locking to avoid races ? */
/* Done here so we cannot fail after creating a new DB */
if (unlikely((namedup = mdbx_strdup(name)) == NULL))
char *namedup = mdbx_strdup(table_name);
if (unlikely(!namedup))
return MDBX_ENOMEM;
/* FIXME: lock here (to avoid races !!!) */
unsigned dbflag = DB_NEW | DB_VALID | DB_USRVALID;
if (unlikely(rc)) {
MDB_db db_dummy;
/* MDB_NOTFOUND and MDB_CREATE: Create new DB */
assert(rc == MDB_NOTFOUND);
MDB_db db_dummy;
memset(&db_dummy, 0, sizeof(db_dummy));
db_dummy.md_root = P_INVALID;
db_dummy.md_flags = flags & PERSISTENT_FLAGS;
db_dummy.md_flags = user_flags & PERSISTENT_FLAGS;
data.mv_size = sizeof(db_dummy);
data.mv_data = &db_dummy;
WITH_CURSOR_TRACKING(mc, rc = mdbx_cursor_put(&mc, &key, &data, F_SUBDATA));
WITH_CURSOR_TRACKING(mc, rc = mdbx_cursor_put(&mc, &key, &data,
F_SUBDATA | MDB_NOOVERWRITE));
if (unlikely(rc != MDB_SUCCESS))
goto bailout;
dbflag |= DB_DIRTY;
}
if (unlikely(rc)) {
/* Got info, register DBI in this txn */
txn->mt_dbxs[slot].md_name.mv_data = namedup;
txn->mt_dbxs[slot].md_name.mv_size = len;
txn->mt_dbxs[slot].md_cmp = nullptr;
txn->mt_dbxs[slot].md_dcmp = nullptr;
txn->mt_dbflags[slot] = dbflag;
txn->mt_dbiseqs[slot] = (txn->mt_env->me_dbiseqs[slot] += 1);
memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
rc = mdbx_dbi_bind(txn, slot, user_flags, keycmp, datacmp);
if (unlikely(rc != MDB_SUCCESS)) {
assert((dbflag & DB_DIRTY) == 0);
/* cleanup slot */
txn->mt_dbxs[slot].md_name.mv_data = NULL;
txn->mt_dbxs[slot].md_name.mv_size = 0;
txn->mt_dbflags[slot] = 0;
bailout:
free(namedup);
} else {
/* Got info, register DBI in this txn */
unsigned slot = unused ? unused : txn->mt_numdbs;
txn->mt_dbxs[slot].md_name.mv_data = namedup;
txn->mt_dbxs[slot].md_name.mv_size = len;
txn->mt_dbflags[slot] = dbflag;
/* txn-> and env-> are the same in read txns, use
* tmp variable to avoid undefined assignment
*/
seq = ++txn->mt_env->me_dbiseqs[slot];
txn->mt_dbiseqs[slot] = seq;
memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
*dbi = slot;
mdbx_default_cmp(txn, slot);
if (!unused) {
if (slot == txn->mt_numdbs)
txn->mt_numdbs++;
}
}
/* FIXME: unlock here (to avoid races !!!) */
return rc;
}
int mdbx_dbi_open(MDB_txn *txn, const char *table_name, unsigned table_flags,
MDB_dbi *dbi) {
return mdbx_dbi_open_ex(txn, table_name, table_flags, dbi, nullptr, nullptr);
}
int __cold mdbx_dbi_stat(MDB_txn *txn, MDB_dbi dbi, MDBX_stat *arg,
size_t bytes) {
if (unlikely(!arg || !txn))
@ -10214,20 +10246,6 @@ int mdbx_is_dirty(const MDB_txn *txn, const void *ptr) {
return MDBX_RESULT_TRUE;
}
int mdbx_dbi_open_ex(MDB_txn *txn, const char *name, unsigned flags,
MDB_dbi *pdbi, MDB_cmp_func *keycmp,
MDB_cmp_func *datacmp) {
int rc = mdbx_dbi_open(txn, name, flags, pdbi);
if (likely(rc == MDB_SUCCESS)) {
MDB_dbi dbi = *pdbi;
unsigned md_flags = txn->mt_dbs[dbi].md_flags;
txn->mt_dbxs[dbi].md_cmp = keycmp ? keycmp : mdbx_default_keycmp(md_flags);
txn->mt_dbxs[dbi].md_dcmp =
datacmp ? datacmp : mdbx_default_datacmp(md_flags);
}
return rc;
}
int mdbx_dbi_sequence(MDB_txn *txn, MDB_dbi dbi, uint64_t *result,
uint64_t increment) {
if (unlikely(!txn))