diff --git a/src/api-dbi.c b/src/api-dbi.c index 66f08c4a..695e7b81 100644 --- a/src/api-dbi.c +++ b/src/api-dbi.c @@ -241,7 +241,7 @@ __cold int mdbx_dbi_stat(const MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *dest, siz return LOG_IFERR(MDBX_BAD_TXN); if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) { - rc = tbl_fetch((MDBX_txn *)txn, dbi); + rc = tbl_refresh((MDBX_txn *)txn, dbi); if (unlikely(rc != MDBX_SUCCESS)) return LOG_IFERR(rc); } diff --git a/src/api-misc.c b/src/api-misc.c index 92e3fc2f..bd8e27f3 100644 --- a/src/api-misc.c +++ b/src/api-misc.c @@ -37,7 +37,7 @@ int mdbx_dbi_sequence(MDBX_txn *txn, MDBX_dbi dbi, uint64_t *result, uint64_t in return LOG_IFERR(rc); if (unlikely(txn->dbi_state[dbi] & DBI_STALE)) { - rc = tbl_fetch(txn, dbi); + rc = tbl_refresh_absent2baddbi(txn, dbi); if (unlikely(rc != MDBX_SUCCESS)) return LOG_IFERR(rc); } diff --git a/src/cogs.h b/src/cogs.h index 457a721c..fc2c870f 100644 --- a/src/cogs.h +++ b/src/cogs.h @@ -200,10 +200,16 @@ static inline bool check_table_flags(unsigned flags) { } } -static inline int tbl_setup_ifneed(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *const db) { +MDBX_MAYBE_UNUSED static inline int tbl_setup_ifneed(const MDBX_env *env, volatile kvx_t *const kvx, + const tree_t *const db) { /* lazy setup: tbl_setup() is called only while kvx->clc.v.lmax is still zero */ return likely(kvx->clc.v.lmax) ? MDBX_SUCCESS : tbl_setup(env, kvx, db); } +MDBX_MAYBE_UNUSED static inline int tbl_refresh_absent2baddbi(MDBX_txn *txn, size_t dbi) { /* tbl_refresh() wrapper that reports MDBX_NOTFOUND as MDBX_BAD_DBI; every other code passes through unchanged */ + int rc = tbl_refresh(txn, dbi); + return likely(rc != MDBX_NOTFOUND) ? 
rc : MDBX_BAD_DBI; +} + /*----------------------------------------------------------------------------*/ MDBX_NOTHROW_PURE_FUNCTION static inline size_t pgno2bytes(const MDBX_env *env, size_t pgno) { diff --git a/src/cursor.c b/src/cursor.c index f2df9244..c64b49cd 100644 --- a/src/cursor.c +++ b/src/cursor.c @@ -293,7 +293,7 @@ static __always_inline int couple_init(cursor_couple_t *couple, const MDBX_txn * } if (unlikely(*dbi_state & DBI_STALE)) - return tbl_fetch(couple->outer.txn, cursor_dbi(&couple->outer)); + return tbl_refresh_absent2baddbi(couple->outer.txn, cursor_dbi(&couple->outer)); return tbl_setup_ifneed(txn->env, kvx, tree); } diff --git a/src/dbi.c b/src/dbi.c index daf11783..8030d995 100644 --- a/src/dbi.c +++ b/src/dbi.c @@ -33,6 +33,23 @@ struct dbi_snap_result dbi_snap(const MDBX_env *env, const size_t dbi) { return r; } +int dbi_gone(MDBX_txn *txn, const size_t dbi, const int rc) { /* Resets the slot state to DBI_OLDEN | DBI_LINDO in txn and, walking up via txn->parent, in its ancestors; always returns rc unchanged so callers can write `return dbi_gone(...)` */ + tASSERT(txn, txn->n_dbi > dbi && F_ISSET(txn->dbi_state[dbi], DBI_LINDO | DBI_VALID)); + for (;;) { + unsigned state = txn->dbi_state[dbi]; /* remember the previous state before overwriting it */ + txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO; + if (state & (DBI_FRESH | DBI_CREAT)) /* the previous state had DBI_FRESH or DBI_CREAT set: stop walking up */ + return rc; + if (!txn->parent) /* topmost txn reached */ + break; + txn = txn->parent; + } + + /* TODO: FIXME */ + txn->dbi_seqs[dbi] = 0; /* reached only via the break above, i.e. for the txn without a parent */ + return rc; +} + __noinline int dbi_import(MDBX_txn *txn, const size_t dbi) { const MDBX_env *const env = txn->env; if (dbi >= env->n_dbi || !env->dbs_flags[dbi]) @@ -266,8 +283,8 @@ int dbi_bind(MDBX_txn *txn, const size_t dbi, unsigned user_flags, MDBX_cmp_func else { if (txn->dbi_state[dbi] & DBI_STALE) { eASSERT(env, env->dbs_flags[dbi] & DB_VALID); - int err = tbl_fetch(txn, dbi); - if (unlikely(err == MDBX_SUCCESS)) + int err = tbl_refresh(txn, dbi); + if (unlikely(err != MDBX_NOTFOUND)) /* NOTE(review): MDBX_SUCCESS is returned here as well; only MDBX_NOTFOUND falls through - confirm this is intended */ return err; } eASSERT(env, ((env->dbs_flags[dbi] ^ txn->dbs[dbi].flags) & DB_PERSISTENT_FLAGS) == 0); @@ -325,8 +342,9 @@ static inline size_t dbi_namelen(const MDBX_val name) { return (name.iov_len > sizeof(defer_free_item_t)) ? 
name.iov_len : sizeof(defer_free_item_t); } -static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MDBX_cmp_func *keycmp, - MDBX_cmp_func *datacmp, MDBX_val name) { +static int dbi_open_locked(MDBX_txn *txn, cursor_couple_t *maindb_cx, unsigned user_flags, MDBX_cmp_func *keycmp, + MDBX_cmp_func *datacmp, MDBX_val name, const size_t fastpath_slot) { + int rc; MDBX_env *const env = txn->env; /* Cannot mix named table(s) with DUPSORT flags */ @@ -352,12 +370,12 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD env->kvs[MAIN_DBI].clc.v.cmp = builtin_datacmp(main_flags); txn->dbs[MAIN_DBI].flags = main_flags; txn->dbs[MAIN_DBI].dupfix_size = 0; - int err = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]); - if (unlikely(err != MDBX_SUCCESS)) { + rc = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]); + if (unlikely(rc != MDBX_SUCCESS)) { txn->dbi_state[MAIN_DBI] = DBI_LINDO; txn->flags |= MDBX_TXN_ERROR; env->flags |= ENV_FATAL_ERROR; - return err; + return rc; } env->dbs_flags[MAIN_DBI] = main_flags | DB_VALID; txn->dbi_seqs[MAIN_DBI] = atomic_store32(&env->dbi_seqs[MAIN_DBI], seq, mo_AcquireRelease); @@ -368,6 +386,7 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD tASSERT(txn, env->kvs[MAIN_DBI].clc.k.cmp); /* Is the DB already open? 
*/ + defer_free_item_t *clone = nullptr; size_t slot = env->n_dbi; for (size_t scan = CORE_DBS; scan < env->n_dbi; ++scan) { if ((env->dbs_flags[scan] & DB_VALID) == 0) { @@ -377,21 +396,49 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD } if (env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name) == 0) { slot = scan; - int err = dbi_check(txn, slot); - if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { + rc = dbi_check(txn, slot); + if (rc == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { /* хендл использовался, стал невалидным, - * но теперь явно пере-открывается в этой транзакци */ + * но теперь явно пере-открывается в этой транзакции */ eASSERT(env, !txn->cursors[slot]); txn->dbi_state[slot] = DBI_LINDO; - err = dbi_check(txn, slot); + txn->dbi_seqs[slot] = 0; + rc = dbi_import(txn, slot); + /* TODO: FIXME */ } - if (err == MDBX_SUCCESS) { - err = dbi_bind(txn, slot, user_flags, keycmp, datacmp); - if (likely(err == MDBX_SUCCESS)) { - goto done; + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + if (unlikely((txn->dbi_state[slot] & DBI_STALE) == 0)) + goto done; + + if (fastpath_slot /* уже был выполнен поиск посредством tbl_fetch() */) { + if (slot != fastpath_slot) + txn->dbs[slot] = txn->dbs[fastpath_slot]; + if (user_flags & MDBX_CREATE) { + /* значит таблица уже была открытой, но проверка её наличия в fastpath вернула MDBX_NOTFOUND */ + rc = MDBX_NOTFOUND; + } else { + /* значит в fastpath был найден пустой слот и проверка наличия таблицы завершилась успешно */ + assert(rc == MDBX_SUCCESS); } + } else { + rc = tbl_fetch(txn, &maindb_cx->outer, slot, &name, user_flags); } - return err; + + if (likely(rc == MDBX_SUCCESS)) + goto done; + + if (rc == MDBX_NOTFOUND && (user_flags & MDBX_CREATE)) { + name = env->kvs[scan].name; + goto create; + } + + return 
dbi_gone(txn, slot, rc); } } @@ -409,88 +456,72 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD env->n_dbi = (unsigned)slot + 1; eASSERT(env, slot < env->n_dbi); - int err = dbi_check(txn, slot); - eASSERT(env, err == MDBX_BAD_DBI); - if (unlikely(err != MDBX_BAD_DBI)) + rc = dbi_check(txn, slot); + eASSERT(env, rc == MDBX_BAD_DBI); + if (unlikely(rc != MDBX_BAD_DBI)) return MDBX_PROBLEM; /* Find the DB info */ - MDBX_val body; - cursor_couple_t cx; - int rc = cursor_init(&cx.outer, txn, MAIN_DBI); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - rc = cursor_seek(&cx.outer, &name, &body, MDBX_SET).err; + rc = tbl_fetch(txn, &maindb_cx->outer, slot, &name, user_flags); if (unlikely(rc != MDBX_SUCCESS)) { if (rc != MDBX_NOTFOUND || !(user_flags & MDBX_CREATE)) return rc; - } else { - /* make sure this is actually a table */ - node_t *node = page_node(cx.outer.pg[cx.outer.top], cx.outer.ki[cx.outer.top]); - if (unlikely((node_flags(node) & (N_DUP | N_TREE)) != N_TREE)) - return MDBX_INCOMPATIBLE; - if (!MDBX_DISABLE_VALIDATION && unlikely(body.iov_len != sizeof(tree_t))) { - ERROR("%s/%d: %s %zu", "MDBX_CORRUPTED", MDBX_CORRUPTED, "invalid table node size", body.iov_len); - return MDBX_CORRUPTED; - } - memcpy(&txn->dbs[slot], body.iov_base, sizeof(tree_t)); } /* Done here so we cannot fail after creating a new DB */ - defer_free_item_t *const clone = osal_malloc(dbi_namelen(name)); + clone = osal_malloc(dbi_namelen(name)); if (unlikely(!clone)) return MDBX_ENOMEM; memcpy(clone, name.iov_base, name.iov_len); name.iov_base = clone; +create: + tASSERT(txn, rc == MDBX_SUCCESS || rc == MDBX_NOTFOUND); uint8_t dbi_state = DBI_LINDO | DBI_VALID | DBI_FRESH; if (unlikely(rc != MDBX_SUCCESS)) { - /* MDBX_NOTFOUND and MDBX_CREATE: Create new DB */ - tASSERT(txn, rc == MDBX_NOTFOUND); - body.iov_base = memset(&txn->dbs[slot], 0, body.iov_len = sizeof(tree_t)); - txn->dbs[slot].root = P_INVALID; - txn->dbs[slot].mod_txnid = txn->txnid; - 
txn->dbs[slot].flags = user_flags & DB_PERSISTENT_FLAGS; - cx.outer.next = txn->cursors[MAIN_DBI]; - txn->cursors[MAIN_DBI] = &cx.outer; - rc = cursor_put_checklen(&cx.outer, &name, &body, N_TREE | MDBX_NOOVERWRITE); - txn->cursors[MAIN_DBI] = cx.outer.next; + rc = tbl_create(txn, &maindb_cx->outer, slot, &name, user_flags); if (unlikely(rc != MDBX_SUCCESS)) goto bailout; - dbi_state |= DBI_DIRTY | DBI_CREAT; - txn->flags |= MDBX_TXN_DIRTY; - tASSERT(txn, (txn->dbi_state[MAIN_DBI] & DBI_DIRTY) != 0); } /* Got info, register DBI in this txn */ const uint32_t seq = dbi_seq_next(env, slot); - eASSERT(env, env->dbs_flags[slot] == DB_POISON && !txn->cursors[slot] && - (txn->dbi_state[slot] & (DBI_LINDO | DBI_VALID)) == DBI_LINDO); - txn->dbi_state[slot] = dbi_state; - memcpy(&txn->dbs[slot], body.iov_base, sizeof(txn->dbs[slot])); - env->dbs_flags[slot] = txn->dbs[slot].flags; - rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp); - if (unlikely(rc != MDBX_SUCCESS)) - goto bailout; + eASSERT(env, !txn->cursors[slot]); + if (clone) { + eASSERT(env, env->dbs_flags[slot] == DB_POISON && (txn->dbi_state[slot] & (DBI_LINDO | DBI_VALID)) == DBI_LINDO); + txn->dbi_state[slot] = dbi_state; + env->dbs_flags[slot] = txn->dbs[slot].flags; + rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp); + if (unlikely(rc != MDBX_SUCCESS)) + goto bailout; - env->kvs[slot].name = name; - env->dbs_flags[slot] = txn->dbs[slot].flags | DB_VALID; - txn->dbi_seqs[slot] = atomic_store32(&env->dbi_seqs[slot], seq, mo_AcquireRelease); + env->kvs[slot].name = name; + env->dbs_flags[slot] = txn->dbs[slot].flags | DB_VALID; + txn->dbi_seqs[slot] = atomic_store32(&env->dbi_seqs[slot], seq, mo_AcquireRelease); + } else { + eASSERT(env, env->dbs_flags[slot] == (DB_VALID | (user_flags & DB_PERSISTENT_FLAGS)) && + env->dbs_flags[slot] == (DB_VALID | txn->dbs[slot].flags) && + txn->dbi_state[slot] == (DBI_LINDO | DBI_VALID | DBI_STALE)); + } done: - *dbi = (MDBX_dbi)slot; + *(MDBX_dbi *)maindb_cx->userctx 
= (MDBX_dbi)slot; tASSERT(txn, slot < txn->n_dbi && (env->dbs_flags[slot] & DB_VALID) != 0); eASSERT(env, dbi_check(txn, slot) == MDBX_SUCCESS); return MDBX_SUCCESS; bailout: - eASSERT(env, !txn->cursors[slot] && !env->kvs[slot].name.iov_len && !env->kvs[slot].name.iov_base); txn->dbi_state[slot] &= DBI_LINDO | DBI_OLDEN; env->dbs_flags[slot] = 0; - osal_free(clone); - if (slot + 1 == env->n_dbi) - txn->n_dbi = env->n_dbi = (unsigned)slot; + if (clone) { + eASSERT(env, !txn->cursors[slot] && !env->kvs[slot].name.iov_len && !env->kvs[slot].name.iov_base); + osal_free(clone); + if (slot + 1 == env->n_dbi) + txn->n_dbi = env->n_dbi = (unsigned)slot; + } else { + eASSERT(env, name.iov_base == env->kvs[slot].name.iov_base); + } return rc; } @@ -528,18 +559,20 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB if (unlikely(name->iov_len > txn->env->leaf_nodemax - NODESIZE - sizeof(tree_t))) return MDBX_EINVAL; + cursor_couple_t cx; + size_t fastpath_slot = 0; #if MDBX_ENABLE_DBI_LOCKFREE /* Is the DB already open? */ const MDBX_env *const env = txn->env; - bool have_free_slot = env->n_dbi < env->max_dbi; - for (size_t i = CORE_DBS; i < env->n_dbi; ++i) { - if ((env->dbs_flags[i] & DB_VALID) == 0) { - have_free_slot = true; + size_t first_free_slot = env->n_dbi; + for (size_t slot = CORE_DBS; slot < env->n_dbi; ++slot) { + if ((env->dbs_flags[slot] & DB_VALID) == 0) { + first_free_slot = (first_free_slot < slot) ? 
first_free_slot : slot; continue; } - struct dbi_snap_result snap = dbi_snap(env, i); - const MDBX_val snap_name = env->kvs[i].name; + struct dbi_snap_result snap = dbi_snap(env, slot); + const MDBX_val snap_name = env->kvs[slot].name; const uint32_t main_seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease); MDBX_cmp_func *const snap_cmp = env->kvs[MAIN_DBI].clc.k.cmp; if (unlikely(!(snap.flags & DB_VALID) || !snap_name.iov_base || !snap_name.iov_len || !snap_cmp)) @@ -547,10 +580,10 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB goto slowpath_locking; const bool name_match = snap_cmp(&snap_name, name) == 0; - if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || + if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[slot], mo_AcquireRelease) || main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || - snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || - snap_name.iov_len != env->kvs[i].name.iov_len)) + snap.flags != env->dbs_flags[slot] || snap_name.iov_base != env->kvs[slot].name.iov_base || + snap_name.iov_len != env->kvs[slot].name.iov_len)) /* похоже на столкновение с параллельно работающим обновлением */ goto slowpath_locking; @@ -559,45 +592,65 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB osal_flush_incoherent_cpu_writeback(); if (user_flags != MDBX_ACCEDE && - (((user_flags ^ snap.flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[i].clc.k.cmp) || - (datacmp && datacmp != env->kvs[i].clc.v.cmp))) + (((user_flags ^ snap.flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[slot].clc.k.cmp) || + (datacmp && datacmp != env->kvs[slot].clc.v.cmp))) /* есть подозрение что пользователь открывает таблицу с другими флагами/атрибутами * или другими компараторами, поэтому уходим в безопасный режим */ goto slowpath_locking; - rc = dbi_check(txn, i); - if (rc == MDBX_BAD_DBI 
&& txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) { + rc = dbi_check(txn, slot); + if (rc == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { /* хендл использовался, стал невалидным, - * но теперь явно пере-открывается в этой транзакци */ - eASSERT(env, !txn->cursors[i]); - txn->dbi_state[i] = DBI_LINDO; - rc = dbi_check(txn, i); + * но теперь явно пере-открывается в этой транзакции */ + goto slowpath_locking; } - if (likely(rc == MDBX_SUCCESS)) { - if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || - main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || - snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || - snap_name.iov_len != env->kvs[i].name.iov_len)) - /* похоже на столкновение с параллельно работающим обновлением */ - goto slowpath_locking; - rc = dbi_bind(txn, i, user_flags, keycmp, datacmp); - if (likely(rc == MDBX_SUCCESS)) - *dbi = (MDBX_dbi)i; + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[slot], mo_AcquireRelease) || + main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || + snap.flags != env->dbs_flags[slot] || snap_name.iov_base != env->kvs[slot].name.iov_base || + snap_name.iov_len != env->kvs[slot].name.iov_len)) + /* похоже на столкновение с параллельно работающим обновлением */ + goto slowpath_locking; + + rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + tASSERT(txn, F_ISSET(txn->dbi_state[slot], DBI_LINDO | DBI_VALID)); + if (txn->dbi_state[slot] & DBI_STALE) { + rc = tbl_fetch(txn, &cx.outer, fastpath_slot = slot, name, user_flags); + if (unlikely(rc != MDBX_SUCCESS)) { + if (rc == MDBX_NOTFOUND && (user_flags & MDBX_CREATE)) + /* таблицы уже нет, но запрошено её пересоздание */ + goto slowpath_locking; + + return dbi_gone(txn, slot, rc); + } + txn->dbi_state[slot] -= DBI_STALE; } - return rc; + *dbi 
= (MDBX_dbi)slot; + return MDBX_SUCCESS; } /* Fail, if no free slot and max hit */ - if (unlikely(!have_free_slot)) + if (unlikely(first_free_slot >= env->max_dbi)) return MDBX_DBS_FULL; -slowpath_locking: + if (!(user_flags & MDBX_CREATE)) { + rc = tbl_fetch(txn, &cx.outer, fastpath_slot = first_free_slot, name, user_flags); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + } +slowpath_locking: #endif /* MDBX_ENABLE_DBI_LOCKFREE */ + cx.userctx = dbi; rc = osal_fastmutex_acquire(&txn->env->dbi_lock); if (likely(rc == MDBX_SUCCESS)) { - rc = dbi_open_locked(txn, user_flags, dbi, keycmp, datacmp, *name); + rc = dbi_open_locked(txn, &cx, user_flags, keycmp, datacmp, *name, fastpath_slot); ENSURE(txn->env, osal_fastmutex_release(&txn->env->dbi_lock) == MDBX_SUCCESS); } return rc; diff --git a/src/dbi.h b/src/dbi.h index 5fdccb3e..d08dadd0 100644 --- a/src/dbi.h +++ b/src/dbi.h @@ -80,6 +80,7 @@ static inline bool dbi_foreach_step(const MDBX_txn *const txn, size_t *bitmap_it #define TXN_FOREACH_DBI_USER(TXN, I) TXN_FOREACH_DBI_FROM(TXN, I, CORE_DBS) MDBX_INTERNAL int dbi_import(MDBX_txn *txn, const size_t dbi); +MDBX_INTERNAL int dbi_gone(MDBX_txn *txn, const size_t dbi, const int rc); struct dbi_snap_result { uint32_t sequence; diff --git a/src/internals.h b/src/internals.h index e9e20331..4a3994d3 100644 --- a/src/internals.h +++ b/src/internals.h @@ -142,10 +142,10 @@ struct kvx { /* Non-shared DBI state flags inside transaction */ enum dbi_state { - DBI_DIRTY = 0x01 /* DB was written in this txn */, - DBI_STALE = 0x02 /* Named-DB record is older than txnID */, - DBI_FRESH = 0x04 /* Named-DB handle opened in this txn */, - DBI_CREAT = 0x08 /* Named-DB handle created in this txn */, + DBI_DIRTY = 0x01 /* table was written in this txn */, + DBI_STALE = 0x02 /* cached table record is outdated and should be reloaded/refreshed */, + DBI_FRESH = 0x04 /* table handle opened in this txn */, + DBI_CREAT = 0x08 /* table handle created in this txn */, DBI_VALID = 0x10 /* 
Handle is valid, see also DB_VALID */, DBI_OLDEN = 0x40 /* Handle was closed/reopened outside txn */, DBI_LINDO = 0x80 /* Lazy initialization done for DBI-slot */, diff --git a/src/proto.h b/src/proto.h index 8b1018b4..8850c578 100644 --- a/src/proto.h +++ b/src/proto.h @@ -109,8 +109,12 @@ MDBX_INTERNAL void recalculate_merge_thresholds(MDBX_env *env); MDBX_INTERNAL void recalculate_subpage_thresholds(MDBX_env *env); /* table.c */ -MDBX_INTERNAL int __must_check_result tbl_fetch(MDBX_txn *txn, size_t dbi); +MDBX_INTERNAL int __must_check_result tbl_fetch(MDBX_txn *txn, MDBX_cursor *mc, size_t dbi, const MDBX_val *name, + unsigned wanna_flags); +MDBX_INTERNAL int __must_check_result tbl_create(MDBX_txn *txn, MDBX_cursor *mc, size_t slot, const MDBX_val *name, + unsigned db_flags); MDBX_INTERNAL int __must_check_result tbl_setup(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *const db); +MDBX_INTERNAL int __must_check_result tbl_refresh(MDBX_txn *txn, size_t dbi); /* coherency.c */ MDBX_INTERNAL bool coherency_check_meta(const MDBX_env *env, const volatile meta_t *meta, bool report); diff --git a/src/table.c b/src/table.c index 5b3e441a..2f5551a5 100644 --- a/src/table.c +++ b/src/table.c @@ -37,67 +37,99 @@ int tbl_setup(const MDBX_env *env, volatile kvx_t *const kvx, const tree_t *cons return MDBX_SUCCESS; } -int tbl_fetch(MDBX_txn *txn, size_t dbi) { - cursor_couple_t couple; - int rc = cursor_init(&couple.outer, txn, MAIN_DBI); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; +int tbl_fetch(MDBX_txn *txn, MDBX_cursor *mc, size_t dbi, const MDBX_val *name, unsigned wanna_flags) { + int err = cursor_init(mc, txn, MAIN_DBI); + if (unlikely(err != MDBX_SUCCESS)) + return err; - kvx_t *const kvx = &txn->env->kvs[dbi]; - rc = tree_search(&couple.outer, &kvx->name, 0); - if (unlikely(rc != MDBX_SUCCESS)) { - bailout: - NOTICE("dbi %zu refs to inaccessible table `%.*s` for txn %" PRIaTXN " (err %d)", dbi, (int)kvx->name.iov_len, - (const char 
*)kvx->name.iov_base, txn->txnid, rc); - return (rc == MDBX_NOTFOUND) ? MDBX_BAD_DBI : rc; + err = tree_search(mc, name, 0); + if (unlikely(err != MDBX_SUCCESS)) { + if (err == MDBX_NOTFOUND) + goto notfound; + return err; + } + + struct node_search_result nsr = node_search(mc, name); + if (unlikely(!nsr.exact)) { + notfound: + if (dbi < txn->env->n_dbi && (txn->env->dbs_flags[dbi] & DB_VALID) && !(wanna_flags & MDBX_CREATE)) + NOTICE("dbi %zu refs to non-existing table `%.*s` for txn %" PRIaTXN " (err %d)", dbi, (int)name->iov_len, + (const char *)name->iov_base, txn->txnid, err); + return MDBX_NOTFOUND; + } + + if (unlikely((node_flags(nsr.node) & (N_DUP | N_TREE)) != N_TREE)) { + NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)name->iov_len, + (const char *)name->iov_base, txn->txnid, "wrong node-flags"); + return MDBX_INCOMPATIBLE /* not a named DB */; } MDBX_val data; - struct node_search_result nsr = node_search(&couple.outer, &kvx->name); - if (unlikely(!nsr.exact)) { - rc = MDBX_NOTFOUND; - goto bailout; - } - if (unlikely((node_flags(nsr.node) & (N_DUP | N_TREE)) != N_TREE)) { - NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)kvx->name.iov_len, - (const char *)kvx->name.iov_base, txn->txnid, "wrong flags"); - return MDBX_INCOMPATIBLE; /* not a named DB */ + err = node_read(mc, nsr.node, &data, mc->pg[mc->top]); + if (unlikely(err != MDBX_SUCCESS)) + return err; + + if (unlikely(data.iov_len < sizeof(tree_t))) { + NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN " (%s)", dbi, (int)name->iov_len, + (const char *)name->iov_base, txn->txnid, "wrong record-size"); + return MDBX_INCOMPATIBLE /* not a named DB */; } - rc = node_read(&couple.outer, nsr.node, &data, couple.outer.pg[couple.outer.top]); - if (unlikely(rc != MDBX_SUCCESS)) - return rc; - - if (unlikely(data.iov_len != sizeof(tree_t))) { - NOTICE("dbi %zu refs to not a named table `%.*s` for txn %" PRIaTXN 
" (%s)", dbi, (int)kvx->name.iov_len, - (const char *)kvx->name.iov_base, txn->txnid, "wrong rec-size"); - return MDBX_INCOMPATIBLE; /* not a named DB */ - } - - uint16_t flags = UNALIGNED_PEEK_16(data.iov_base, tree_t, flags); + const unsigned db_flags = UNALIGNED_PEEK_16(data.iov_base, tree_t, flags); + const pgno_t db_root_pgno = peek_pgno(ptr_disp(data.iov_base, offsetof(tree_t, root))); /* The txn may not know this DBI, or another process may * have dropped and recreated the DB with other flags. */ - tree_t *const db = &txn->dbs[dbi]; - if (unlikely((db->flags & DB_PERSISTENT_FLAGS) != flags)) { + if (unlikely((wanna_flags ^ db_flags) & DB_PERSISTENT_FLAGS) && !(wanna_flags & MDBX_DB_ACCEDE) && + !((wanna_flags & MDBX_CREATE) && db_root_pgno == P_INVALID)) { NOTICE("dbi %zu refs to the re-created table `%.*s` for txn %" PRIaTXN " with different flags (present 0x%X != wanna 0x%X)", - dbi, (int)kvx->name.iov_len, (const char *)kvx->name.iov_base, txn->txnid, db->flags & DB_PERSISTENT_FLAGS, - flags); - return MDBX_INCOMPATIBLE; + dbi, (int)name->iov_len, (const char *)name->iov_base, txn->txnid, db_flags & DB_PERSISTENT_FLAGS, + wanna_flags & DB_PERSISTENT_FLAGS); + return MDBX_INCOMPATIBLE /* not a named DB */; } + tree_t *const db = &txn->dbs[dbi]; memcpy(db, data.iov_base, sizeof(tree_t)); #if !MDBX_DISABLE_VALIDATION - const txnid_t pp_txnid = couple.outer.pg[couple.outer.top]->txnid; - tASSERT(txn, txn->front_txnid >= pp_txnid); - if (unlikely(db->mod_txnid > pp_txnid)) { - ERROR("db.mod_txnid (%" PRIaTXN ") > page-txnid (%" PRIaTXN ")", db->mod_txnid, pp_txnid); + const txnid_t maindb_leafpage_txnid = mc->pg[mc->top]->txnid; + tASSERT(txn, txn->front_txnid >= maindb_leafpage_txnid); + if (unlikely(db->mod_txnid > maindb_leafpage_txnid)) { + ERROR("db.mod_txnid (%" PRIaTXN ") > page-txnid (%" PRIaTXN ")", db->mod_txnid, maindb_leafpage_txnid); return MDBX_CORRUPTED; } #endif /* !MDBX_DISABLE_VALIDATION */ - rc = tbl_setup_ifneed(txn->env, kvx, db); + + 
return MDBX_SUCCESS; +} + +int tbl_create(MDBX_txn *txn, MDBX_cursor *mc, size_t slot, const MDBX_val *name, unsigned db_flags) { + tASSERT(txn, db_flags & MDBX_CREATE); + MDBX_val body; + body.iov_base = memset(&txn->dbs[slot], 0, body.iov_len = sizeof(tree_t)); + txn->dbs[slot].root = P_INVALID; + txn->dbs[slot].mod_txnid = txn->txnid; + txn->dbs[slot].flags = db_flags & DB_PERSISTENT_FLAGS; + mc->next = txn->cursors[MAIN_DBI]; + txn->cursors[MAIN_DBI] = mc; + int err = cursor_put_checklen(mc, name, &body, N_TREE | MDBX_NOOVERWRITE); + txn->cursors[MAIN_DBI] = mc->next; + if (likely(err == MDBX_SUCCESS)) { + txn->flags |= MDBX_TXN_DIRTY; + tASSERT(txn, (txn->dbi_state[MAIN_DBI] & DBI_DIRTY) != 0); + } + return err; +} + +int tbl_refresh(MDBX_txn *txn, size_t dbi) { + cursor_couple_t couple; + kvx_t *const kvx = &txn->env->kvs[dbi]; + int rc = tbl_fetch(txn, &couple.outer, dbi, &kvx->name, txn->dbs[dbi].flags); + if (likely(rc != MDBX_SUCCESS)) + return dbi_gone(txn, dbi, rc); + + rc = tbl_setup_ifneed(txn->env, kvx, &txn->dbs[dbi]); if (unlikely(rc != MDBX_SUCCESS)) - return rc; + return dbi_gone(txn, dbi, rc); if (unlikely(dbi_changed(txn, dbi))) return MDBX_BAD_DBI; diff --git a/src/tree-search.c b/src/tree-search.c index 9cc68d7d..47a54c46 100644 --- a/src/tree-search.c +++ b/src/tree-search.c @@ -39,7 +39,7 @@ __hot int tree_search(MDBX_cursor *mc, const MDBX_val *key, int flags) { const size_t dbi = cursor_dbi(mc); if (unlikely(*cursor_dbi_state(mc) & DBI_STALE)) { - err = tbl_fetch(mc->txn, dbi); + err = tbl_refresh_absent2baddbi(mc->txn, dbi); if (unlikely(err != MDBX_SUCCESS)) goto bailout; } diff --git a/src/txn.c b/src/txn.c index b2657832..54d41691 100644 --- a/src/txn.c +++ b/src/txn.c @@ -47,7 +47,7 @@ int txn_shadow_cursors(const MDBX_txn *parent, const size_t dbi) { int err = cursor_shadow(cursor, txn, dbi); if (unlikely(err != MDBX_SUCCESS)) { /* не получилось забекапить курсоры */ - txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO | DBI_STALE; + 
txn->dbi_state[dbi] = DBI_OLDEN | DBI_LINDO; txn->flags |= MDBX_TXN_ERROR; return err; }