mdbx: объединение lck-списка и rthc-таблицы для упрощения (де)регистрации TLS-деструкторов.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2023-11-11 23:54:21 +03:00
parent eddade7b99
commit 7ad54f54b4
2 changed files with 188 additions and 223 deletions

View File

@ -1128,10 +1128,12 @@ MDBX_MAYBE_UNUSED static
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
/* rthc (tls keys and destructors) */ /* rthc (tls keys and destructors) */
static int rthc_register(MDBX_env *const env);
static int rthc_remove(MDBX_env *const env);
static int rthc_uniq_check(const osal_mmap_t *pending, MDBX_env **found);
typedef struct rthc_entry_t { typedef struct rthc_entry_t {
MDBX_reader *begin; MDBX_env *env;
MDBX_reader *end;
osal_thread_key_t thr_tls_key;
} rthc_entry_t; } rthc_entry_t;
#if MDBX_DEBUG #if MDBX_DEBUG
@ -1144,10 +1146,8 @@ static bin128_t bootid;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
static CRITICAL_SECTION rthc_critical_section; static CRITICAL_SECTION rthc_critical_section;
static CRITICAL_SECTION lcklist_critical_section;
#else #else
static pthread_mutex_t lcklist_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t rthc_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t rthc_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t rthc_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t rthc_cond = PTHREAD_COND_INITIALIZER;
static osal_thread_key_t rthc_key; static osal_thread_key_t rthc_key;
@ -1346,17 +1346,24 @@ static void thread_rthc_set(osal_thread_key_t key, const void *value) {
/* dtor called for thread, i.e. for all mdbx's environment objects */ /* dtor called for thread, i.e. for all mdbx's environment objects */
__cold void thread_dtor(void *rthc) { __cold void thread_dtor(void *rthc) {
rthc_lock(); rthc_lock();
TRACE(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", osal_getpid(), const uint32_t self_pid = osal_getpid();
TRACE(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", self_pid,
osal_thread_self(), rthc); osal_thread_self(), rthc);
const uint32_t self_pid = osal_getpid();
for (size_t i = 0; i < rthc_count; ++i) { for (size_t i = 0; i < rthc_count; ++i) {
const osal_thread_key_t key = rthc_table[i].thr_tls_key; MDBX_env *const env = rthc_table[i].env;
MDBX_reader *const reader = thread_rthc_get(key); if (env->me_pid != self_pid)
if (reader < rthc_table[i].begin || reader >= rthc_table[i].end) continue;
if (!(env->me_flags & MDBX_ENV_TXKEY))
continue;
MDBX_reader *const reader = thread_rthc_get(env->me_txkey);
MDBX_reader *const begin = &env->me_lck_mmap.lck->mti_readers[0];
MDBX_reader *const end =
&env->me_lck_mmap.lck->mti_readers[env->me_maxreaders];
if (reader < begin || reader >= end)
continue; continue;
#if !defined(_WIN32) && !defined(_WIN64) #if !defined(_WIN32) && !defined(_WIN64)
if (pthread_setspecific(key, nullptr) != 0) { if (pthread_setspecific(env->me_txkey, nullptr) != 0) {
TRACE("== thread 0x%" PRIxPTR TRACE("== thread 0x%" PRIxPTR
", rthc %p: ignore race with tsd-key deletion", ", rthc %p: ignore race with tsd-key deletion",
osal_thread_self(), __Wpedantic_format_voidptr(reader)); osal_thread_self(), __Wpedantic_format_voidptr(reader));
@ -1368,13 +1375,13 @@ __cold void thread_dtor(void *rthc) {
", rthc %p, [%zi], %p ... %p (%+i), rtch-pid %i, " ", rthc %p, [%zi], %p ... %p (%+i), rtch-pid %i, "
"current-pid %i", "current-pid %i",
osal_thread_self(), __Wpedantic_format_voidptr(reader), i, osal_thread_self(), __Wpedantic_format_voidptr(reader), i,
__Wpedantic_format_voidptr(rthc_table[i].begin), __Wpedantic_format_voidptr(begin), __Wpedantic_format_voidptr(end),
__Wpedantic_format_voidptr(rthc_table[i].end), (int)(reader - begin), reader->mr_pid.weak, self_pid);
(int)(reader - rthc_table[i].begin), reader->mr_pid.weak, self_pid);
if (atomic_load32(&reader->mr_pid, mo_Relaxed) == self_pid) { if (atomic_load32(&reader->mr_pid, mo_Relaxed) == self_pid) {
TRACE("==== thread 0x%" PRIxPTR ", rthc %p, cleanup", osal_thread_self(), TRACE("==== thread 0x%" PRIxPTR ", rthc %p, cleanup", osal_thread_self(),
__Wpedantic_format_voidptr(reader)); __Wpedantic_format_voidptr(reader));
(void)atomic_cas32(&reader->mr_pid, self_pid, 0); (void)atomic_cas32(&reader->mr_pid, self_pid, 0);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true, mo_Relaxed);
} }
} }
@ -1419,14 +1426,15 @@ __cold void thread_dtor(void *rthc) {
MDBX_EXCLUDE_FOR_GPROF MDBX_EXCLUDE_FOR_GPROF
__cold void global_dtor(void) { __cold void global_dtor(void) {
TRACE(">> pid %d", osal_getpid()); const uint32_t self_pid = osal_getpid();
TRACE(">> pid %d", self_pid);
rthc_lock(); rthc_lock();
#if !defined(_WIN32) && !defined(_WIN64) #if !defined(_WIN32) && !defined(_WIN64)
uint64_t *rthc = pthread_getspecific(rthc_key); uint64_t *rthc = pthread_getspecific(rthc_key);
TRACE("== thread 0x%" PRIxPTR ", rthc %p, pid %d, self-status 0x%08" PRIx64 TRACE("== thread 0x%" PRIxPTR ", rthc %p, pid %d, self-status 0x%08" PRIx64
", left %d", ", left %d",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), osal_getpid(), osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
rthc ? rthc_read(rthc) : ~UINT64_C(0), rthc ? rthc_read(rthc) : ~UINT64_C(0),
atomic_load32(&rthc_pending, mo_Relaxed)); atomic_load32(&rthc_pending, mo_Relaxed));
if (rthc) { if (rthc) {
@ -1437,20 +1445,20 @@ __cold void global_dtor(void) {
rthc_compare_and_clean(rthc, sign_registered)) { rthc_compare_and_clean(rthc, sign_registered)) {
TRACE("== thread 0x%" PRIxPTR TRACE("== thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")", ", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), osal_getpid(), osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
"registered", state); "registered", state);
} else if (state == sign_counted && } else if (state == sign_counted &&
rthc_compare_and_clean(rthc, sign_counted)) { rthc_compare_and_clean(rthc, sign_counted)) {
TRACE("== thread 0x%" PRIxPTR TRACE("== thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")", ", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), osal_getpid(), osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
"counted", state); "counted", state);
ENSURE(nullptr, atomic_sub32(&rthc_pending, 1) > 0); ENSURE(nullptr, atomic_sub32(&rthc_pending, 1) > 0);
} else { } else {
WARNING("thread 0x%" PRIxPTR WARNING("thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")", ", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
osal_getpid(), "wrong", state); "wrong", state);
} }
} }
@ -1467,7 +1475,7 @@ __cold void global_dtor(void) {
for (unsigned left; for (unsigned left;
(left = atomic_load32(&rthc_pending, mo_AcquireRelease)) > 0;) { (left = atomic_load32(&rthc_pending, mo_AcquireRelease)) > 0;) {
NOTICE("tls-cleanup: pid %d, pending %u, wait for...", osal_getpid(), left); NOTICE("tls-cleanup: pid %d, pending %u, wait for...", self_pid, left);
const int rc = pthread_cond_timedwait(&rthc_cond, &rthc_mutex, &abstime); const int rc = pthread_cond_timedwait(&rthc_cond, &rthc_mutex, &abstime);
if (rc && rc != EINTR) if (rc && rc != EINTR)
break; break;
@ -1475,23 +1483,31 @@ __cold void global_dtor(void) {
thread_key_delete(rthc_key); thread_key_delete(rthc_key);
#endif #endif
const uint32_t self_pid = osal_getpid();
for (size_t i = 0; i < rthc_count; ++i) { for (size_t i = 0; i < rthc_count; ++i) {
const osal_thread_key_t key = rthc_table[i].thr_tls_key; MDBX_env *const env = rthc_table[i].env;
thread_key_delete(key); if (env->me_pid != self_pid)
for (MDBX_reader *rthc = rthc_table[i].begin; rthc < rthc_table[i].end; continue;
++rthc) { if (!(env->me_flags & MDBX_ENV_TXKEY))
continue;
MDBX_reader *const begin = &env->me_lck_mmap.lck->mti_readers[0];
MDBX_reader *const end =
&env->me_lck_mmap.lck->mti_readers[env->me_maxreaders];
thread_key_delete(env->me_txkey);
bool cleaned = false;
for (MDBX_reader *reader = begin; reader < end; ++reader) {
TRACE("== [%zi] = key %" PRIuPTR ", %p ... %p, rthc %p (%+i), " TRACE("== [%zi] = key %" PRIuPTR ", %p ... %p, rthc %p (%+i), "
"rthc-pid %i, current-pid %i", "rthc-pid %i, current-pid %i",
i, (uintptr_t)key, __Wpedantic_format_voidptr(rthc_table[i].begin), i, (uintptr_t)env->me_txkey, __Wpedantic_format_voidptr(begin),
__Wpedantic_format_voidptr(rthc_table[i].end), __Wpedantic_format_voidptr(end), __Wpedantic_format_voidptr(reader),
__Wpedantic_format_voidptr(rthc), (int)(rthc - rthc_table[i].begin), (int)(reader - begin), reader->mr_pid.weak, self_pid);
rthc->mr_pid.weak, self_pid); if (atomic_load32(&reader->mr_pid, mo_Relaxed) == self_pid) {
if (atomic_load32(&rthc->mr_pid, mo_Relaxed) == self_pid) { (void)atomic_cas32(&reader->mr_pid, self_pid, 0);
atomic_store32(&rthc->mr_pid, 0, mo_AcquireRelease); TRACE("== cleanup %p", __Wpedantic_format_voidptr(reader));
TRACE("== cleanup %p", __Wpedantic_format_voidptr(rthc)); cleaned = true;
} }
} }
if (cleaned)
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true, mo_Relaxed);
} }
rthc_limit = rthc_count = 0; rthc_limit = rthc_count = 0;
@ -1501,7 +1517,6 @@ __cold void global_dtor(void) {
rthc_unlock(); rthc_unlock();
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
DeleteCriticalSection(&lcklist_critical_section);
DeleteCriticalSection(&rthc_critical_section); DeleteCriticalSection(&rthc_critical_section);
#else #else
/* LY: yielding a few timeslices to give a more chance /* LY: yielding a few timeslices to give a more chance
@ -1510,24 +1525,26 @@ __cold void global_dtor(void) {
#endif #endif
osal_dtor(); osal_dtor();
TRACE("<< pid %d\n", osal_getpid()); TRACE("<< pid %d\n", self_pid);
} }
__cold int rthc_alloc(osal_thread_key_t *pkey, MDBX_reader *begin, __cold int rthc_register(MDBX_env *const env) {
MDBX_reader *end) { TRACE(">> env %p, rthc_count %u, rthc_limit %u",
assert(pkey != NULL); __Wpedantic_format_voidptr(env), rthc_count, rthc_limit);
#ifndef NDEBUG
*pkey = (osal_thread_key_t)0xBADBADBAD;
#endif /* NDEBUG */
rthc_lock(); int rc = MDBX_SUCCESS;
TRACE(">> rthc_count %u, rthc_limit %u", rthc_count, rthc_limit); for (size_t i = 0; i < rthc_count; ++i)
int rc; if (unlikely(rthc_table[i].env == env)) {
if (rthc_count == rthc_limit) { rc = MDBX_PANIC;
goto bailout;
}
env->me_txkey = 0;
if (unlikely(rthc_count == rthc_limit)) {
rthc_entry_t *new_table = rthc_entry_t *new_table =
osal_realloc((rthc_table == rthc_table_static) ? nullptr : rthc_table, osal_realloc((rthc_table == rthc_table_static) ? nullptr : rthc_table,
sizeof(rthc_entry_t) * rthc_limit * 2); sizeof(rthc_entry_t) * rthc_limit * 2);
if (new_table == nullptr) { if (unlikely(new_table == nullptr)) {
rc = MDBX_ENOMEM; rc = MDBX_ENOMEM;
goto bailout; goto bailout;
} }
@ -1537,84 +1554,92 @@ __cold int rthc_alloc(osal_thread_key_t *pkey, MDBX_reader *begin,
rthc_limit *= 2; rthc_limit *= 2;
} }
rc = thread_key_create(&rthc_table[rthc_count].thr_tls_key); if ((env->me_flags & MDBX_NOTLS) == 0) {
if (rc != MDBX_SUCCESS) rc = thread_key_create(&env->me_txkey);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout; goto bailout;
env->me_flags |= MDBX_ENV_TXKEY;
}
*pkey = rthc_table[rthc_count].thr_tls_key; rthc_table[rthc_count].env = env;
TRACE("== [%i] = key %" PRIuPTR ", %p ... %p", rthc_count, (uintptr_t)*pkey, TRACE("== [%i] = env %p, key %" PRIuPTR, rthc_count,
__Wpedantic_format_voidptr(begin), __Wpedantic_format_voidptr(end)); __Wpedantic_format_voidptr(env), (uintptr_t)env->me_txkey);
rthc_table[rthc_count].begin = begin;
rthc_table[rthc_count].end = end;
++rthc_count; ++rthc_count;
TRACE("<< key %" PRIuPTR ", rthc_count %u, rthc_limit %u", (uintptr_t)*pkey,
rthc_count, rthc_limit);
rthc_unlock();
return MDBX_SUCCESS;
bailout: bailout:
rthc_unlock(); TRACE("<< env %p, key %" PRIuPTR ", rthc_count %u, rthc_limit %u, rc %d",
__Wpedantic_format_voidptr(env), (uintptr_t)env->me_txkey, rthc_count,
rthc_limit, rc);
return rc; return rc;
} }
/* Releases what `env` holds in its lock region and destroys its lck state.
 *
 * When the lck mapping is present and the environment belongs to the current
 * process, every reader slot whose mr_pid equals the current pid is zeroed,
 * mti_readers_refresh_flag is raised if at least one slot was zeroed, and
 * rthc_uniq_check() is asked for another in-process environment over the same
 * lck. If no such neighbor was reported, a live reader is recorded and the
 * lck file handle is valid, osal_rpid_clear() is called as well.
 *
 * osal_lck_destroy() is called and env->me_pid is reset to 0 unconditionally.
 *
 * Returns the first non-zero code among rthc_uniq_check(), osal_rpid_clear()
 * and osal_lck_destroy(), in that order, otherwise the (zero) result of
 * osal_lck_destroy(). */
__cold static int rthc_drown(MDBX_env *const env) {
  const uint32_t current_pid = osal_getpid();
  MDBX_env *neighbor = nullptr;
  int result = MDBX_SUCCESS;

  if (likely(env->me_lck_mmap.lck && current_pid == env->me_pid)) {
    MDBX_reader *const first = &env->me_lck_mmap.lck->mti_readers[0];
    MDBX_reader *const last =
        &env->me_lck_mmap.lck->mti_readers[env->me_maxreaders];
    /* NOTE(review): the pid comparison below is already guaranteed by the
     * enclosing condition, so "skip" looks unreachable here. */
    TRACE("== %s env %p pid %d, readers %p ...%p, current-pid %d",
          (current_pid == env->me_pid) ? "cleanup" : "skip",
          __Wpedantic_format_voidptr(env), env->me_pid,
          __Wpedantic_format_voidptr(first), __Wpedantic_format_voidptr(last),
          current_pid);

    /* Walk the reader table and drop the slots owned by this process. */
    bool any_slot_released = false;
    MDBX_reader *slot = first;
    while (slot < last) {
      if (atomic_load32(&slot->mr_pid, mo_Relaxed) == current_pid) {
        atomic_store32(&slot->mr_pid, 0, mo_AcquireRelease);
        TRACE("== cleanup %p", __Wpedantic_format_voidptr(slot));
        any_slot_released = true;
      }
      ++slot;
    }
    if (any_slot_released)
      atomic_store32(&env->me_lck_mmap.lck->mti_readers_refresh_flag, true,
                     mo_Relaxed);

    result = rthc_uniq_check(&env->me_lck_mmap, &neighbor);
    if (neighbor == nullptr && env->me_live_reader &&
        env->me_lfd != INVALID_HANDLE_VALUE) {
      const int clear_err = osal_rpid_clear(env);
      /* Keep the earlier failure, if any; otherwise report this one. */
      if (!result)
        result = clear_err;
    }
  }

  const int destroy_err = osal_lck_destroy(env, neighbor);
  env->me_pid = 0;
  return result ? result : destroy_err;
}
__cold void rthc_remove(const osal_thread_key_t key) { __cold static int rthc_remove(MDBX_env *const env) {
thread_key_delete(key); TRACE(">>> env %p, key %zu, rthc_count %u, rthc_limit %u",
rthc_lock(); __Wpedantic_format_voidptr(env), (uintptr_t)env->me_txkey, rthc_count,
TRACE(">> key %zu, rthc_count %u, rthc_limit %u", (uintptr_t)key, rthc_count,
rthc_limit); rthc_limit);
for (size_t i = 0; i < rthc_count; ++i) { int rc = MDBX_SUCCESS;
if (key == rthc_table[i].thr_tls_key) { if (likely(env->me_pid))
const uint32_t self_pid = osal_getpid(); rc = rthc_drown(env);
TRACE("== [%zi], %p ...%p, current-pid %d", i,
__Wpedantic_format_voidptr(rthc_table[i].begin),
__Wpedantic_format_voidptr(rthc_table[i].end), self_pid);
for (MDBX_reader *rthc = rthc_table[i].begin; rthc < rthc_table[i].end; for (size_t i = 0; i < rthc_count; ++i) {
++rthc) { if (rthc_table[i].env == env) {
if (atomic_load32(&rthc->mr_pid, mo_Relaxed) == self_pid) {
atomic_store32(&rthc->mr_pid, 0, mo_AcquireRelease);
TRACE("== cleanup %p", __Wpedantic_format_voidptr(rthc));
}
}
if (--rthc_count > 0) if (--rthc_count > 0)
rthc_table[i] = rthc_table[rthc_count]; rthc_table[i] = rthc_table[rthc_count];
else if (rthc_table != rthc_table_static) { else if (rthc_table != rthc_table_static) {
osal_free(rthc_table); void *tmp = rthc_table;
rthc_table = rthc_table_static; rthc_table = rthc_table_static;
rthc_limit = RTHC_INITIAL_LIMIT; rthc_limit = RTHC_INITIAL_LIMIT;
osal_memory_barrier();
osal_free(tmp);
} }
break; break;
} }
} }
TRACE("<< key %zu, rthc_count %u, rthc_limit %u", (size_t)key, rthc_count, TRACE("<<< %p, key %zu, rthc_count %u, rthc_limit %u",
__Wpedantic_format_voidptr(env), (uintptr_t)env->me_txkey, rthc_count,
rthc_limit); rthc_limit);
rthc_unlock(); return rc;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
#define RTHC_ENVLIST_END ((MDBX_env *)((uintptr_t)50459))
static MDBX_env *inprocess_lcklist_head = RTHC_ENVLIST_END;
/* Acquires the lock that guards the in-process lck-list: lcklist_mutex on
 * non-Windows builds (the result is checked through ENSURE), otherwise
 * lcklist_critical_section. */
static __inline void lcklist_lock(void) {
#if !defined(_WIN32) && !defined(_WIN64)
  ENSURE(nullptr, osal_pthread_mutex_lock(&lcklist_mutex) == 0);
#else
  EnterCriticalSection(&lcklist_critical_section);
#endif
}
/* Releases the lock taken by lcklist_lock(): lcklist_mutex on non-Windows
 * builds (the result is checked through ENSURE), otherwise
 * lcklist_critical_section. */
static __inline void lcklist_unlock(void) {
#if !defined(_WIN32) && !defined(_WIN64)
  ENSURE(nullptr, pthread_mutex_unlock(&lcklist_mutex) == 0);
#else
  LeaveCriticalSection(&lcklist_critical_section);
#endif
}
MDBX_NOTHROW_CONST_FUNCTION static uint64_t rrxmrrxmsx_0(uint64_t v) { MDBX_NOTHROW_CONST_FUNCTION static uint64_t rrxmrrxmsx_0(uint64_t v) {
/* Pelle Evensen's mixer, https://bit.ly/2HOfynt */ /* Pelle Evensen's mixer, https://bit.ly/2HOfynt */
v ^= (v << 39 | v >> 25) ^ (v << 14 | v >> 50); v ^= (v << 39 | v >> 25) ^ (v << 14 | v >> 50);
@ -1667,13 +1692,16 @@ static int uniq_poke(const osal_mmap_t *pending, osal_mmap_t *scan,
return uniq_peek(pending, scan); return uniq_peek(pending, scan);
} }
__cold static int uniq_check(const osal_mmap_t *pending, MDBX_env **found) { __cold static int rthc_uniq_check(const osal_mmap_t *pending,
MDBX_env **found) {
*found = nullptr; *found = nullptr;
uint64_t salt = 0; uint64_t salt = 0;
for (MDBX_env *scan = inprocess_lcklist_head; scan != RTHC_ENVLIST_END; for (size_t i = 0; i < rthc_count; ++i) {
scan = scan->me_lcklist_next) { MDBX_env *const scan = rthc_table[i].env;
MDBX_lockinfo *const scan_lck = scan->me_lck_mmap.lck; if (!scan->me_lck_mmap.lck || &scan->me_lck_mmap == pending)
int err = atomic_load64(&scan_lck->mti_bait_uniqueness, mo_AcquireRelease) continue;
int err = atomic_load64(&scan->me_lck_mmap.lck->mti_bait_uniqueness,
mo_AcquireRelease)
? uniq_peek(pending, &scan->me_lck_mmap) ? uniq_peek(pending, &scan->me_lck_mmap)
: uniq_poke(pending, &scan->me_lck_mmap, &salt); : uniq_poke(pending, &scan->me_lck_mmap, &salt);
if (err == MDBX_ENODATA) { if (err == MDBX_ENODATA) {
@ -1681,8 +1709,8 @@ __cold static int uniq_check(const osal_mmap_t *pending, MDBX_env **found) {
if (likely(osal_filesize(pending->fd, &length) == MDBX_SUCCESS && if (likely(osal_filesize(pending->fd, &length) == MDBX_SUCCESS &&
length == 0)) { length == 0)) {
/* LY: skip checking since LCK-file is empty, i.e. just created. */ /* LY: skip checking since LCK-file is empty, i.e. just created. */
DEBUG("uniq-probe: %s", "unique (new/empty lck)"); DEBUG("%s", "unique (new/empty lck)");
return MDBX_RESULT_TRUE; return MDBX_SUCCESS;
} }
} }
if (err == MDBX_RESULT_TRUE) if (err == MDBX_RESULT_TRUE)
@ -1695,44 +1723,17 @@ __cold static int uniq_check(const osal_mmap_t *pending, MDBX_env **found) {
if (err == MDBX_RESULT_TRUE) { if (err == MDBX_RESULT_TRUE) {
err = uniq_poke(pending, &scan->me_lck_mmap, &salt); err = uniq_poke(pending, &scan->me_lck_mmap, &salt);
*found = scan; *found = scan;
DEBUG("uniq-probe: found %p", __Wpedantic_format_voidptr(*found)); DEBUG("found %p", __Wpedantic_format_voidptr(*found));
return MDBX_RESULT_FALSE; return MDBX_SUCCESS;
} }
if (unlikely(err != MDBX_SUCCESS)) { if (unlikely(err != MDBX_SUCCESS)) {
DEBUG("uniq-probe: failed rc %d", err); DEBUG("failed rc %d", err);
return err; return err;
} }
} }
DEBUG("uniq-probe: %s", "unique"); DEBUG("%s", "unique");
return MDBX_RESULT_TRUE; return MDBX_SUCCESS;
}
/* Unlinks `env` from the in-process lck-list (if it is linked) and tears down
 * its lck state. The name suggests the caller already holds the lck-list
 * lock -- NOTE(review): confirm against callers.
 *
 * After unlinking, uniq_check() is used to look for another in-process
 * environment over the same lck, but only when `env` belongs to the current
 * process; otherwise the result is MDBX_PANIC. When no neighbor was found and
 * a live reader is recorded, osal_rpid_clear() is called and its result is
 * ignored. osal_lck_destroy() runs only if the result so far is not an error
 * per MDBX_IS_ERROR().
 *
 * Returns the uniq_check()/MDBX_PANIC code when it is an error, otherwise the
 * result of osal_lck_destroy(). */
static int lcklist_detach_locked(MDBX_env *env) {
  if (env->me_lcklist_next != nullptr) {
    ENSURE(env, env->me_lcklist_next != nullptr);
    ENSURE(env, inprocess_lcklist_head != RTHC_ENVLIST_END);
    /* Follow the chain of `next` links until the one pointing at `env`. */
    MDBX_env **link = &inprocess_lcklist_head;
    while (*link != RTHC_ENVLIST_END) {
      if (*link == env) {
        *link = env->me_lcklist_next;
        env->me_lcklist_next = nullptr;
        break;
      }
      link = &(*link)->me_lcklist_next;
    }
    ENSURE(env, env->me_lcklist_next == nullptr);
  }

  MDBX_env *neighbor = nullptr;
  int rc;
  if (likely(osal_getpid() == env->me_pid))
    rc = uniq_check(&env->me_lck_mmap, &neighbor);
  else
    rc = MDBX_PANIC;

  if (neighbor == nullptr && env->me_live_reader)
    (void)osal_rpid_clear(env);

  if (MDBX_IS_ERROR(rc))
    return rc;
  return osal_lck_destroy(env, neighbor);
}
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
@ -14567,59 +14568,28 @@ __cold static int setup_lck(MDBX_env *env, mdbx_mode_t mode) {
} }
/* LY: without-lck mode (e.g. exclusive or on read-only filesystem) */ /* LY: without-lck mode (e.g. exclusive or on read-only filesystem) */
/* beginning of a locked section ---------------------------------------- */
lcklist_lock();
eASSERT(env, env->me_lcklist_next == nullptr);
env->me_lfd = INVALID_HANDLE_VALUE; env->me_lfd = INVALID_HANDLE_VALUE;
const int rc = osal_lck_seize(env);
if (MDBX_IS_ERROR(rc)) {
/* Calling lcklist_detach_locked() is required to restore POSIX-filelock
* and this job will be done by env_close(). */
lcklist_unlock();
return rc;
}
/* insert into inprocess lck-list */
env->me_lcklist_next = inprocess_lcklist_head;
inprocess_lcklist_head = env;
lcklist_unlock();
/* end of a locked section ---------------------------------------------- */
env->me_lck = lckless_stub(env);
env->me_maxreaders = UINT_MAX;
DEBUG("lck-setup:%s%s%s", " lck-less",
(env->me_flags & MDBX_RDONLY) ? " readonly" : "",
(rc == MDBX_RESULT_TRUE) ? " exclusive" : " cooperative");
return rc;
} }
/* beginning of a locked section ------------------------------------------ */ /* beginning of a locked section ------------------------------------------ */
lcklist_lock(); rthc_lock();
eASSERT(env, env->me_lcklist_next == nullptr); err = rthc_register(env);
if (likely(err == MDBX_SUCCESS))
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it. */
err = osal_lck_seize(env); err = osal_lck_seize(env);
if (MDBX_IS_ERROR(err)) {
bailout:
/* Calling lcklist_detach_locked() is required to restore POSIX-filelock
* and this job will be done by env_close(). */
lcklist_unlock();
return err;
}
MDBX_env *inprocess_neighbor = nullptr; const int lck_seize_rc = err;
if (err == MDBX_RESULT_TRUE) {
err = uniq_check(&env->me_lck_mmap, &inprocess_neighbor);
if (MDBX_IS_ERROR(err)) if (MDBX_IS_ERROR(err))
goto bailout; goto bailout;
if (inprocess_neighbor &&
((runtime_flags & MDBX_DBG_LEGACY_MULTIOPEN) == 0 || struct MDBX_lockinfo *lck = nullptr;
(inprocess_neighbor->me_flags & MDBX_EXCLUSIVE) != 0)) { if (env->me_lfd == INVALID_HANDLE_VALUE) {
err = MDBX_BUSY; lck = lckless_stub(env);
goto bailout; env->me_maxreaders = UINT_MAX;
DEBUG("lck-setup:%s%s%s", " lck-less",
(env->me_flags & MDBX_RDONLY) ? " readonly" : "",
(lck_seize_rc == MDBX_RESULT_TRUE) ? " exclusive" : " cooperative");
goto done;
} }
}
const int lck_seize_rc = err;
DEBUG("lck-setup:%s%s%s", " with-lck", DEBUG("lck-setup:%s%s%s", " with-lck",
(env->me_flags & MDBX_RDONLY) ? " readonly" : "", (env->me_flags & MDBX_RDONLY) ? " readonly" : "",
@ -14688,9 +14658,10 @@ __cold static int setup_lck(MDBX_env *env, mdbx_mode_t mode) {
#endif /* MADV_WILLNEED */ #endif /* MADV_WILLNEED */
#endif /* MDBX_ENABLE_MADVISE */ #endif /* MDBX_ENABLE_MADVISE */
struct MDBX_lockinfo *const lck = env->me_lck_mmap.lck; lck = env->me_lck_mmap.lck;
if (lck_seize_rc == MDBX_RESULT_TRUE) { if (lck_seize_rc == MDBX_RESULT_TRUE) {
/* LY: exclusive mode, check and reset lck content */ /* If we succeeded in getting the exclusive lock, then nobody is using the
 * lock region and we should initialize it. */
memset(lck, 0, (size_t)size); memset(lck, 0, (size_t)size);
jitter4testing(false); jitter4testing(false);
lck->mti_magic_and_version = MDBX_LOCK_MAGIC; lck->mti_magic_and_version = MDBX_LOCK_MAGIC;
@ -14724,19 +14695,32 @@ __cold static int setup_lck(MDBX_env *env, mdbx_mode_t mode) {
} }
} }
MDBX_env *inprocess_neighbor = nullptr;
if (lck_seize_rc == MDBX_RESULT_TRUE) {
err = rthc_uniq_check(&env->me_lck_mmap, &inprocess_neighbor);
if (MDBX_IS_ERROR(err))
goto bailout;
if (inprocess_neighbor &&
((runtime_flags & MDBX_DBG_LEGACY_MULTIOPEN) == 0 ||
(inprocess_neighbor->me_flags & MDBX_EXCLUSIVE) != 0)) {
err = MDBX_BUSY;
goto bailout;
}
}
err = osal_lck_init(env, inprocess_neighbor, lck_seize_rc); err = osal_lck_init(env, inprocess_neighbor, lck_seize_rc);
if (MDBX_IS_ERROR(err)) if (MDBX_IS_ERROR(err))
goto bailout; goto bailout;
ENSURE(env, env->me_lcklist_next == nullptr); done:
/* insert into inprocess lck-list */
env->me_lcklist_next = inprocess_lcklist_head;
inprocess_lcklist_head = env;
lcklist_unlock();
/* end of a locked section ------------------------------------------------ */
eASSERT(env, !MDBX_IS_ERROR(lck_seize_rc));
env->me_lck = lck; env->me_lck = lck;
eASSERT(env, !MDBX_IS_ERROR(lck_seize_rc));
bailout:
/* Calling osal_lck_destroy() is required to restore POSIX-filelock
* and this job will be done by env_close(). */
rthc_unlock();
/* end of a locked section ------------------------------------------------ */
return lck_seize_rc; return lck_seize_rc;
} }
@ -15603,14 +15587,6 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
if (MDBX_IS_ERROR(rc)) if (MDBX_IS_ERROR(rc))
goto bailout; goto bailout;
} }
if ((env->me_flags & MDBX_NOTLS) == 0) {
rc = rthc_alloc(&env->me_txkey, &lck->mti_readers[0],
&lck->mti_readers[env->me_maxreaders]);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
env->me_flags |= MDBX_ENV_TXKEY;
}
} }
if ((flags & MDBX_RDONLY) == 0) { if ((flags & MDBX_RDONLY) == 0) {
@ -15704,17 +15680,19 @@ bailout:
/* Destroy resources from mdbx_env_open(), clear our readers & DBIs */ /* Destroy resources from mdbx_env_open(), clear our readers & DBIs */
__cold static int env_close(MDBX_env *env) { __cold static int env_close(MDBX_env *env) {
const unsigned flags = env->me_flags; const unsigned flags = env->me_flags;
if (!(flags & MDBX_ENV_ACTIVE)) {
ENSURE(env, env->me_lcklist_next == nullptr);
return MDBX_SUCCESS;
}
env->me_flags &= ~ENV_INTERNAL_FLAGS; env->me_flags &= ~ENV_INTERNAL_FLAGS;
if (flags & MDBX_ENV_TXKEY) { if (flags & MDBX_ENV_TXKEY) {
rthc_remove(env->me_txkey); thread_key_delete(env->me_txkey);
env->me_txkey = (osal_thread_key_t)0; env->me_txkey = 0;
} }
if (env->me_lck)
munlock_all(env);
rthc_lock();
int rc = rthc_remove(env);
rthc_unlock();
#if MDBX_ENABLE_DBI_LOCKFREE #if MDBX_ENABLE_DBI_LOCKFREE
for (struct mdbx_defer_free_item *next, *ptr = env->me_defer_free; ptr; for (struct mdbx_defer_free_item *next, *ptr = env->me_defer_free; ptr;
ptr = next) { ptr = next) {
@ -15723,14 +15701,9 @@ __cold static int env_close(MDBX_env *env) {
} }
#endif /* MDBX_ENABLE_DBI_LOCKFREE */ #endif /* MDBX_ENABLE_DBI_LOCKFREE */
munlock_all(env);
if (!(env->me_flags & MDBX_RDONLY)) if (!(env->me_flags & MDBX_RDONLY))
osal_ioring_destroy(&env->me_ioring); osal_ioring_destroy(&env->me_ioring);
lcklist_lock();
const int rc = lcklist_detach_locked(env);
lcklist_unlock();
env->me_lck = nullptr; env->me_lck = nullptr;
if (env->me_lck_mmap.lck) if (env->me_lck_mmap.lck)
osal_munmap(&env->me_lck_mmap); osal_munmap(&env->me_lck_mmap);
@ -15882,8 +15855,6 @@ __cold int mdbx_env_close_ex(MDBX_env *env, bool dont_sync) {
osal_free(ptr); osal_free(ptr);
} }
VALGRIND_DESTROY_MEMPOOL(env); VALGRIND_DESTROY_MEMPOOL(env);
ENSURE(env, env->me_lcklist_next == nullptr);
env->me_pid = 0;
osal_free(env); osal_free(env);
return rc; return rc;
@ -25943,7 +25914,6 @@ __cold void global_ctor(void) {
rthc_table = rthc_table_static; rthc_table = rthc_table_static;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
InitializeCriticalSection(&rthc_critical_section); InitializeCriticalSection(&rthc_critical_section);
InitializeCriticalSection(&lcklist_critical_section);
#else #else
ENSURE(nullptr, pthread_key_create(&rthc_key, thread_dtor) == 0); ENSURE(nullptr, pthread_key_create(&rthc_key, thread_dtor) == 0);
TRACE("pid %d, &mdbx_rthc_key = %p, value 0x%x", osal_getpid(), TRACE("pid %d, &mdbx_rthc_key = %p, value 0x%x", osal_getpid(),

View File

@ -1462,7 +1462,6 @@ struct MDBX_env {
bool me_incore; bool me_incore;
bool me_prefault_write; bool me_prefault_write;
MDBX_env *me_lcklist_next;
#if MDBX_ENABLE_DBI_LOCKFREE #if MDBX_ENABLE_DBI_LOCKFREE
struct mdbx_defer_free_item *me_defer_free; struct mdbx_defer_free_item *me_defer_free;
#endif /* MDBX_ENABLE_DBI_LOCKFREE */ #endif /* MDBX_ENABLE_DBI_LOCKFREE */
@ -1560,10 +1559,6 @@ osal_flush_incoherent_mmap(const void *addr, size_t nbytes,
MDBX_INTERNAL_FUNC int cleanup_dead_readers(MDBX_env *env, int rlocked, MDBX_INTERNAL_FUNC int cleanup_dead_readers(MDBX_env *env, int rlocked,
int *dead); int *dead);
MDBX_INTERNAL_FUNC int rthc_alloc(osal_thread_key_t *key, MDBX_reader *begin,
MDBX_reader *end);
MDBX_INTERNAL_FUNC void rthc_remove(const osal_thread_key_t key);
MDBX_INTERNAL_FUNC void global_ctor(void); MDBX_INTERNAL_FUNC void global_ctor(void);
MDBX_INTERNAL_FUNC void osal_ctor(void); MDBX_INTERNAL_FUNC void osal_ctor(void);
MDBX_INTERNAL_FUNC void global_dtor(void); MDBX_INTERNAL_FUNC void global_dtor(void);