mdbx: возможность вызова osal_lck_destroy() в дочернем процессе после fork().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2023-11-12 00:03:43 +03:00
parent 7ad54f54b4
commit a3e2300f58
4 changed files with 46 additions and 35 deletions

View File

@ -1346,13 +1346,13 @@ static void thread_rthc_set(osal_thread_key_t key, const void *value) {
/* dtor called for thread, i.e. for all mdbx's environment objects */
__cold void thread_dtor(void *rthc) {
rthc_lock();
const uint32_t self_pid = osal_getpid();
TRACE(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", self_pid,
const uint32_t current_pid = osal_getpid();
TRACE(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", current_pid,
osal_thread_self(), rthc);
for (size_t i = 0; i < rthc_count; ++i) {
MDBX_env *const env = rthc_table[i].env;
if (env->me_pid != self_pid)
if (env->me_pid != current_pid)
continue;
if (!(env->me_flags & MDBX_ENV_TXKEY))
continue;
@ -1376,11 +1376,11 @@ __cold void thread_dtor(void *rthc) {
"current-pid %i",
osal_thread_self(), __Wpedantic_format_voidptr(reader), i,
__Wpedantic_format_voidptr(begin), __Wpedantic_format_voidptr(end),
(int)(reader - begin), reader->mr_pid.weak, self_pid);
if (atomic_load32(&reader->mr_pid, mo_Relaxed) == self_pid) {
(int)(reader - begin), reader->mr_pid.weak, current_pid);
if (atomic_load32(&reader->mr_pid, mo_Relaxed) == current_pid) {
TRACE("==== thread 0x%" PRIxPTR ", rthc %p, cleanup", osal_thread_self(),
__Wpedantic_format_voidptr(reader));
(void)atomic_cas32(&reader->mr_pid, self_pid, 0);
(void)atomic_cas32(&reader->mr_pid, current_pid, 0);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true, mo_Relaxed);
}
}
@ -1426,15 +1426,15 @@ __cold void thread_dtor(void *rthc) {
MDBX_EXCLUDE_FOR_GPROF
__cold void global_dtor(void) {
const uint32_t self_pid = osal_getpid();
TRACE(">> pid %d", self_pid);
const uint32_t current_pid = osal_getpid();
TRACE(">> pid %d", current_pid);
rthc_lock();
#if !defined(_WIN32) && !defined(_WIN64)
uint64_t *rthc = pthread_getspecific(rthc_key);
TRACE("== thread 0x%" PRIxPTR ", rthc %p, pid %d, self-status 0x%08" PRIx64
", left %d",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
osal_thread_self(), __Wpedantic_format_voidptr(rthc), current_pid,
rthc ? rthc_read(rthc) : ~UINT64_C(0),
atomic_load32(&rthc_pending, mo_Relaxed));
if (rthc) {
@ -1445,19 +1445,19 @@ __cold void global_dtor(void) {
rthc_compare_and_clean(rthc, sign_registered)) {
TRACE("== thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
osal_thread_self(), __Wpedantic_format_voidptr(rthc), current_pid,
"registered", state);
} else if (state == sign_counted &&
rthc_compare_and_clean(rthc, sign_counted)) {
TRACE("== thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
osal_thread_self(), __Wpedantic_format_voidptr(rthc), current_pid,
"counted", state);
ENSURE(nullptr, atomic_sub32(&rthc_pending, 1) > 0);
} else {
WARNING("thread 0x%" PRIxPTR
", rthc %p, pid %d, self-status %s (0x%08" PRIx64 ")",
osal_thread_self(), __Wpedantic_format_voidptr(rthc), self_pid,
osal_thread_self(), __Wpedantic_format_voidptr(rthc), current_pid,
"wrong", state);
}
}
@ -1475,7 +1475,7 @@ __cold void global_dtor(void) {
for (unsigned left;
(left = atomic_load32(&rthc_pending, mo_AcquireRelease)) > 0;) {
NOTICE("tls-cleanup: pid %d, pending %u, wait for...", self_pid, left);
NOTICE("tls-cleanup: pid %d, pending %u, wait for...", current_pid, left);
const int rc = pthread_cond_timedwait(&rthc_cond, &rthc_mutex, &abstime);
if (rc && rc != EINTR)
break;
@ -1485,7 +1485,7 @@ __cold void global_dtor(void) {
for (size_t i = 0; i < rthc_count; ++i) {
MDBX_env *const env = rthc_table[i].env;
if (env->me_pid != self_pid)
if (env->me_pid != current_pid)
continue;
if (!(env->me_flags & MDBX_ENV_TXKEY))
continue;
@ -1499,9 +1499,9 @@ __cold void global_dtor(void) {
"rthc-pid %i, current-pid %i",
i, (uintptr_t)env->me_txkey, __Wpedantic_format_voidptr(begin),
__Wpedantic_format_voidptr(end), __Wpedantic_format_voidptr(reader),
(int)(reader - begin), reader->mr_pid.weak, self_pid);
if (atomic_load32(&reader->mr_pid, mo_Relaxed) == self_pid) {
(void)atomic_cas32(&reader->mr_pid, self_pid, 0);
(int)(reader - begin), reader->mr_pid.weak, current_pid);
if (atomic_load32(&reader->mr_pid, mo_Relaxed) == current_pid) {
(void)atomic_cas32(&reader->mr_pid, current_pid, 0);
TRACE("== cleanup %p", __Wpedantic_format_voidptr(reader));
cleaned = true;
}
@ -1525,7 +1525,7 @@ __cold void global_dtor(void) {
#endif
osal_dtor();
TRACE("<< pid %d\n", self_pid);
TRACE("<< pid %d\n", current_pid);
}
__cold int rthc_register(MDBX_env *const env) {
@ -1573,21 +1573,21 @@ bailout:
return rc;
}
__cold static int rthc_drown(MDBX_env *const env) {
const uint32_t self_pid = osal_getpid();
const uint32_t current_pid = osal_getpid();
int rc = MDBX_SUCCESS;
MDBX_env *inprocess_neighbor = nullptr;
if (likely(env->me_lck_mmap.lck && self_pid == env->me_pid)) {
if (likely(env->me_lck_mmap.lck && current_pid == env->me_pid)) {
MDBX_reader *const begin = &env->me_lck_mmap.lck->mti_readers[0];
MDBX_reader *const end =
&env->me_lck_mmap.lck->mti_readers[env->me_maxreaders];
TRACE("== %s env %p pid %d, readers %p ...%p, current-pid %d",
(self_pid == env->me_pid) ? "cleanup" : "skip",
(current_pid == env->me_pid) ? "cleanup" : "skip",
__Wpedantic_format_voidptr(env), env->me_pid,
__Wpedantic_format_voidptr(begin), __Wpedantic_format_voidptr(end),
self_pid);
current_pid);
bool cleaned = false;
for (MDBX_reader *r = begin; r < end; ++r) {
if (atomic_load32(&r->mr_pid, mo_Relaxed) == self_pid) {
if (atomic_load32(&r->mr_pid, mo_Relaxed) == current_pid) {
atomic_store32(&r->mr_pid, 0, mo_AcquireRelease);
TRACE("== cleanup %p", __Wpedantic_format_voidptr(r));
cleaned = true;
@ -1603,7 +1603,7 @@ __cold static int rthc_drown(MDBX_env *const env) {
rc = rc ? rc : err;
}
}
int err = osal_lck_destroy(env, inprocess_neighbor);
int err = osal_lck_destroy(env, inprocess_neighbor, current_pid);
env->me_pid = 0;
return rc ? rc : err;
}
@ -15844,7 +15844,8 @@ __cold int mdbx_env_close_ex(MDBX_env *env, bool dont_sync) {
#if MDBX_LOCKING > MDBX_LOCKING_SYSV
MDBX_lockinfo *const stub = lckless_stub(env);
ENSURE(env, osal_ipclock_destroy(&stub->mti_wlock) == 0);
/* может вернуть ошибку в дочернем процессе после fork() */
osal_ipclock_destroy(&stub->mti_wlock);
#endif /* MDBX_LOCKING */
while ((dp = env->me_dp_reserve) != NULL) {

View File

@ -556,14 +556,13 @@ MDBX_INTERNAL_FUNC int osal_lck_upgrade(MDBX_env *env, bool dont_wait) {
}
__cold MDBX_INTERNAL_FUNC int osal_lck_destroy(MDBX_env *env,
MDBX_env *inprocess_neighbor) {
if (unlikely(osal_getpid() != env->me_pid))
return MDBX_PANIC;
MDBX_env *inprocess_neighbor,
const uint32_t current_pid) {
eASSERT(env, osal_getpid() == current_pid);
int rc = MDBX_SUCCESS;
struct stat lck_info;
MDBX_lockinfo *lck = env->me_lck_mmap.lck;
if (env->me_lfd != INVALID_HANDLE_VALUE && !inprocess_neighbor && lck &&
MDBX_lockinfo *lck = env->me_lck;
if (lck && lck == env->me_lck_mmap.lck && !inprocess_neighbor &&
/* try get exclusive access */
lck_op(env->me_lfd, op_setlk, F_WRLCK, 0, OFF_T_MAX) == 0 &&
/* if LCK was not removed */
@ -572,7 +571,8 @@ __cold MDBX_INTERNAL_FUNC int osal_lck_destroy(MDBX_env *env,
(env->me_flags & MDBX_RDONLY) ? F_RDLCK : F_WRLCK, 0,
OFF_T_MAX) == 0) {
VERBOSE("%p got exclusive, drown locks", (void *)env);
VERBOSE("%p got exclusive, drown ipc-locks", (void *)env);
eASSERT(env, current_pid == env->me_pid);
#if MDBX_LOCKING == MDBX_LOCKING_SYSV
if (env->me_sysv_ipc.semid != -1)
rc = semctl(env->me_sysv_ipc.semid, 2, IPC_RMID) ? errno : 0;
@ -586,13 +586,20 @@ __cold MDBX_INTERNAL_FUNC int osal_lck_destroy(MDBX_env *env,
if (rc == 0) {
const bool synced = lck->mti_unsynced_pages.weak == 0;
osal_munmap(&env->me_lck_mmap);
if (synced)
if (synced && env->me_lfd != INVALID_HANDLE_VALUE)
rc = ftruncate(env->me_lfd, 0) ? errno : 0;
}
jitter4testing(false);
}
if (current_pid != env->me_pid) {
eASSERT(env, !inprocess_neighbor);
NOTICE("drown env %p after-fork pid %d -> %d",
__Wpedantic_format_voidptr(env), env->me_pid, current_pid);
inprocess_neighbor = nullptr;
}
/* 1) POSIX's fcntl() locks (i.e. when op_setlk == F_SETLK) should be restored
* after file was closed.
*

View File

@ -682,7 +682,9 @@ MDBX_INTERNAL_FUNC int osal_lck_init(MDBX_env *env,
}
MDBX_INTERNAL_FUNC int osal_lck_destroy(MDBX_env *env,
MDBX_env *inprocess_neighbor) {
MDBX_env *inprocess_neighbor,
const uint32_t current_pid) {
(void)current_pid;
/* LY: should unmap before releasing the locks to avoid race condition and
* STATUS_USER_MAPPED_FILE/ERROR_USER_MAPPED_FILE */
if (env->me_map)

View File

@ -690,7 +690,8 @@ MDBX_INTERNAL_FUNC int osal_lck_init(MDBX_env *env,
/// restore POSIX-fcntl locks after the closing of file descriptors.
/// \return Error code (MDBX_PANIC) or zero on success.
MDBX_INTERNAL_FUNC int osal_lck_destroy(MDBX_env *env,
MDBX_env *inprocess_neighbor);
MDBX_env *inprocess_neighbor,
const uint32_t current_pid);
/// \brief Connects to shared interprocess locking objects and tries to acquire
/// the maximum lock level (shared if exclusive is not available)