mdbx: поддержка не-печатных имен для subDb.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-12-13 17:08:39 +03:00
parent b247b081af
commit 7011743262
2 changed files with 115 additions and 69 deletions

23
mdbx.h
View File

@ -4163,6 +4163,8 @@ typedef int(MDBX_cmp_func)(const MDBX_val *a,
* by current thread. */
LIBMDBX_API int mdbx_dbi_open(MDBX_txn *txn, const char *name,
MDBX_db_flags_t flags, MDBX_dbi *dbi);
LIBMDBX_API int mdbx_dbi_open2(MDBX_txn *txn, const MDBX_val *name,
MDBX_db_flags_t flags, MDBX_dbi *dbi);
/** \deprecated Please
* \ref avoid_custom_comparators "avoid using custom comparators" and use
@ -4182,6 +4184,9 @@ LIBMDBX_API int mdbx_dbi_open(MDBX_txn *txn, const char *name,
MDBX_DEPRECATED LIBMDBX_API int
mdbx_dbi_open_ex(MDBX_txn *txn, const char *name, MDBX_db_flags_t flags,
MDBX_dbi *dbi, MDBX_cmp_func *keycmp, MDBX_cmp_func *datacmp);
MDBX_DEPRECATED LIBMDBX_API int
mdbx_dbi_open_ex2(MDBX_txn *txn, const MDBX_val *name, MDBX_db_flags_t flags,
MDBX_dbi *dbi, MDBX_cmp_func *keycmp, MDBX_cmp_func *datacmp);
/** \defgroup value2key Value-to-Key functions
* \brief Value-to-Key functions to
@ -5479,18 +5484,20 @@ typedef enum MDBX_page_type_t MDBX_page_type_t;
#endif
/** \brief Pseudo-name for MainDB */
#define MDBX_PGWALK_MAIN ((const char *)((ptrdiff_t)0))
#define MDBX_PGWALK_MAIN ((void *)((ptrdiff_t)0))
/** \brief Pseudo-name for GarbageCollectorDB */
#define MDBX_PGWALK_GC ((const char *)((ptrdiff_t)-1))
#define MDBX_PGWALK_GC ((void *)((ptrdiff_t)-1))
/** \brief Pseudo-name for MetaPages */
#define MDBX_PGWALK_META ((const char *)((ptrdiff_t)-2))
#define MDBX_PGWALK_META ((void *)((ptrdiff_t)-2))
/** \brief Callback function for traverse the b-tree. \see mdbx_env_pgwalk() */
typedef int MDBX_pgvisitor_func(
const uint64_t pgno, const unsigned number, void *const ctx, const int deep,
const char *const dbi, const size_t page_size, const MDBX_page_type_t type,
const MDBX_error_t err, const size_t nentries, const size_t payload_bytes,
const size_t header_bytes, const size_t unused_bytes) MDBX_CXX17_NOEXCEPT;
typedef int
MDBX_pgvisitor_func(const uint64_t pgno, const unsigned number, void *const ctx,
const int deep, const MDBX_val *dbi_name,
const size_t page_size, const MDBX_page_type_t type,
const MDBX_error_t err, const size_t nentries,
const size_t payload_bytes, const size_t header_bytes,
const size_t unused_bytes) MDBX_CXX17_NOEXCEPT;
/** \brief B-tree traversal function. */
LIBMDBX_API int mdbx_env_pgwalk(MDBX_txn *txn, MDBX_pgvisitor_func *visitor,

View File

@ -9489,14 +9489,18 @@ static void dbi_update(MDBX_txn *txn, int keep) {
if (keep) {
env->me_dbflags[i] = txn->mt_dbs[i].md_flags | DB_VALID;
} else {
char *ptr = env->me_dbxs[i].md_name.iov_base;
if (ptr) {
env->me_dbxs[i].md_name.iov_len = 0;
const MDBX_val name = env->me_dbxs[i].md_name;
if (name.iov_base) {
env->me_dbxs[i].md_name.iov_base = nullptr;
eASSERT(env, env->me_dbflags[i] == 0);
atomic_store32(&env->me_dbiseqs[i], dbi_seq(env, i),
mo_AcquireRelease);
env->me_dbxs[i].md_name.iov_base = NULL;
osal_free(ptr);
env->me_dbxs[i].md_name.iov_len = 0;
if (name.iov_len)
osal_free(name.iov_base);
} else {
eASSERT(env, name.iov_len == 0);
eASSERT(env, env->me_dbflags[i] == 0);
}
}
}
@ -9862,7 +9866,7 @@ __cold static int audit_ex(MDBX_txn *txn, size_t retired_stored,
if ((txn->mt_flags & MDBX_TXN_RDONLY) == 0) {
for (MDBX_dbi k = txn->mt_numdbs; --k > MAIN_DBI;) {
if ((txn->mt_dbistate[k] & DBI_VALID) &&
/* txn->mt_dbxs[k].md_name.iov_len > 0 && */
/* txn->mt_dbxs[k].md_name.iov_base && */
node_ks(node) == txn->mt_dbxs[k].md_name.iov_len &&
memcmp(node_key(node), txn->mt_dbxs[k].md_name.iov_base,
node_ks(node)) == 0) {
@ -14986,6 +14990,7 @@ __cold static int env_close(MDBX_env *env) {
if (env->me_dbxs) {
for (size_t i = CORE_DBS; i < env->me_numdbs; ++i)
if (env->me_dbxs[i].md_name.iov_len)
osal_free(env->me_dbxs[i].md_name.iov_base);
osal_free(env->me_dbxs);
env->me_numdbs = CORE_DBS;
@ -22160,8 +22165,8 @@ static int dbi_bind(MDBX_txn *txn, const MDBX_dbi dbi, unsigned user_flags,
return MDBX_SUCCESS;
}
static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
MDBX_dbi *dbi, MDBX_cmp_func *keycmp,
static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
unsigned user_flags, MDBX_dbi *dbi, MDBX_cmp_func *keycmp,
MDBX_cmp_func *datacmp) {
int rc = MDBX_EINVAL;
if (unlikely(!dbi))
@ -22198,17 +22203,30 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
}
/* main table? */
if (!table_name) {
if (table_name == MDBX_PGWALK_MAIN ||
table_name->iov_base == MDBX_PGWALK_MAIN) {
rc = dbi_bind(txn, MAIN_DBI, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
*dbi = MAIN_DBI;
return rc;
}
if (table_name == MDBX_PGWALK_GC || table_name->iov_base == MDBX_PGWALK_GC) {
rc = dbi_bind(txn, FREE_DBI, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
*dbi = FREE_DBI;
return rc;
}
if (table_name == MDBX_PGWALK_META ||
table_name->iov_base == MDBX_PGWALK_META) {
rc = MDBX_EINVAL;
goto early_bailout;
}
MDBX_env *env = txn->mt_env;
size_t len = strlen(table_name);
if (len > env->me_leaf_nodemax - NODESIZE - sizeof(MDBX_db))
MDBX_val key = *table_name;
MDBX_env *const env = txn->mt_env;
if (key.iov_len > env->me_leaf_nodemax - NODESIZE - sizeof(MDBX_db))
return MDBX_EINVAL;
if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) {
@ -22221,13 +22239,14 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
/* Is the DB already open? */
MDBX_dbi scan, slot;
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.iov_len) {
if (!txn->mt_dbxs[scan].md_name.iov_base) {
/* Remember this free slot */
slot = scan;
continue;
}
if (len == txn->mt_dbxs[scan].md_name.iov_len &&
!strncmp(table_name, txn->mt_dbxs[scan].md_name.iov_base, len)) {
if (key.iov_len == txn->mt_dbxs[scan].md_name.iov_len &&
!memcmp(key.iov_base, txn->mt_dbxs[scan].md_name.iov_base,
key.iov_len)) {
rc = dbi_bind(txn, scan, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
@ -22250,9 +22269,7 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
}
/* Find the DB info */
MDBX_val key, data;
key.iov_len = len;
key.iov_base = (void *)table_name;
MDBX_val data;
MDBX_cursor_couple couple;
rc = cursor_init(&couple.outer, txn, MAIN_DBI);
if (unlikely(rc != MDBX_SUCCESS))
@ -22281,16 +22298,21 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
}
/* Done here so we cannot fail after creating a new DB */
char *const namedup = osal_strdup(table_name);
if (unlikely(!namedup)) {
void *clone = nullptr;
if (key.iov_len) {
clone = osal_malloc(key.iov_len);
if (unlikely(!clone)) {
rc = MDBX_ENOMEM;
goto early_bailout;
}
key.iov_base = memcpy(clone, key.iov_base, key.iov_len);
} else
key.iov_base = "";
int err = osal_fastmutex_acquire(&env->me_dbi_lock);
if (unlikely(err != MDBX_SUCCESS)) {
rc = err;
osal_free(namedup);
osal_free(clone);
goto early_bailout;
}
@ -22299,13 +22321,14 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
/* Rescan after mutex acquisition & import handles */
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
if (!txn->mt_dbxs[scan].md_name.iov_len) {
if (!txn->mt_dbxs[scan].md_name.iov_base) {
/* Remember this free slot */
slot = scan;
continue;
}
if (len == txn->mt_dbxs[scan].md_name.iov_len &&
!strncmp(table_name, txn->mt_dbxs[scan].md_name.iov_base, len)) {
if (key.iov_len == txn->mt_dbxs[scan].md_name.iov_len &&
!memcmp(key.iov_base, txn->mt_dbxs[scan].md_name.iov_base,
key.iov_len)) {
rc = dbi_bind(txn, scan, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto later_bailout;
@ -22352,11 +22375,10 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
later_bailout:
*dbi = 0;
later_exit:
osal_free(namedup);
osal_free(clone);
} else {
txn->mt_dbistate[slot] = (uint8_t)dbiflags;
txn->mt_dbxs[slot].md_name.iov_base = namedup;
txn->mt_dbxs[slot].md_name.iov_len = len;
txn->mt_dbxs[slot].md_name = key;
txn->mt_dbiseqs[slot].weak = env->me_dbiseqs[slot].weak =
dbi_seq(env, slot);
if (!(dbiflags & DBI_CREAT))
@ -22377,15 +22399,41 @@ static int dbi_open(MDBX_txn *txn, const char *table_name, unsigned user_flags,
return rc;
}
int mdbx_dbi_open(MDBX_txn *txn, const char *table_name,
MDBX_db_flags_t table_flags, MDBX_dbi *dbi) {
return dbi_open(txn, table_name, table_flags, dbi, nullptr, nullptr);
static int dbi_open_cstr(MDBX_txn *txn, const char *name_cstr,
MDBX_db_flags_t flags, MDBX_dbi *dbi,
MDBX_cmp_func *keycmp, MDBX_cmp_func *datacmp) {
MDBX_val thunk, *name;
if (name_cstr == MDBX_PGWALK_MAIN || name_cstr == MDBX_PGWALK_GC ||
name_cstr == MDBX_PGWALK_META)
name = (void *)name_cstr;
else {
thunk.iov_len = strlen(name_cstr);
thunk.iov_base = (void *)name_cstr;
name = &thunk;
}
return dbi_open(txn, name, flags, dbi, keycmp, datacmp);
}
int mdbx_dbi_open_ex(MDBX_txn *txn, const char *table_name,
MDBX_db_flags_t table_flags, MDBX_dbi *dbi,
int mdbx_dbi_open(MDBX_txn *txn, const char *name, MDBX_db_flags_t flags,
MDBX_dbi *dbi) {
return dbi_open_cstr(txn, name, flags, dbi, nullptr, nullptr);
}
int mdbx_dbi_open2(MDBX_txn *txn, const MDBX_val *name, MDBX_db_flags_t flags,
MDBX_dbi *dbi) {
return dbi_open(txn, name, flags, dbi, nullptr, nullptr);
}
int mdbx_dbi_open_ex(MDBX_txn *txn, const char *name, MDBX_db_flags_t flags,
MDBX_dbi *dbi, MDBX_cmp_func *keycmp,
MDBX_cmp_func *datacmp) {
return dbi_open_cstr(txn, name, flags, dbi, keycmp, datacmp);
}
int mdbx_dbi_open_ex2(MDBX_txn *txn, const MDBX_val *name,
MDBX_db_flags_t flags, MDBX_dbi *dbi,
MDBX_cmp_func *keycmp, MDBX_cmp_func *datacmp) {
return dbi_open(txn, table_name, table_flags, dbi, keycmp, datacmp);
return dbi_open(txn, name, flags, dbi, keycmp, datacmp);
}
__cold int mdbx_dbi_stat(MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *dest,
@ -23064,7 +23112,7 @@ typedef struct mdbx_walk_ctx {
} mdbx_walk_ctx_t;
__cold static int walk_sdb(mdbx_walk_ctx_t *ctx, MDBX_db *const sdb,
const char *name, int deep);
const MDBX_val *name, int deep);
static MDBX_page_type_t walk_page_type(const MDBX_page *mp) {
if (mp)
@ -23085,7 +23133,8 @@ static MDBX_page_type_t walk_page_type(const MDBX_page *mp) {
/* Depth-first tree traversal. */
__cold static int walk_tree(mdbx_walk_ctx_t *ctx, const pgno_t pgno,
const char *name, int deep, txnid_t parent_txnid) {
const MDBX_val *name, int deep,
txnid_t parent_txnid) {
assert(pgno != P_INVALID);
MDBX_page *mp = nullptr;
int err = page_get(ctx->mw_cursor, pgno, &mp, parent_txnid);
@ -23251,33 +23300,22 @@ __cold static int walk_tree(mdbx_walk_ctx_t *ctx, const pgno_t pgno,
}
assert(type == MDBX_page_leaf);
MDBX_db db;
switch (node_flags(node)) {
default:
continue;
case F_SUBDATA /* sub-db */: {
const size_t namelen = node_ks(node);
if (unlikely(namelen == 0 || node_ds(node) != sizeof(MDBX_db))) {
case F_SUBDATA /* sub-db */:
if (unlikely(node_ds(node) != sizeof(MDBX_db))) {
assert(err == MDBX_CORRUPTED);
err = MDBX_CORRUPTED;
break;
}
char namebuf_onstask[64];
char *const sub_name = (namelen < sizeof(namebuf_onstask))
? namebuf_onstask
: osal_malloc(namelen + 1);
if (unlikely(!sub_name))
return MDBX_ENOMEM;
memcpy(sub_name, node_key(node), namelen);
sub_name[namelen] = 0;
} else {
MDBX_db db;
memcpy(&db, node_data(node), sizeof(db));
const MDBX_val subdb_name = {node_key(node), node_ks(node)};
assert(err == MDBX_SUCCESS);
err = walk_sdb(ctx, &db, sub_name, deep + 1);
if (sub_name != namebuf_onstask)
osal_free(sub_name);
} break;
err = walk_sdb(ctx, &db, &subdb_name, deep + 1);
}
break;
case F_SUBDATA | F_DUPDATA /* dupsorted sub-tree */:
if (unlikely(node_ds(node) != sizeof(MDBX_db) ||
@ -23285,6 +23323,7 @@ __cold static int walk_tree(mdbx_walk_ctx_t *ctx, const pgno_t pgno,
assert(err == MDBX_CORRUPTED);
err = MDBX_CORRUPTED;
} else {
MDBX_db db;
memcpy(&db, node_data(node), sizeof(db));
assert(ctx->mw_cursor->mc_xcursor ==
&container_of(ctx->mw_cursor, MDBX_cursor_couple, outer)->inner);
@ -23308,7 +23347,7 @@ __cold static int walk_tree(mdbx_walk_ctx_t *ctx, const pgno_t pgno,
}
__cold static int walk_sdb(mdbx_walk_ctx_t *ctx, MDBX_db *const sdb,
const char *name, int deep) {
const MDBX_val *name, int deep) {
if (unlikely(sdb->md_root == P_INVALID))
return MDBX_SUCCESS; /* empty db */