mdbx: отложенное освобождение имен связанных c dbi-хендлами и добавление опции MDBX_ENABLE_DBI_LOCKFREE.

Отложенное освобождение позволяет реализовать безопасное выполнение
fastpath/lockfree при повторном открытии из других потоков/транзакцйий
уже открытых subDB, что и происходит при активации добавленной опции
сборки `MDBX_ENABLE_DBI_LOCKFREE`.
This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2023-11-04 23:45:29 +03:00
parent 3622669a9f
commit 96504bf338
5 changed files with 173 additions and 40 deletions

View File

@ -532,6 +532,7 @@ add_mdbx_option(MDBX_ENABLE_PGOP_STAT "Gathering statistics for page operations"
add_mdbx_option(MDBX_ENABLE_PROFGC "Profiling of GC search and updates" OFF)
mark_as_advanced(MDBX_ENABLE_PROFGC)
add_mdbx_option(MDBX_ENABLE_DBI_SPARSE "FIXME" ON)
add_mdbx_option(MDBX_ENABLE_DBI_LOCKFREE "FIXME" ON)
if(NOT MDBX_AMALGAMATED_SOURCE)
if(CMAKE_CONFIGURATION_TYPES OR CMAKE_BUILD_TYPE_UPPERCASE STREQUAL "DEBUG")

View File

@ -34,6 +34,7 @@
#cmakedefine01 MDBX_ENABLE_PGOP_STAT
#cmakedefine01 MDBX_ENABLE_PROFGC
#cmakedefine01 MDBX_ENABLE_DBI_SPARSE
#cmakedefine01 MDBX_ENABLE_DBI_LOCKFREE
/* Windows */
#cmakedefine01 MDBX_WITHOUT_MSVC_CRT

View File

@ -3781,6 +3781,58 @@ MDBX_MAYBE_UNUSED static bool cursor_is_tracked(const MDBX_cursor *mc) {
*tracking_head = tracked->mc_next; \
} while (0)
static int
env_defer_free_and_release(MDBX_env *const env,
struct mdbx_defer_free_item *const chain) {
size_t length = 0;
struct mdbx_defer_free_item *obsolete_chain = nullptr;
#if MDBX_ENABLE_DBI_LOCKFREE
const uint64_t now = osal_monotime();
struct mdbx_defer_free_item **scan = &env->me_defer_free;
if (env->me_defer_free) {
const uint64_t threshold_1second = osal_16dot16_to_monotime(1 * 65536);
do {
struct mdbx_defer_free_item *item = *scan;
if (now - item->timestamp < threshold_1second) {
scan = &item->next;
length += 1;
} else {
*scan = item->next;
item->next = obsolete_chain;
obsolete_chain = item;
}
} while (*scan);
}
eASSERT(env, *scan == nullptr);
if (chain) {
struct mdbx_defer_free_item *item = chain;
do {
item->timestamp = now;
item = item->next;
} while (item);
*scan = chain;
}
#else /* MDBX_ENABLE_DBI_LOCKFREE */
obsolete_chain = chain;
#endif /* MDBX_ENABLE_DBI_LOCKFREE */
ENSURE(env, osal_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
if (length > 42) {
#if defined(_WIN32) || defined(_WIN64)
SwitchToThread();
#else
sched_yield();
#endif /* Windows */
}
while (obsolete_chain) {
struct mdbx_defer_free_item *item = obsolete_chain;
obsolete_chain = obsolete_chain->next;
osal_free(item);
}
return chain ? MDBX_SUCCESS : MDBX_BAD_DBI;
}
#if MDBX_ENABLE_DBI_SPARSE
static __inline size_t dbi_bitmap_ctz(const MDBX_txn *txn, intptr_t bmi) {
@ -4136,7 +4188,7 @@ static int dbi_update(MDBX_txn *txn, int keep) {
MDBX_env *const env = txn->mt_env;
tASSERT(txn, !txn->mt_parent && txn == env->me_txn0);
bool locked = false;
void *defer_free = nullptr;
struct mdbx_defer_free_item *defer_chain = nullptr;
TXN_FOREACH_DBI_USER(txn, dbi) {
if (likely((txn->mt_dbi_state[dbi] & DBI_CREAT) == 0))
continue;
@ -4154,15 +4206,15 @@ static int dbi_update(MDBX_txn *txn, int keep) {
env->me_db_flags[dbi] = txn->mt_dbs[dbi].md_flags | DB_VALID;
} else {
uint32_t seq = dbi_seq_next(env, dbi);
void *ptr = env->me_dbxs[dbi].md_name.iov_base;
if (ptr) {
struct mdbx_defer_free_item *item = env->me_dbxs[dbi].md_name.iov_base;
if (item) {
env->me_db_flags[dbi] = 0;
env->me_dbxs[dbi].md_name.iov_len = 0;
env->me_dbxs[dbi].md_name.iov_base = nullptr;
atomic_store32(&env->me_dbi_seqs[dbi], seq, mo_AcquireRelease);
osal_flush_incoherent_cpu_writeback();
osal_free(defer_free);
defer_free = ptr;
item->next = defer_chain;
defer_chain = item;
} else {
eASSERT(env, env->me_dbxs[dbi].md_name.iov_len == 0);
eASSERT(env, env->me_db_flags[dbi] == 0);
@ -4179,9 +4231,7 @@ static int dbi_update(MDBX_txn *txn, int keep) {
!env->me_dbxs[i].md_name.iov_base);
}
env->me_numdbs = (unsigned)i;
ENSURE(txn->mt_env,
osal_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
osal_free(defer_free);
env_defer_free_and_release(env, defer_chain);
}
return MDBX_SUCCESS;
}
@ -15651,6 +15701,14 @@ __cold static int env_close(MDBX_env *env) {
env->me_txkey = (osal_thread_key_t)0;
}
#if MDBX_ENABLE_DBI_LOCKFREE
for (struct mdbx_defer_free_item *next, *ptr = env->me_defer_free; ptr;
ptr = next) {
next = ptr->next;
osal_free(ptr);
}
#endif /* MDBX_ENABLE_DBI_LOCKFREE */
munlock_all(env);
if (!(env->me_flags & MDBX_RDONLY))
osal_ioring_destroy(&env->me_ioring);
@ -23057,7 +23115,9 @@ static int dbi_open_locked(MDBX_txn *txn, unsigned user_flags, MDBX_dbi *dbi,
/* Done here so we cannot fail after creating a new DB */
void *clone = nullptr;
if (name.iov_len) {
clone = osal_malloc(name.iov_len);
clone = osal_malloc((name.iov_len > sizeof(struct mdbx_defer_free_item))
? name.iov_len
: sizeof(struct mdbx_defer_free_item));
if (unlikely(!clone))
return MDBX_ENOMEM;
name.iov_base = memcpy(clone, name.iov_base, name.iov_len);
@ -23174,6 +23234,67 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const name,
txn->mt_env->me_leaf_nodemax - NODESIZE - sizeof(MDBX_db)))
return MDBX_EINVAL;
#if MDBX_ENABLE_DBI_LOCKFREE
/* Is the DB already open? */
const MDBX_env *const env = txn->mt_env;
size_t free_slot = env->me_numdbs;
for (size_t i = CORE_DBS; i < env->me_numdbs; ++i) {
retry:
if ((env->me_db_flags[i] & DB_VALID) == 0) {
free_slot = i;
continue;
}
const uint32_t snap_seq =
atomic_load32(&env->me_dbi_seqs[i], mo_AcquireRelease);
const uint16_t snap_flags = env->me_db_flags[i];
const MDBX_val snap_name = env->me_dbxs[i].md_name;
if (user_flags != MDBX_ACCEDE &&
(((user_flags ^ snap_flags) & DB_PERSISTENT_FLAGS) ||
(keycmp && keycmp != env->me_dbxs[i].md_cmp) ||
(datacmp && datacmp != env->me_dbxs[i].md_dcmp)))
continue;
const uint32_t main_seq =
atomic_load32(&env->me_dbi_seqs[MAIN_DBI], mo_AcquireRelease);
MDBX_cmp_func *const snap_cmp = env->me_dbxs[MAIN_DBI].md_cmp;
if (unlikely(!(snap_flags & DB_VALID) || !snap_name.iov_base ||
!snap_name.iov_len || !snap_cmp))
continue;
const bool name_match = snap_cmp(&snap_name, name) == 0;
osal_flush_incoherent_cpu_writeback();
if (unlikely(snap_seq !=
atomic_load32(&env->me_dbi_seqs[i], mo_AcquireRelease) ||
main_seq != atomic_load32(&env->me_dbi_seqs[MAIN_DBI],
mo_AcquireRelease) ||
snap_flags != env->me_db_flags[i] ||
snap_name.iov_base != env->me_dbxs[i].md_name.iov_base ||
snap_name.iov_len != env->me_dbxs[i].md_name.iov_len))
goto retry;
if (name_match) {
rc = dbi_check(txn, i);
if (rc == MDBX_BAD_DBI &&
txn->mt_dbi_state[i] == (DBI_OLDEN | DBI_LINDO)) {
/* хендл использовался, стал невалидным,
* но теперь явно пере-открывается в этой транзакци */
eASSERT(env, !txn->mt_cursors[i]);
txn->mt_dbi_state[i] = DBI_LINDO;
rc = dbi_check(txn, i);
}
if (likely(rc == MDBX_SUCCESS)) {
rc = dbi_bind(txn, i, user_flags, keycmp, datacmp);
if (likely(rc == MDBX_SUCCESS))
*dbi = (MDBX_dbi)i;
}
return rc;
}
}
/* Fail, if no free slot and max hit */
if (unlikely(free_slot >= env->me_maxdbs))
return MDBX_DBS_FULL;
#endif /* MDBX_ENABLE_DBI_LOCKFREE */
rc = osal_fastmutex_acquire(&txn->mt_env->me_dbi_lock);
if (likely(rc == MDBX_SUCCESS)) {
rc = dbi_open_locked(txn, user_flags, dbi, keycmp, datacmp, *name);
@ -23251,35 +23372,35 @@ __cold int mdbx_dbi_stat(const MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *dest,
return MDBX_SUCCESS;
}
static int dbi_close_locked(MDBX_env *env, MDBX_dbi dbi) {
static struct mdbx_defer_free_item *dbi_close_locked(MDBX_env *env,
MDBX_dbi dbi) {
eASSERT(env, dbi >= CORE_DBS);
if (unlikely(dbi >= env->me_numdbs))
return MDBX_BAD_DBI;
return nullptr;
char *const ptr = env->me_dbxs[dbi].md_name.iov_base;
/* If there was no name, this was already closed */
if (unlikely(!ptr))
return MDBX_BAD_DBI;
const uint32_t seq = dbi_seq_next(env, dbi);
struct mdbx_defer_free_item *defer_item = env->me_dbxs[dbi].md_name.iov_base;
if (likely(defer_item)) {
env->me_db_flags[dbi] = 0;
env->me_dbxs[dbi].md_name.iov_len = 0;
env->me_dbxs[dbi].md_name.iov_base = nullptr;
atomic_store32(&env->me_dbi_seqs[dbi], seq, mo_AcquireRelease);
osal_flush_incoherent_cpu_writeback();
defer_item->next = nullptr;
env->me_db_flags[dbi] = 0;
env->me_dbxs[dbi].md_name.iov_len = 0;
osal_memory_fence(mo_AcquireRelease, true);
env->me_dbxs[dbi].md_name.iov_base = NULL;
osal_flush_incoherent_cpu_writeback();
osal_free(ptr);
if (env->me_numdbs == dbi + 1) {
size_t i = env->me_numdbs;
do {
--i;
eASSERT(env, i >= CORE_DBS);
eASSERT(env, !env->me_db_flags[i] && !env->me_dbxs[i].md_name.iov_len &&
!env->me_dbxs[i].md_name.iov_base);
} while ((env->me_db_flags[i - 1] & DB_VALID) == 0);
env->me_numdbs = (unsigned)i;
if (env->me_numdbs == dbi + 1) {
size_t i = env->me_numdbs;
do {
--i;
eASSERT(env, i >= CORE_DBS);
eASSERT(env, !env->me_db_flags[i] && !env->me_dbxs[i].md_name.iov_len &&
!env->me_dbxs[i].md_name.iov_base);
} while (i > CORE_DBS && !env->me_dbxs[i - 1].md_name.iov_base);
env->me_numdbs = (unsigned)i;
}
}
return MDBX_SUCCESS;
return defer_item;
}
int mdbx_dbi_close(MDBX_env *env, MDBX_dbi dbi) {
@ -23297,12 +23418,8 @@ int mdbx_dbi_close(MDBX_env *env, MDBX_dbi dbi) {
return MDBX_BAD_DBI;
rc = osal_fastmutex_acquire(&env->me_dbi_lock);
if (likely(rc == MDBX_SUCCESS)) {
rc = (dbi < env->me_maxdbs && (env->me_db_flags[dbi] & DB_VALID))
? dbi_close_locked(env, dbi)
: MDBX_BAD_DBI;
ENSURE(env, osal_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
}
if (likely(rc == MDBX_SUCCESS))
rc = env_defer_free_and_release(env, dbi_close_locked(env, dbi));
return rc;
}
@ -23449,8 +23566,7 @@ int mdbx_drop(MDBX_txn *txn, MDBX_dbi dbi, bool del) {
MDBX_env *env = txn->mt_env;
rc = osal_fastmutex_acquire(&env->me_dbi_lock);
if (likely(rc == MDBX_SUCCESS)) {
dbi_close_locked(env, dbi);
ENSURE(env, osal_fastmutex_release(&env->me_dbi_lock) == MDBX_SUCCESS);
rc = env_defer_free_and_release(env, dbi_close_locked(env, dbi));
goto bailout;
}
}

View File

@ -1348,6 +1348,11 @@ typedef struct MDBX_cursor_couple {
MDBX_xcursor inner;
} MDBX_cursor_couple;
struct mdbx_defer_free_item {
struct mdbx_defer_free_item *next;
uint64_t timestamp;
};
/* The database environment. */
struct MDBX_env {
/* ----------------------------------------------------- mostly static part */
@ -1452,6 +1457,9 @@ struct MDBX_env {
bool me_prefault_write;
MDBX_env *me_lcklist_next;
#if MDBX_ENABLE_DBI_LOCKFREE
struct mdbx_defer_free_item *me_defer_free;
#endif /* MDBX_ENABLE_DBI_LOCKFREE */
/* --------------------------------------------------- mostly volatile part */

View File

@ -170,6 +170,13 @@
#error MDBX_ENABLE_DBI_SPARSE must be defined as 0 or 1
#endif /* MDBX_ENABLE_DBI_SPARSE */
/** FIXME */
#ifndef MDBX_ENABLE_DBI_LOCKFREE
#define MDBX_ENABLE_DBI_LOCKFREE 1
#elif !(MDBX_ENABLE_DBI_LOCKFREE == 0 || MDBX_ENABLE_DBI_LOCKFREE == 1)
#error MDBX_ENABLE_DBI_LOCKFREE must be defined as 0 or 1
#endif /* MDBX_ENABLE_DBI_LOCKFREE */
/** Controls sort order of internal page number lists.
* This mostly experimental/advanced option with not for regular MDBX users.
* \warning The database format depend on this option and libmdbx built with