mdbx: refactoring fetch/refresh/create/open tables and DBI-handles.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev)
2025-11-05 02:54:43 +03:00
parent 53637e99d8
commit b015557bb3
11 changed files with 164 additions and 107 deletions

View File

@@ -241,7 +241,7 @@ __cold int mdbx_dbi_stat(const MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *dest, siz
return LOG_IFERR(MDBX_BAD_TXN); return LOG_IFERR(MDBX_BAD_TXN);
if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) { if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) {
rc = tbl_fetch((MDBX_txn *)txn, dbi); rc = tbl_refresh((MDBX_txn *)txn, dbi);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc); return LOG_IFERR(rc);
} }

View File

@@ -37,7 +37,7 @@ int mdbx_dbi_sequence(MDBX_txn *txn, MDBX_dbi dbi, uint64_t *result, uint64_t in
return LOG_IFERR(rc); return LOG_IFERR(rc);
if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) { if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) {
rc = tbl_fetch(txn, dbi); rc = tbl_refresh_absent2baddbi(txn, dbi);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc); return LOG_IFERR(rc);
} }

View File

@@ -200,10 +200,16 @@ static inline bool check_table_flags(unsigned flags) {
} }
} }
static inline int tbl_setup_ifneed(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *const db) { MDBX_MAYBE_UNUSED static inline int tbl_setup_ifneed(const MDBX_env *env, volatile kvx_t *const kvx,
const tree_t *const db) {
return likely(kvx->clc.v.lmax) ? MDBX_SUCCESS : tbl_setup(env, kvx, db); return likely(kvx->clc.v.lmax) ? MDBX_SUCCESS : tbl_setup(env, kvx, db);
} }
/* Wrapper around tbl_refresh() for callers that hold an already opened handle:
 * an MDBX_NOTFOUND result is reported as MDBX_BAD_DBI instead, every other
 * result (including MDBX_SUCCESS) is passed through unchanged. */
MDBX_MAYBE_UNUSED static inline int tbl_refresh_absent2baddbi(MDBX_txn *txn, size_t dbi) {
  const int err = tbl_refresh(txn, dbi);
  if (likely(err != MDBX_NOTFOUND))
    return err;
  return MDBX_BAD_DBI;
}
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
MDBX_NOTHROW_PURE_FUNCTION static inline size_t pgno2bytes(const MDBX_env *env, size_t pgno) { MDBX_NOTHROW_PURE_FUNCTION static inline size_t pgno2bytes(const MDBX_env *env, size_t pgno) {

View File

@@ -293,7 +293,7 @@ static __always_inline int couple_init(cursor_couple_t *couple, const MDBX_txn *
} }
if (unlikely(*dbi_state & DBI_STALE)) if (unlikely(*dbi_state & DBI_STALE))
return tbl_fetch(couple->outer.txn, cursor_dbi(&couple->outer)); return tbl_refresh_absent2baddbi(couple->outer.txn, cursor_dbi(&couple->outer));
return tbl_setup_ifneed(txn->env, kvx, tree); return tbl_setup_ifneed(txn->env, kvx, tree);
} }

102
src/dbi.c
View File

@@ -33,6 +33,13 @@ struct dbi_snap_result dbi_snap(const MDBX_env *env, const size_t dbi) {
return r; return r;
} }
/* Failure hook used by tbl_refresh(): invoked with the error code when
 * fetching or setting up the table behind `dbi` did not succeed.
 *
 * txn - transaction owning the handle;
 * dbi - handle index, asserted to be below txn->n_dbi;
 * rc  - error code from the failed step, returned to the caller unchanged.
 *
 * Currently it only resets txn->dbi_seqs[dbi] to zero; the handling is
 * explicitly unfinished (see the TODO below). */
int dbi_gone(MDBX_txn *txn, const size_t dbi, const int rc) {
/* The handle must be lazily initialized and still marked valid at this point. */
tASSERT(txn, txn->n_dbi > dbi && F_ISSET(txn->dbi_state[dbi], DBI_LINDO | DBI_VALID));
/* TODO: FIXME */
/* NOTE(review): presumably zeroing the cached sequence forces the handle to be
 * re-validated later; dbi_state is not touched here - confirm the intent. */
txn->dbi_seqs[dbi] = 0;
return rc;
}
__noinline int dbi_import(MDBX_txn *txn, const size_t dbi) { __noinline int dbi_import(MDBX_txn *txn, const size_t dbi) {
const MDBX_env *const env = txn->env; const MDBX_env *const env = txn->env;
if (dbi >= env->n_dbi || !env->dbs_flags[dbi]) if (dbi >= env->n_dbi || !env->dbs_flags[dbi])
@@ -266,8 +273,8 @@ int dbi_bind(MDBX_txn *txn, const size_t dbi, unsigned user_flags, MDBX_cmp_func
else { else {
if (txn->dbi_state[dbi] & DBI_STALE) { if (txn->dbi_state[dbi] & DBI_STALE) {
eASSERT(env, env->dbs_flags[dbi] & DB_VALID); eASSERT(env, env->dbs_flags[dbi] & DB_VALID);
int err = tbl_fetch(txn, dbi); int err = tbl_refresh(txn, dbi);
if (unlikely(err == MDBX_SUCCESS)) if (unlikely(err != MDBX_NOTFOUND))
return err; return err;
} }
eASSERT(env, ((env->dbs_flags[dbi] ^ txn->dbs[dbi].flags) & DB_PERSISTENT_FLAGS) == 0); eASSERT(env, ((env->dbs_flags[dbi] ^ txn->dbs[dbi].flags) & DB_PERSISTENT_FLAGS) == 0);
@@ -327,6 +334,8 @@ static inline size_t dbi_namelen(const MDBX_val name) {
static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MDBX_cmp_func *keycmp, static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MDBX_cmp_func *keycmp,
MDBX_cmp_func *datacmp, MDBX_val name) { MDBX_cmp_func *datacmp, MDBX_val name) {
cursor_couple_t cx;
int err, rc;
MDBX_env *const env = txn->env; MDBX_env *const env = txn->env;
/* Cannot mix named table(s) with DUPSORT flags */ /* Cannot mix named table(s) with DUPSORT flags */
@@ -352,7 +361,7 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD
env->kvs[MAIN_DBI].clc.v.cmp = builtin_datacmp(main_flags); env->kvs[MAIN_DBI].clc.v.cmp = builtin_datacmp(main_flags);
txn->dbs[MAIN_DBI].flags = main_flags; txn->dbs[MAIN_DBI].flags = main_flags;
txn->dbs[MAIN_DBI].dupfix_size = 0; txn->dbs[MAIN_DBI].dupfix_size = 0;
int err = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]); err = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]);
if (unlikely(err != MDBX_SUCCESS)) { if (unlikely(err != MDBX_SUCCESS)) {
txn->dbi_state[MAIN_DBI] = DBI_LINDO; txn->dbi_state[MAIN_DBI] = DBI_LINDO;
txn->flags |= MDBX_TXN_ERROR; txn->flags |= MDBX_TXN_ERROR;
@@ -377,20 +386,34 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD
} }
if (env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name) == 0) { if (env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name) == 0) {
slot = scan; slot = scan;
int err = dbi_check(txn, slot); err = dbi_check(txn, slot);
if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) {
/* хендл использовался, стал невалидным, /* хендл использовался, стал невалидным,
* но теперь явно пере-открывается в этой транзакци */ * но теперь явно пере-открывается в этой транзакции */
eASSERT(env, !txn->cursors[slot]); eASSERT(env, !txn->cursors[slot]);
txn->dbi_state[slot] = DBI_LINDO; txn->dbi_state[slot] = DBI_LINDO;
err = dbi_check(txn, slot); err = dbi_import(txn, slot);
/* TODO: FIXME */
} }
if (err == MDBX_SUCCESS) { if (unlikely(err != MDBX_SUCCESS))
return err;
err = dbi_bind(txn, slot, user_flags, keycmp, datacmp); err = dbi_bind(txn, slot, user_flags, keycmp, datacmp);
if (likely(err == MDBX_SUCCESS)) { if (unlikely(err != MDBX_SUCCESS))
return err;
if (unlikely((txn->dbi_state[slot] & DBI_STALE) == 0))
goto done; goto done;
err = tbl_refresh(txn, slot);
if (likely(err == MDBX_SUCCESS))
goto done;
if (err == MDBX_NOTFOUND && (user_flags & MDBX_CREATE)) {
name = env->kvs[scan].name;
/* TODO: FIXME */
} }
}
return err; return err;
} }
} }
@@ -409,58 +432,37 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD
env->n_dbi = (unsigned)slot + 1; env->n_dbi = (unsigned)slot + 1;
eASSERT(env, slot < env->n_dbi); eASSERT(env, slot < env->n_dbi);
int err = dbi_check(txn, slot); err = dbi_check(txn, slot);
eASSERT(env, err == MDBX_BAD_DBI); eASSERT(env, err == MDBX_BAD_DBI);
if (unlikely(err != MDBX_BAD_DBI)) if (unlikely(err != MDBX_BAD_DBI))
return MDBX_PROBLEM; return MDBX_PROBLEM;
/* Find the DB info */ /* Find the DB info */
MDBX_val body; rc = tbl_fetch(txn, &cx.outer, slot, &name, user_flags);
cursor_couple_t cx;
int rc = cursor_init(&cx.outer, txn, MAIN_DBI);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
rc = cursor_seek(&cx.outer, &name, &body, MDBX_SET).err;
if (unlikely(rc != MDBX_SUCCESS)) { if (unlikely(rc != MDBX_SUCCESS)) {
if (rc != MDBX_NOTFOUND || !(user_flags & MDBX_CREATE)) if (rc != MDBX_NOTFOUND || !(user_flags & MDBX_CREATE))
return rc; return rc;
} else {
/* make sure this is actually a table */
node_t *node = page_node(cx.outer.pg[cx.outer.top], cx.outer.ki[cx.outer.top]);
if (unlikely((node_flags(node) & (N_DUP | N_TREE)) != N_TREE))
return MDBX_INCOMPATIBLE;
if (!MDBX_DISABLE_VALIDATION && unlikely(body.iov_len != sizeof(tree_t))) {
ERROR("%s/%d: %s %zu", "MDBX_CORRUPTED", MDBX_CORRUPTED, "invalid table node size", body.iov_len);
return MDBX_CORRUPTED;
}
memcpy(&txn->dbs[slot], body.iov_base, sizeof(tree_t));
} }
/* Done here so we cannot fail after creating a new DB */ /* Done here so we cannot fail after creating a new DB */
defer_free_item_t *const clone = osal_malloc(dbi_namelen(name)); defer_free_item_t *clone = nullptr;
if (name.iov_base != env->kvs[slot].name.iov_base) {
clone = osal_malloc(dbi_namelen(name));
if (unlikely(!clone)) if (unlikely(!clone))
return MDBX_ENOMEM; return MDBX_ENOMEM;
memcpy(clone, name.iov_base, name.iov_len); memcpy(clone, name.iov_base, name.iov_len);
name.iov_base = clone; name.iov_base = clone;
}
uint8_t dbi_state = DBI_LINDO | DBI_VALID | DBI_FRESH; uint8_t dbi_state = DBI_LINDO | DBI_VALID | DBI_FRESH;
if (unlikely(rc != MDBX_SUCCESS)) { if (unlikely(rc != MDBX_SUCCESS)) {
/* MDBX_NOTFOUND and MDBX_CREATE: Create new DB */ /* MDBX_NOTFOUND and MDBX_CREATE: Create new DB */
tASSERT(txn, rc == MDBX_NOTFOUND); tASSERT(txn, rc == MDBX_NOTFOUND);
body.iov_base = memset(&txn->dbs[slot], 0, body.iov_len = sizeof(tree_t)); rc = tbl_create(txn, &cx.outer, slot, &name, user_flags);
txn->dbs[slot].root = P_INVALID;
txn->dbs[slot].mod_txnid = txn->txnid;
txn->dbs[slot].flags = user_flags & DB_PERSISTENT_FLAGS;
cx.outer.next = txn->cursors[MAIN_DBI];
txn->cursors[MAIN_DBI] = &cx.outer;
rc = cursor_put_checklen(&cx.outer, &name, &body, N_TREE | MDBX_NOOVERWRITE);
txn->cursors[MAIN_DBI] = cx.outer.next;
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto bailout; goto bailout;
dbi_state |= DBI_DIRTY | DBI_CREAT; dbi_state |= DBI_DIRTY | DBI_CREAT;
txn->flags |= MDBX_TXN_DIRTY;
tASSERT(txn, (txn->dbi_state[MAIN_DBI] & DBI_DIRTY) != 0);
} }
/* Got info, register DBI in this txn */ /* Got info, register DBI in this txn */
@@ -468,7 +470,6 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD
eASSERT(env, env->dbs_flags[slot] == DB_POISON && !txn->cursors[slot] && eASSERT(env, env->dbs_flags[slot] == DB_POISON && !txn->cursors[slot] &&
(txn->dbi_state[slot] & (DBI_LINDO | DBI_VALID)) == DBI_LINDO); (txn->dbi_state[slot] & (DBI_LINDO | DBI_VALID)) == DBI_LINDO);
txn->dbi_state[slot] = dbi_state; txn->dbi_state[slot] = dbi_state;
memcpy(&txn->dbs[slot], body.iov_base, sizeof(txn->dbs[slot]));
env->dbs_flags[slot] = txn->dbs[slot].flags; env->dbs_flags[slot] = txn->dbs[slot].flags;
rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp); rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
@@ -485,12 +486,16 @@ done:
return MDBX_SUCCESS; return MDBX_SUCCESS;
bailout: bailout:
eASSERT(env, !txn->cursors[slot] && !env->kvs[slot].name.iov_len && !env->kvs[slot].name.iov_base);
txn->dbi_state[slot] &= DBI_LINDO | DBI_OLDEN; txn->dbi_state[slot] &= DBI_LINDO | DBI_OLDEN;
env->dbs_flags[slot] = 0; env->dbs_flags[slot] = 0;
if (clone) {
eASSERT(env, !txn->cursors[slot] && !env->kvs[slot].name.iov_len && !env->kvs[slot].name.iov_base);
osal_free(clone); osal_free(clone);
if (slot + 1 == env->n_dbi) if (slot + 1 == env->n_dbi)
txn->n_dbi = env->n_dbi = (unsigned)slot; txn->n_dbi = env->n_dbi = (unsigned)slot;
} else {
eASSERT(env, name.iov_base == env->kvs[slot].name.iov_base);
}
return rc; return rc;
} }
@@ -568,10 +573,8 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB
rc = dbi_check(txn, i); rc = dbi_check(txn, i);
if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) { if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) {
/* хендл использовался, стал невалидным, /* хендл использовался, стал невалидным,
* но теперь явно пере-открывается в этой транзакци */ * но теперь явно пере-открывается в этой транзакции */
eASSERT(env, !txn->cursors[i]); goto slowpath_locking;
txn->dbi_state[i] = DBI_LINDO;
rc = dbi_check(txn, i);
} }
if (likely(rc == MDBX_SUCCESS)) { if (likely(rc == MDBX_SUCCESS)) {
if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) ||
@@ -581,9 +584,20 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB
/* похоже на столкновение с параллельно работающим обновлением */ /* похоже на столкновение с параллельно работающим обновлением */
goto slowpath_locking; goto slowpath_locking;
rc = dbi_bind(txn, i, user_flags, keycmp, datacmp); rc = dbi_bind(txn, i, user_flags, keycmp, datacmp);
if (likely(rc == MDBX_SUCCESS)) if (likely(rc == MDBX_SUCCESS)) {
tASSERT(txn, F_ISSET(txn->dbi_state[i], DBI_LINDO | DBI_VALID));
if (txn->dbi_state[i] & DBI_STALE) {
rc = tbl_refresh(txn, i);
if (unlikely(rc != MDBX_SUCCESS)) {
tASSERT(txn, F_ISSET(txn->dbi_state[i], DBI_LINDO | DBI_OLDEN));
if (rc == MDBX_NOTFOUND && (user_flags & MDBX_CREATE))
goto slowpath_locking;
return rc;
}
}
*dbi = (MDBX_dbi)i; *dbi = (MDBX_dbi)i;
} }
}
return rc; return rc;
} }

View File

@@ -80,6 +80,7 @@ static inline bool dbi_foreach_step(const MDBX_txn *const txn, size_t *bitmap_it
#define TXN_FOREACH_DBI_USER(TXN, I) TXN_FOREACH_DBI_FROM(TXN, I, CORE_DBS) #define TXN_FOREACH_DBI_USER(TXN, I) TXN_FOREACH_DBI_FROM(TXN, I, CORE_DBS)
MDBX_INTERNAL int dbi_import(MDBX_txn *txn, const size_t dbi); MDBX_INTERNAL int dbi_import(MDBX_txn *txn, const size_t dbi);
MDBX_INTERNAL int dbi_gone(MDBX_txn *txn, const size_t dbi, const int rc);
struct dbi_snap_result { struct dbi_snap_result {
uint32_t sequence; uint32_t sequence;

View File

@@ -142,10 +142,10 @@ struct kvx {
/* Non-shared DBI state flags inside transaction */ /* Non-shared DBI state flags inside transaction */
enum dbi_state { enum dbi_state {
DBI_DIRTY = 0x01 /* DB was written in this txn */, DBI_DIRTY = 0x01 /* table was written in this txn */,
DBI_STALE = 0x02 /* Named-DB record is older than txnID */, DBI_STALE = 0x02 /* cached table record is outdated and should be reloaded/refreshed */,
DBI_FRESH = 0x04 /* Named-DB handle opened in this txn */, DBI_FRESH = 0x04 /* table handle opened in this txn */,
DBI_CREAT = 0x08 /* Named-DB handle created in this txn */, DBI_CREAT = 0x08 /* table handle created in this txn */,
DBI_VALID = 0x10 /* Handle is valid, see also DB_VALID */, DBI_VALID = 0x10 /* Handle is valid, see also DB_VALID */,
DBI_OLDEN = 0x40 /* Handle was closed/reopened outside txn */, DBI_OLDEN = 0x40 /* Handle was closed/reopened outside txn */,
DBI_LINDO = 0x80 /* Lazy initialization done for DBI-slot */, DBI_LINDO = 0x80 /* Lazy initialization done for DBI-slot */,

View File

@@ -109,8 +109,12 @@ MDBX_INTERNAL void recalculate_merge_thresholds(MDBX_env *env);
MDBX_INTERNAL void recalculate_subpage_thresholds(MDBX_env *env); MDBX_INTERNAL void recalculate_subpage_thresholds(MDBX_env *env);
/* table.c */ /* table.c */
MDBX_INTERNAL int __must_check_result tbl_fetch(MDBX_txn *txn, size_t dbi); MDBX_INTERNAL int __must_check_result tbl_fetch(MDBX_txn *txn, MDBX_cursor *mc, size_t dbi, const MDBX_val *name,
unsigned wanna_flags);
MDBX_INTERNAL int __must_check_result tbl_create(MDBX_txn *txn, MDBX_cursor *mc, size_t slot, const MDBX_val *name,
unsigned db_flags);
MDBX_INTERNAL int __must_check_result tbl_setup(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *const db); MDBX_INTERNAL int __must_check_result tbl_setup(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *const db);
MDBX_INTERNAL int __must_check_result tbl_refresh(MDBX_txn *txn, size_t dbi);
/* coherency.c */ /* coherency.c */
MDBX_INTERNAL bool coherency_check_meta(const MDBX_env *env, const volatile meta_t *meta, bool report); MDBX_INTERNAL bool coherency_check_meta(const MDBX_env *env, const volatile meta_t *meta, bool report);

View File

@@ -37,67 +37,99 @@ int tbl_setup(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *cons
return MDBX_SUCCESS; return MDBX_SUCCESS;
} }
int tbl_fetch(MDBX_txn *txn, size_t dbi) { int tbl_fetch(MDBX_txn *txn, MDBX_cursor *mc, size_t dbi, const MDBX_val *name, unsigned wanna_flags) {
cursor_couple_t couple; int err = cursor_init(mc, txn, MAIN_DBI);
int rc = cursor_init(&couple.outer, txn, MAIN_DBI); if (unlikely(err != MDBX_SUCCESS))
if (unlikely(rc != MDBX_SUCCESS)) return err;
return rc;
kvx_t *const kvx = &txn->env->kvs[dbi]; err = tree_search(mc, name, 0);
rc = tree_search(&couple.outer, &kvx->name, 0); if (unlikely(err != MDBX_SUCCESS)) {
if (unlikely(rc != MDBX_SUCCESS)) { if (err == MDBX_NOTFOUND)
bailout: goto notfound;
NOTICE("dbi %zu refs to inaccessible table `%.*s` for txn %" PRIaTXN " (err %d)", dbi, (int)kvx->name.iov_len, return err;
(const char *)kvx->name.iov_base, txn->txnid, rc); }
return (rc == MDBX_NOTFOUND) ? MDBX_BAD_DBI : rc;
struct node_search_result nsr = node_search(mc, name);
if (unlikely(!nsr.exact)) {
notfound:
if (dbi < txn->env->n_dbi && (txn->env->dbs_flags[dbi] & DB_VALID) && !(wanna_flags & MDBX_CREATE))
NOTICE("dbi %zu refs to non-existing table `%.*s` for txn %" PRIaTXN " (err %d)", dbi, (int)name->iov_len,
(const char *)name->iov_base, txn->txnid, err);
return MDBX_NOTFOUND;
}
if (unlikely((node_flags(nsr.node) & (N_DUP | N_TREE)) != N_TREE)) {
NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)name->iov_len,
(const char *)name->iov_base, txn->txnid, "wrong node-flags");
return MDBX_INCOMPATIBLE /* not a named DB */;
} }
MDBX_val data; MDBX_val data;
struct node_search_result nsr = node_search(&couple.outer, &kvx->name); err = node_read(mc, nsr.node, &data, mc->pg[mc->top]);
if (unlikely(!nsr.exact)) { if (unlikely(err != MDBX_SUCCESS))
rc = MDBX_NOTFOUND; return err;
goto bailout;
} if (unlikely(data.iov_len < sizeof(tree_t))) {
if (unlikely((node_flags(nsr.node) & (N_DUP | N_TREE)) != N_TREE)) { NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)name->iov_len,
NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)kvx->name.iov_len, (const char *)name->iov_base, txn->txnid, "wrong record-size");
(const char *)kvx->name.iov_base, txn->txnid, "wrong flags"); return MDBX_INCOMPATIBLE /* not a named DB */;
return MDBX_INCOMPATIBLE; /* not a named DB */
} }
rc = node_read(&couple.outer, nsr.node, &data, couple.outer.pg[couple.outer.top]); const unsigned db_flags = UNALIGNED_PEEK_16(data.iov_base, tree_t, flags);
if (unlikely(rc != MDBX_SUCCESS)) const pgno_t db_root_pgno = peek_pgno(ptr_disp(data.iov_base, offsetof(tree_t, root)));
return rc;
if (unlikely(data.iov_len != sizeof(tree_t))) {
NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)kvx->name.iov_len,
(const char *)kvx->name.iov_base, txn->txnid, "wrong rec-size");
return MDBX_INCOMPATIBLE; /* not a named DB */
}
uint16_t flags = UNALIGNED_PEEK_16(data.iov_base, tree_t, flags);
/* The txn may not know this DBI, or another process may /* The txn may not know this DBI, or another process may
* have dropped and recreated the DB with other flags. */ * have dropped and recreated the DB with other flags. */
tree_t *const db = &txn->dbs[dbi]; if (unlikely((wanna_flags ^ db_flags) & DB_PERSISTENT_FLAGS) && !(wanna_flags & MDBX_DB_ACCEDE) &&
if (unlikely((db->flags & DB_PERSISTENT_FLAGS) != flags)) { !((wanna_flags & MDBX_CREATE) && db_root_pgno == P_INVALID)) {
NOTICE("dbi %zu refs to the re-created table `%.*s` for txn %" PRIaTXN NOTICE("dbi %zu refs to the re-created table `%.*s` for txn %" PRIaTXN
" with different flags (present 0x%X != wanna 0x%X)", " with different flags (present 0x%X != wanna 0x%X)",
dbi, (int)kvx->name.iov_len, (const char *)kvx->name.iov_base, txn->txnid, db->flags & DB_PERSISTENT_FLAGS, dbi, (int)name->iov_len, (const char *)name->iov_base, txn->txnid, db_flags & DB_PERSISTENT_FLAGS,
flags); wanna_flags & DB_PERSISTENT_FLAGS);
return MDBX_INCOMPATIBLE; return MDBX_INCOMPATIBLE /* not a named DB */;
} }
tree_t *const db = &txn->dbs[dbi];
memcpy(db, data.iov_base, sizeof(tree_t)); memcpy(db, data.iov_base, sizeof(tree_t));
#if !MDBX_DISABLE_VALIDATION #if !MDBX_DISABLE_VALIDATION
const txnid_t pp_txnid = couple.outer.pg[couple.outer.top]->txnid; const txnid_t maindb_leafpage_txnid = mc->pg[mc->top]->txnid;
tASSERT(txn, txn->front_txnid >= pp_txnid); tASSERT(txn, txn->front_txnid >= maindb_leafpage_txnid);
if (unlikely(db->mod_txnid > pp_txnid)) { if (unlikely(db->mod_txnid > maindb_leafpage_txnid)) {
ERROR("db.mod_txnid (%" PRIaTXN ") > page-txnid (%" PRIaTXN ")", db->mod_txnid, pp_txnid); ERROR("db.mod_txnid (%" PRIaTXN ") > page-txnid (%" PRIaTXN ")", db->mod_txnid, maindb_leafpage_txnid);
return MDBX_CORRUPTED; return MDBX_CORRUPTED;
} }
#endif /* !MDBX_DISABLE_VALIDATION */ #endif /* !MDBX_DISABLE_VALIDATION */
rc = tbl_setup_ifneed(txn->env, kvx, db);
return MDBX_SUCCESS;
}
/* Creates the record of a new (empty) named table inside the main table.
 *
 * txn      - write transaction in which the table is created;
 * mc       - cursor over MAIN_DBI used for the insertion; the caller visible
 *            here passes the cursor previously initialised by tbl_fetch()
 *            (NOTE(review): confirm no other callers rely on a different state);
 * slot     - index in txn->dbs[] that receives the fresh tree descriptor;
 * name     - key under which the table record is stored;
 * db_flags - requested flags, must contain MDBX_CREATE; only the
 *            DB_PERSISTENT_FLAGS subset is stored in the descriptor.
 *
 * Returns the result of cursor_put_checklen(). On success the transaction
 * is additionally marked with MDBX_TXN_DIRTY. */
int tbl_create(MDBX_txn *txn, MDBX_cursor *mc, size_t slot, const MDBX_val *name, unsigned db_flags) {
  tASSERT(txn, db_flags & MDBX_CREATE);

  /* Start from an all-zero descriptor without a root page. */
  tree_t *const tree = &txn->dbs[slot];
  memset(tree, 0, sizeof(tree_t));

  /* The record value is the descriptor itself, filled in place. */
  MDBX_val body;
  body.iov_base = tree;
  body.iov_len = sizeof(tree_t);

  tree->root = P_INVALID;
  tree->mod_txnid = txn->txnid;
  tree->flags = db_flags & DB_PERSISTENT_FLAGS;

  /* Link the cursor at the head of the MAIN_DBI cursor list only for the
   * duration of the put, then restore the previous head. */
  mc->next = txn->cursors[MAIN_DBI];
  txn->cursors[MAIN_DBI] = mc;
  const int err = cursor_put_checklen(mc, name, &body, N_TREE | MDBX_NOOVERWRITE);
  txn->cursors[MAIN_DBI] = mc->next;

  if (likely(err == MDBX_SUCCESS)) {
    txn->flags |= MDBX_TXN_DIRTY;
    tASSERT(txn, (txn->dbi_state[MAIN_DBI] & DBI_DIRTY) != 0);
  }
  return err;
}
int tbl_refresh(MDBX_txn *txn, size_t dbi) {
cursor_couple_t couple;
kvx_t *const kvx = &txn->env->kvs[dbi];
int rc = tbl_fetch(txn, &couple.outer, dbi, &kvx->name, txn->dbs[dbi].flags);
if (likely(rc != MDBX_SUCCESS))
return dbi_gone(txn, dbi, rc);
rc = tbl_setup_ifneed(txn->env, kvx, &txn->dbs[dbi]);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
return rc; return dbi_gone(txn, dbi, rc);
if (unlikely(dbi_changed(txn, dbi))) if (unlikely(dbi_changed(txn, dbi)))
return MDBX_BAD_DBI; return MDBX_BAD_DBI;

View File

@@ -39,7 +39,7 @@ __hot int tree_search(MDBX_cursor *mc, const MDBX_val *key, int flags) {
const size_t dbi = cursor_dbi(mc); const size_t dbi = cursor_dbi(mc);
if (unlikely(*cursor_dbi_state(mc) & DBI_STALE)) { if (unlikely(*cursor_dbi_state(mc) & DBI_STALE)) {
err = tbl_fetch(mc->txn, dbi); err = tbl_refresh_absent2baddbi(mc->txn, dbi);
if (unlikely(err != MDBX_SUCCESS)) if (unlikely(err != MDBX_SUCCESS))
goto bailout; goto bailout;
} }

View File

@@ -47,7 +47,7 @@ int txn_shadow_cursors(const MDBX_txn *parent, const size_t dbi) {
int err = cursor_shadow(cursor, txn, dbi); int err = cursor_shadow(cursor, txn, dbi);
if (unlikely(err != MDBX_SUCCESS)) { if (unlikely(err != MDBX_SUCCESS)) {
/* не получилось забекапить курсоры */ /* не получилось забекапить курсоры */
txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO | DBI_STALE; txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO;
txn->flags |= MDBX_TXN_ERROR; txn->flags |= MDBX_TXN_ERROR;
return err; return err;
} }