mdbx: устранение возможности неверного возврата MDBX_DBS_FULL при открытии DBI-дескрипторов (backport).

В lockfree-пути открытия DBI-дескрипторов, при просмотре уже открытых
таблиц, пропускались элементы отличающиеся не только по имени, но также
и при несовпадении запрашиваемых флагов и актуальных флагов уже открытой
таблицы.

Если при этом уже было достигнуто (ранее заданное) максимальное
количество открытых DBI-дескрипторов, то возвращалась ошибка
`MDBX_DBS_FULL`, в том числе в ситуациях когда результат должен быть
другим.

Спасибо [Артёму Воротникову](https://github.com/vorot93) за сообщение о проблеме!
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2025-07-19 01:26:37 +03:00
parent 68f9dc18be
commit 6cb9305b31

View File

@ -385,7 +385,7 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD
slot = (slot < scan) ? slot : scan; slot = (slot < scan) ? slot : scan;
continue; continue;
} }
if (!env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name)) { if (env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name) == 0) {
slot = scan; slot = scan;
int err = dbi_check(txn, slot); int err = dbi_check(txn, slot);
if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) {
@ -541,34 +541,40 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB
#if MDBX_ENABLE_DBI_LOCKFREE #if MDBX_ENABLE_DBI_LOCKFREE
/* Is the DB already open? */ /* Is the DB already open? */
const MDBX_env *const env = txn->env; const MDBX_env *const env = txn->env;
size_t free_slot = env->n_dbi; bool have_free_slot = env->n_dbi < env->max_dbi;
for (size_t i = CORE_DBS; i < env->n_dbi; ++i) { for (size_t i = CORE_DBS; i < env->n_dbi; ++i) {
retry:
if ((env->dbs_flags[i] & DB_VALID) == 0) { if ((env->dbs_flags[i] & DB_VALID) == 0) {
free_slot = i; have_free_slot = true;
continue; continue;
} }
const uint32_t snap_seq = atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease); struct dbi_snap_result snap = dbi_snap(env, i);
const uint16_t snap_flags = env->dbs_flags[i];
const MDBX_val snap_name = env->kvs[i].name; const MDBX_val snap_name = env->kvs[i].name;
if (user_flags != MDBX_ACCEDE &&
(((user_flags ^ snap_flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[i].clc.k.cmp) ||
(datacmp && datacmp != env->kvs[i].clc.v.cmp)))
continue;
const uint32_t main_seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease); const uint32_t main_seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease);
MDBX_cmp_func *const snap_cmp = env->kvs[MAIN_DBI].clc.k.cmp; MDBX_cmp_func *const snap_cmp = env->kvs[MAIN_DBI].clc.k.cmp;
if (unlikely(!(snap_flags & DB_VALID) || !snap_name.iov_base || !snap_name.iov_len || !snap_cmp)) if (unlikely(!(snap.flags & DB_VALID) || !snap_name.iov_base || !snap_name.iov_len || !snap_cmp))
continue; /* похоже на столкновение с параллельно работающим обновлением */
goto slowpath_locking;
const bool name_match = snap_cmp(&snap_name, name) == 0; const bool name_match = snap_cmp(&snap_name, name) == 0;
osal_flush_incoherent_cpu_writeback(); if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) ||
if (unlikely(snap_seq != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) ||
main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) ||
snap_flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base ||
snap_name.iov_len != env->kvs[i].name.iov_len)) snap_name.iov_len != env->kvs[i].name.iov_len))
goto retry; /* похоже на столкновение с параллельно работающим обновлением */
if (name_match) { goto slowpath_locking;
if (!name_match)
continue;
osal_flush_incoherent_cpu_writeback();
if (user_flags != MDBX_ACCEDE &&
(((user_flags ^ snap.flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[i].clc.k.cmp) ||
(datacmp && datacmp != env->kvs[i].clc.v.cmp)))
/* есть подозрение что пользователь открывает таблицу с другими флагами/атрибутами
* или другими компараторами, поэтому уходим в безопасный режим */
goto slowpath_locking;
rc = dbi_check(txn, i); rc = dbi_check(txn, i);
if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) { if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) {
/* хендл использовался, стал невалидным, /* хендл использовался, стал невалидным,
@ -578,17 +584,25 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB
rc = dbi_check(txn, i); rc = dbi_check(txn, i);
} }
if (likely(rc == MDBX_SUCCESS)) { if (likely(rc == MDBX_SUCCESS)) {
if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) ||
main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) ||
snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base ||
snap_name.iov_len != env->kvs[i].name.iov_len))
/* похоже на столкновение с параллельно работающим обновлением */
goto slowpath_locking;
rc = dbi_bind(txn, i, user_flags, keycmp, datacmp); rc = dbi_bind(txn, i, user_flags, keycmp, datacmp);
if (likely(rc == MDBX_SUCCESS)) if (likely(rc == MDBX_SUCCESS))
*dbi = (MDBX_dbi)i; *dbi = (MDBX_dbi)i;
} }
return rc; return rc;
} }
}
/* Fail, if no free slot and max hit */ /* Fail, if no free slot and max hit */
if (unlikely(free_slot >= env->max_dbi)) if (unlikely(!have_free_slot))
return MDBX_DBS_FULL; return MDBX_DBS_FULL;
slowpath_locking:
#endif /* MDBX_ENABLE_DBI_LOCKFREE */ #endif /* MDBX_ENABLE_DBI_LOCKFREE */
rc = osal_fastmutex_acquire(&txn->env->dbi_lock); rc = osal_fastmutex_acquire(&txn->env->dbi_lock);