mdbx: переделка обновления/загрузки флагов MainDB с выделением latch_maindb_locked().

В новом коде:
 - нет необходимости в захвате глобальной txn-блокироки;
 - меньше вероятность коллизий/гонок.
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2025-07-15 23:42:28 +03:00
parent 9c6eed615a
commit 8ebdc6d79b

View File

@ -87,6 +87,43 @@ static bool txn_check_overlapped(lck_t *const lck, const uint32_t pid, const uin
return false; return false;
} }
typedef struct seq_latch_result {
int err;
uint32_t seq;
} slr_t;
__cold static slr_t latch_maindb_locked(MDBX_txn *txn, MDBX_env *const env) {
slr_t slr = {.err = MDBX_SUCCESS, .seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease)};
if (env->dbs_flags[MAIN_DBI] & DB_VALID /* Флаги MainDB уже были загружены? */) {
/* Читаем и проверяем повторно после захвата dbi-блокировки */
if (env->dbs_flags[MAIN_DBI] == (DB_VALID | txn->dbs[MAIN_DBI].flags))
return slr;
if (env->basal_txn && env->txn != txn) {
/* Параллельно выполняется пишущая транзакция, которая вероятно и произвела изменение MainDB.
* Как-либо отказаться от использования новых атрибутов в рамках текущего процесса невозможно. */
ERROR("MainDB db-flags changes 0x%x -> 0x%x ahead of read-txn %" PRIaTXN, txn->dbs[MAIN_DBI].flags,
env->dbs_flags[MAIN_DBI] & ~DB_VALID, txn->txnid);
slr.err = MDBX_INCOMPATIBLE;
return slr;
}
NOTICE("renew MainDB for %s-txn %" PRIaTXN " since db-flags changes 0x%x -> 0x%x",
(txn->flags & MDBX_TXN_RDONLY) ? "ro" : "rw", txn->txnid, env->dbs_flags[MAIN_DBI] & ~DB_VALID,
txn->dbs[MAIN_DBI].flags);
slr.seq = dbi_seq_next(env, MAIN_DBI);
env->dbs_flags[MAIN_DBI] = DB_POISON;
atomic_store32(&env->dbi_seqs[MAIN_DBI], slr.seq, mo_AcquireRelease);
}
slr.err = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]);
if (likely(slr.seq == MDBX_SUCCESS)) {
slr.seq = dbi_seq_next(env, MAIN_DBI);
env->dbs_flags[MAIN_DBI] = DB_VALID | txn->dbs[MAIN_DBI].flags;
atomic_store32(&env->dbi_seqs[MAIN_DBI], slr.seq, mo_AcquireRelease);
}
return slr;
}
int txn_renew(MDBX_txn *txn, unsigned flags) { int txn_renew(MDBX_txn *txn, unsigned flags) {
MDBX_env *const env = txn->env; MDBX_env *const env = txn->env;
int rc; int rc;
@ -156,53 +193,18 @@ int txn_renew(MDBX_txn *txn, unsigned flags) {
txn->dbi_seqs[MAIN_DBI] = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease); txn->dbi_seqs[MAIN_DBI] = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease);
if (unlikely(env->dbs_flags[MAIN_DBI] != (DB_VALID | txn->dbs[MAIN_DBI].flags) || !txn->dbi_seqs[MAIN_DBI])) { if (unlikely(env->dbs_flags[MAIN_DBI] != (DB_VALID | txn->dbs[MAIN_DBI].flags) || !txn->dbi_seqs[MAIN_DBI])) {
const bool need_txn_lock = env->basal_txn && env->basal_txn->owner != osal_thread_self();
bool should_unlock = false;
if (need_txn_lock) {
rc = lck_txn_lock(env, true);
if (rc == MDBX_SUCCESS)
should_unlock = true;
else if (rc != MDBX_BUSY && rc != MDBX_EDEADLK)
goto bailout;
}
rc = osal_fastmutex_acquire(&env->dbi_lock); rc = osal_fastmutex_acquire(&env->dbi_lock);
if (likely(rc == MDBX_SUCCESS)) { if (unlikely(rc != MDBX_SUCCESS)) {
/* проверяем повторно после захвата блокировки */ ERROR("dbi_lock failed, err %d", rc);
uint32_t seq = atomic_load32(&env->dbi_seqs[MAIN_DBI], mo_AcquireRelease);
if (env->dbs_flags[MAIN_DBI] != (DB_VALID | txn->dbs[MAIN_DBI].flags)) {
if (!(env->dbs_flags[MAIN_DBI] & DB_VALID) || !need_txn_lock || should_unlock ||
/* если нет активной пишущей транзакции, * то следующая будет ждать на dbi_lock */ !env->txn) {
if (env->dbs_flags[MAIN_DBI] & DB_VALID) {
NOTICE("renew MainDB for %s-txn %" PRIaTXN " since db-flags changes 0x%x -> 0x%x",
(txn->flags & MDBX_TXN_RDONLY) ? "ro" : "rw", txn->txnid, env->dbs_flags[MAIN_DBI] & ~DB_VALID,
txn->dbs[MAIN_DBI].flags);
seq = dbi_seq_next(env, MAIN_DBI);
env->dbs_flags[MAIN_DBI] = DB_POISON;
atomic_store32(&env->dbi_seqs[MAIN_DBI], seq, mo_AcquireRelease);
}
rc = tbl_setup(env, &env->kvs[MAIN_DBI], &txn->dbs[MAIN_DBI]);
if (likely(rc == MDBX_SUCCESS)) {
seq = dbi_seq_next(env, MAIN_DBI);
env->dbs_flags[MAIN_DBI] = DB_VALID | txn->dbs[MAIN_DBI].flags;
atomic_store32(&env->dbi_seqs[MAIN_DBI], seq, mo_AcquireRelease);
}
} else {
ERROR("MainDB db-flags changes 0x%x -> 0x%x ahead of read-txn "
"%" PRIaTXN,
txn->dbs[MAIN_DBI].flags, env->dbs_flags[MAIN_DBI] & ~DB_VALID, txn->txnid);
rc = MDBX_INCOMPATIBLE;
}
}
txn->dbi_seqs[MAIN_DBI] = seq;
ENSURE(env, osal_fastmutex_release(&env->dbi_lock) == MDBX_SUCCESS);
} else {
DEBUG("dbi_lock failed, err %d", rc);
}
if (should_unlock)
lck_txn_unlock(env);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout; goto bailout;
}
slr_t slr = latch_maindb_locked(txn, env);
ENSURE(env, osal_fastmutex_release(&env->dbi_lock) == MDBX_SUCCESS);
if (unlikely((rc = slr.err) != MDBX_SUCCESS))
goto bailout;
txn->dbi_seqs[MAIN_DBI] = slr.seq;
} }
tASSERT(txn, check_table_flags(txn->dbs[MAIN_DBI].flags) && txn->dbi_seqs[MAIN_DBI]);
if (unlikely(txn->dbs[FREE_DBI].flags != MDBX_INTEGERKEY)) { if (unlikely(txn->dbs[FREE_DBI].flags != MDBX_INTEGERKEY)) {
ERROR("unexpected/invalid db-flags 0x%x for %s", txn->dbs[FREE_DBI].flags, "GC/FreeDB"); ERROR("unexpected/invalid db-flags 0x%x for %s", txn->dbs[FREE_DBI].flags, "GC/FreeDB");
@ -210,8 +212,6 @@ int txn_renew(MDBX_txn *txn, unsigned flags) {
goto bailout; goto bailout;
} }
tASSERT(txn, txn->dbs[FREE_DBI].flags == MDBX_INTEGERKEY);
tASSERT(txn, check_table_flags(txn->dbs[MAIN_DBI].flags));
if (unlikely(env->flags & ENV_FATAL_ERROR)) { if (unlikely(env->flags & ENV_FATAL_ERROR)) {
WARNING("%s", "environment had fatal error, must shutdown!"); WARNING("%s", "environment had fatal error, must shutdown!");
rc = MDBX_PANIC; rc = MDBX_PANIC;