mdbx: пересоздание пустой MAIN_DBI при необходимости.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-12-17 21:03:31 +03:00
parent 9cbbdfa025
commit 85828f677a

View File

@ -8861,6 +8861,8 @@ static int txn_renew(MDBX_txn *txn, const unsigned flags) {
(db_flags & DB_VALID) ? DBI_VALID | DBI_USRVALID | DBI_STALE : 0;
}
txn->mt_dbistate[MAIN_DBI] = DBI_VALID | DBI_USRVALID;
rc =
setup_dbx(&txn->mt_dbxs[MAIN_DBI], &txn->mt_dbs[MAIN_DBI], env->me_psize);
txn->mt_dbistate[FREE_DBI] = DBI_VALID;
txn->mt_front =
txn->mt_txnid + ((flags & (MDBX_WRITEMAP | MDBX_RDONLY)) == 0);
@ -14493,6 +14495,10 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
env_pathname.ent_len * sizeof(pathchar_t));
env->me_dbxs[FREE_DBI].md_cmp = cmp_int_align4; /* aligned MDBX_INTEGERKEY */
env->me_dbxs[FREE_DBI].md_dcmp = cmp_lenfast;
env->me_dbxs[FREE_DBI].md_klen_max = env->me_dbxs[FREE_DBI].md_klen_min = 8;
env->me_dbxs[FREE_DBI].md_vlen_min = 4;
env->me_dbxs[FREE_DBI].md_vlen_max =
mdbx_env_get_maxvalsize_ex(env, MDBX_INTEGERKEY);
/* Использование O_DSYNC или FILE_FLAG_WRITE_THROUGH:
*
@ -22152,15 +22158,27 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
if (unlikely(!dbi))
return rc;
void *clone = nullptr;
bool locked = false;
if (unlikely((user_flags & ~DB_USABLE_FLAGS) != 0)) {
early_bailout:
bailout:
tASSERT(txn, MDBX_IS_ERROR(rc));
*dbi = 0;
if (locked)
ENSURE(txn->mt_env,
osal_fastmutex_release(&txn->mt_env->me_dbi_lock) == MDBX_SUCCESS);
osal_free(clone);
return rc;
}
rc = check_txn(txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
goto bailout;
if ((user_flags & MDBX_CREATE) && unlikely(txn->mt_flags & MDBX_TXN_RDONLY)) {
rc = MDBX_EACCESS;
goto bailout;
}
switch (user_flags & (MDBX_INTEGERDUP | MDBX_DUPFIXED | MDBX_DUPSORT |
MDBX_REVERSEDUP | MDBX_ACCEDE)) {
@ -22170,7 +22188,7 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
__fallthrough /* fall through */;
default:
rc = MDBX_EINVAL;
goto early_bailout;
goto bailout;
case MDBX_DUPSORT:
case MDBX_DUPSORT | MDBX_REVERSEDUP:
@ -22187,21 +22205,21 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
table_name->iov_base == MDBX_PGWALK_MAIN) {
rc = dbi_bind(txn, MAIN_DBI, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
goto bailout;
*dbi = MAIN_DBI;
return rc;
}
if (table_name == MDBX_PGWALK_GC || table_name->iov_base == MDBX_PGWALK_GC) {
rc = dbi_bind(txn, FREE_DBI, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
goto bailout;
*dbi = FREE_DBI;
return rc;
}
if (table_name == MDBX_PGWALK_META ||
table_name->iov_base == MDBX_PGWALK_META) {
rc = MDBX_EINVAL;
goto early_bailout;
goto bailout;
}
MDBX_val key = *table_name;
@ -22209,13 +22227,34 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
if (key.iov_len > env->me_leaf_nodemax - NODESIZE - sizeof(MDBX_db))
return MDBX_EINVAL;
if (txn->mt_dbxs[MAIN_DBI].md_cmp == NULL) {
/* Cannot mix named table(s) with DUPSORT flags */
if (unlikely(txn->mt_dbs[MAIN_DBI].md_flags & MDBX_DUPSORT)) {
if ((user_flags & MDBX_CREATE) == 0) {
rc = MDBX_NOTFOUND;
goto bailout;
}
if (txn->mt_dbs[MAIN_DBI].md_leaf_pages || txn->mt_dbxs[MAIN_DBI].md_cmp) {
/* В MAIN_DBI есть записи либо она уже использовалась. */
rc = MDBX_INCOMPATIBLE;
goto bailout;
}
/* Пересоздаём MAIN_DBI если там пусто. */
atomic_store32(&txn->mt_dbiseqs[MAIN_DBI], dbi_seq(env, MAIN_DBI),
mo_AcquireRelease);
tASSERT(txn, txn->mt_dbs[MAIN_DBI].md_depth == 0 &&
txn->mt_dbs[MAIN_DBI].md_entries == 0 &&
txn->mt_dbs[MAIN_DBI].md_root == P_INVALID);
txn->mt_dbs[MAIN_DBI].md_flags &= MDBX_REVERSEKEY | MDBX_INTEGERKEY;
txn->mt_dbistate[MAIN_DBI] |= DBI_DIRTY;
txn->mt_flags |= MDBX_TXN_DIRTY;
txn->mt_dbxs[MAIN_DBI].md_cmp =
get_default_keycmp(txn->mt_dbs[MAIN_DBI].md_flags);
txn->mt_dbxs[MAIN_DBI].md_dcmp =
get_default_datacmp(txn->mt_dbs[MAIN_DBI].md_flags);
}
tASSERT(txn, txn->mt_dbxs[MAIN_DBI].md_cmp);
/* Is the DB already open? */
MDBX_dbi scan, slot;
for (slot = scan = txn->mt_numdbs; --scan >= CORE_DBS;) {
@ -22229,7 +22268,7 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
key.iov_len)) {
rc = dbi_bind(txn, scan, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
goto bailout;
*dbi = scan;
return rc;
}
@ -22238,14 +22277,7 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
/* Fail, if no free slot and max hit */
if (unlikely(slot >= env->me_maxdbs)) {
rc = MDBX_DBS_FULL;
goto early_bailout;
}
/* Cannot mix named table with some main-table flags */
if (unlikely(txn->mt_dbs[MAIN_DBI].md_flags &
(MDBX_DUPSORT | MDBX_INTEGERKEY))) {
rc = (user_flags & MDBX_CREATE) ? MDBX_INCOMPATIBLE : MDBX_NOTFOUND;
goto early_bailout;
goto bailout;
}
/* Find the DB info */
@ -22253,37 +22285,36 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
MDBX_cursor_couple couple;
rc = cursor_init(&couple.outer, txn, MAIN_DBI);
if (unlikely(rc != MDBX_SUCCESS))
goto early_bailout;
goto bailout;
rc = cursor_set(&couple.outer, &key, &data, MDBX_SET).err;
if (unlikely(rc != MDBX_SUCCESS)) {
if (rc != MDBX_NOTFOUND || !(user_flags & MDBX_CREATE))
goto early_bailout;
goto bailout;
} else {
/* make sure this is actually a table */
MDBX_node *node = page_node(couple.outer.mc_pg[couple.outer.mc_top],
couple.outer.mc_ki[couple.outer.mc_top]);
if (unlikely((node_flags(node) & (F_DUPDATA | F_SUBDATA)) != F_SUBDATA)) {
rc = MDBX_INCOMPATIBLE;
goto early_bailout;
goto bailout;
}
if (!MDBX_DISABLE_VALIDATION && unlikely(data.iov_len != sizeof(MDBX_db))) {
rc = MDBX_CORRUPTED;
goto early_bailout;
goto bailout;
}
}
if (rc != MDBX_SUCCESS && unlikely(txn->mt_flags & MDBX_TXN_RDONLY)) {
rc = MDBX_EACCESS;
goto early_bailout;
goto bailout;
}
/* Done here so we cannot fail after creating a new DB */
void *clone = nullptr;
if (key.iov_len) {
clone = osal_malloc(key.iov_len);
if (unlikely(!clone)) {
rc = MDBX_ENOMEM;
goto early_bailout;
goto bailout;
}
key.iov_base = memcpy(clone, key.iov_base, key.iov_len);
} else
@ -22292,9 +22323,9 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
int err = osal_fastmutex_acquire(&env->me_dbi_lock);
if (unlikely(err != MDBX_SUCCESS)) {
rc = err;
osal_free(clone);
goto early_bailout;
goto bailout;
}
locked = true;
/* Import handles from env */
dbi_import_locked(txn);
@ -22311,15 +22342,15 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
key.iov_len)) {
rc = dbi_bind(txn, scan, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS))
goto later_bailout;
*dbi = scan;
goto later_exit;
goto bailout;
slot = scan;
goto done;
}
}
if (unlikely(slot >= env->me_maxdbs)) {
rc = MDBX_DBS_FULL;
goto later_bailout;
goto bailout;
}
unsigned dbiflags = DBI_FRESH | DBI_VALID | DBI_USRVALID;
@ -22336,9 +22367,8 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
WITH_CURSOR_TRACKING(couple.outer,
rc = mdbx_cursor_put(&couple.outer, &key, &data,
F_SUBDATA | MDBX_NOOVERWRITE));
if (unlikely(rc != MDBX_SUCCESS))
goto later_bailout;
goto bailout;
dbiflags |= DBI_DIRTY | DBI_CREAT;
txn->mt_flags |= MDBX_TXN_DIRTY;
@ -22352,31 +22382,28 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
rc = dbi_bind(txn, slot, user_flags, keycmp, datacmp);
if (unlikely(rc != MDBX_SUCCESS)) {
tASSERT(txn, (dbiflags & DBI_CREAT) == 0);
later_bailout:
*dbi = 0;
later_exit:
osal_free(clone);
} else {
txn->mt_dbistate[slot] = (uint8_t)dbiflags;
txn->mt_dbxs[slot].md_name = key;
txn->mt_dbiseqs[slot].weak = env->me_dbiseqs[slot].weak =
dbi_seq(env, slot);
if (!(dbiflags & DBI_CREAT))
env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID;
if (txn->mt_numdbs == slot) {
txn->mt_cursors[slot] = NULL;
osal_compiler_barrier();
txn->mt_numdbs = slot + 1;
}
if (env->me_numdbs <= slot) {
osal_memory_fence(mo_AcquireRelease, true);
env->me_numdbs = slot + 1;
}
*dbi = slot;
goto bailout;
}
txn->mt_dbistate[slot] = (uint8_t)dbiflags;
txn->mt_dbxs[slot].md_name = key;
txn->mt_dbiseqs[slot].weak = env->me_dbiseqs[slot].weak = dbi_seq(env, slot);
if (!(dbiflags & DBI_CREAT))
env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags | DB_VALID;
if (txn->mt_numdbs == slot) {
txn->mt_cursors[slot] = NULL;
osal_compiler_barrier();
txn->mt_numdbs = slot + 1;
}
if (env->me_numdbs <= slot) {
osal_memory_fence(mo_AcquireRelease, true);
env->me_numdbs = slot + 1;
}
done:
*dbi = slot;
ENSURE(env, osal_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
return rc;
return MDBX_SUCCESS;
}
static int dbi_open_cstr(MDBX_txn *txn, const char *name_cstr,