diff --git a/src/dbi.c b/src/dbi.c index ef7a1411..46f42cd8 100644 --- a/src/dbi.c +++ b/src/dbi.c @@ -379,7 +379,7 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi, MD slot = (slot < scan) ? slot : scan; continue; } - if (!env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name)) { + if (env->kvs[MAIN_DBI].clc.k.cmp(&name, &env->kvs[scan].name) == 0) { slot = scan; int err = dbi_check(txn, slot); if (err == MDBX_BAD_DBI && txn->dbi_state[slot] == (DBI_OLDEN | DBI_LINDO)) { @@ -535,54 +535,68 @@ int dbi_open(MDBX_txn *txn, const MDBX_val *const name, unsigned user_flags, MDB #if MDBX_ENABLE_DBI_LOCKFREE /* Is the DB already open? */ const MDBX_env *const env = txn->env; - size_t free_slot = env->n_dbi; + bool have_free_slot = env->n_dbi < env->max_dbi; for (size_t i = CORE_DBS; i < env->n_dbi; ++i) { - retry: if ((env->dbs_flags[i] & DB_VALID) == 0) { - free_slot = i; + have_free_slot = true; continue; } - const uint32_t snap_seq = atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease); - const uint16_t snap_flags = env->dbs_flags[i]; + struct dbi_snap_result snap = dbi_snap(env, i); const MDBX_val snap_name = env->kvs[i].name; - if (user_flags != MDBX_ACCEDE && - (((user_flags ^ snap_flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[i].clc.k.cmp) || - (datacmp && datacmp != env->kvs[i].clc.v.cmp))) - continue; const uint32_t main_seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease); MDBX_cmp_func *const snap_cmp = env->kvs[MAIN_DBI].clc.k.cmp; - if (unlikely(!(snap_flags & DB_VALID) || !snap_name.iov_base || !snap_name.iov_len || !snap_cmp)) - continue; + if (unlikely(!(snap.flags & DB_VALID) || !snap_name.iov_base || !snap_name.iov_len || !snap_cmp)) + /* похоже на столкновение с параллельно работающим обновлением */ + goto slowpath_locking; const bool name_match = snap_cmp(&snap_name, name) == 0; - osal_flush_incoherent_cpu_writeback(); - if (unlikely(snap_seq != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || + if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || - snap_flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || + snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || snap_name.iov_len != env->kvs[i].name.iov_len)) - goto retry; - if (name_match) { + /* похоже на столкновение с параллельно работающим обновлением */ + goto slowpath_locking; + + if (!name_match) + continue; + + osal_flush_incoherent_cpu_writeback(); + if (user_flags != MDBX_ACCEDE && + (((user_flags ^ snap.flags) & DB_PERSISTENT_FLAGS) || (keycmp && keycmp != env->kvs[i].clc.k.cmp) || + (datacmp && datacmp != env->kvs[i].clc.v.cmp))) + /* есть подозрение что пользователь открывает таблицу с другими флагами/атрибутами + * или другими компараторами, поэтому уходим в безопасный режим */ + goto slowpath_locking; + + rc = dbi_check(txn, i); + if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) { + /* хендл использовался, стал невалидным, + * но теперь явно пере-открывается в этой транзакци */ + eASSERT(env, !txn->cursors[i]); + txn->dbi_state[i] = DBI_LINDO; rc = dbi_check(txn, i); - if (rc == MDBX_BAD_DBI && txn->dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) { - /* хендл использовался, стал невалидным, - * но теперь явно пере-открывается в этой транзакци */ - eASSERT(env, !txn->cursors[i]); - txn->dbi_state[i] = DBI_LINDO; - rc = dbi_check(txn, i); - } - if (likely(rc == MDBX_SUCCESS)) { - rc = dbi_bind(txn, i, user_flags, keycmp, datacmp); - if (likely(rc == MDBX_SUCCESS)) - *dbi = (MDBX_dbi)i; - } - return rc; } + if (likely(rc == MDBX_SUCCESS)) { + if (unlikely(snap.sequence != atomic_load32(&env->dbi_seqs[i], mo_AcquireRelease) || + main_seq != atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease) || + snap.flags != env->dbs_flags[i] || snap_name.iov_base != env->kvs[i].name.iov_base || + snap_name.iov_len != env->kvs[i].name.iov_len)) + /* похоже на столкновение с параллельно работающим обновлением */ + goto slowpath_locking; + rc = dbi_bind(txn, i, user_flags, keycmp, datacmp); + if (likely(rc == MDBX_SUCCESS)) + *dbi = (MDBX_dbi)i; + } + return rc; } /* Fail, if no free slot and max hit */ - if (unlikely(free_slot >= env->max_dbi)) + if (unlikely(!have_free_slot)) return MDBX_DBS_FULL; + +slowpath_locking: + #endif /* MDBX_ENABLE_DBI_LOCKFREE */ rc = osal_fastmutex_acquire(&txn->env->dbi_lock);