mdbx: add LCK-tracking to resolve double-open issue with POSIX-filelocks.

Change-Id: I29377000e4dde3c43527302b55d0daec58b709f5
This commit is contained in:
Leonid Yuriev 2019-08-31 00:55:15 +03:00
parent a66cefb198
commit 113b29e68d
6 changed files with 523 additions and 182 deletions

View File

@ -457,6 +457,12 @@ typedef struct MDBX_lockinfo {
* Zero means timed auto-sync is disabled. */
volatile uint64_t mti_autosync_period;
/* Marker to distinguish uniqueness of DB/CLK.*/
volatile uint64_t mti_bait_uniqueness;
/* /proc/sys/kernel/random/boot_id */
volatile uint64_t mti_boot_id;
alignas(MDBX_CACHELINE_SIZE) /* cacheline ---------------------------------*/
#ifdef MDBX_OSAL_LOCK
/* Mutex protecting write-txn. */
@ -842,6 +848,7 @@ struct MDBX_env {
#ifdef USE_VALGRIND
int me_valgrind_handle;
#endif
MDBX_env *me_lcklist_next;
struct {
size_t lower; /* minimal size of datafile */

View File

@ -211,7 +211,10 @@ int mdbx_rpid_check(MDBX_env *env, mdbx_pid_t pid) {
static int mdbx_mutex_failed(MDBX_env *env, pthread_mutex_t *mutex,
const int rc);
int __cold mdbx_lck_init(MDBX_env *env) {
int __cold mdbx_lck_init(MDBX_env *env, int global_uniqueness_flag) {
if (global_uniqueness_flag == MDBX_RESULT_FALSE)
return MDBX_SUCCESS;
pthread_mutexattr_t ma;
int rc = pthread_mutexattr_init(&ma);
if (rc)
@ -254,22 +257,75 @@ bailout:
return rc;
}
void __cold mdbx_lck_destroy(MDBX_env *env) {
if (env->me_lfd != INVALID_HANDLE_VALUE) {
/* try get exclusive access */
if (env->me_lck && mdbx_lck_exclusive(env->me_lfd, false) == 0) {
int __cold mdbx_lck_destroy(MDBX_env *env, MDBX_env *inprocess_neighbor) {
if (env->me_lfd != INVALID_HANDLE_VALUE && !inprocess_neighbor &&
env->me_lck &&
/* try get exclusive access */ mdbx_lck_exclusive(env->me_lfd, false) ==
0) {
mdbx_info("%s: got exclusive, drown mutexes", mdbx_func_);
int rc = pthread_mutex_destroy(&env->me_lck->mti_rmutex);
if (rc == 0)
rc = pthread_mutex_destroy(&env->me_lck->mti_wmutex);
assert(rc == 0);
(void)rc;
msync(env->me_lck, env->me_os_psize, MS_ASYNC);
/* file locks would be released (by kernel)
* while the me_lfd will be closed */
}
if (op_setlk == F_SETLK) {
/* File locks would be released (by kernel) while the file-descriptors
* will be closed. But to avoid false-positive EDEADLK from the kernel,
* locks should be released here explicitly with properly order. */
/* POSIX's fcntl() locks should be restored after file was closed.
* FIXME: This code should be rethinked and retested, since it will
* executed in really rare cases.
*
* On the other hand, seems more reasonable to disallow multi-open feature
* by default, and describe it as "use at your own risk". Currently
* multi-open required only for libfpta's unit-tests. */
int rc = MDBX_SUCCESS;
/* close clk and restore locks */
if (env->me_lfd != INVALID_HANDLE_VALUE) {
(void)close(env->me_lfd);
env->me_lfd = INVALID_HANDLE_VALUE;
if (inprocess_neighbor) {
/* restore file-locks */
if (rc == MDBX_SUCCESS)
rc = mdbx_lck_op(inprocess_neighbor->me_lfd, F_SETLKW, F_RDLCK, 0, 1);
if (rc == MDBX_SUCCESS)
rc = mdbx_rpid_set(inprocess_neighbor);
}
}
/* close dxb and restore lock */
if (env->me_fd != INVALID_HANDLE_VALUE) {
(void)close(env->me_fd);
env->me_fd = INVALID_HANDLE_VALUE;
if (inprocess_neighbor && rc == MDBX_SUCCESS) {
/* restore file-lock */
rc = mdbx_lck_op(
inprocess_neighbor->me_fd, F_SETLKW,
(inprocess_neighbor->me_flags & MDBX_RDONLY) ? F_RDLCK : F_WRLCK,
(inprocess_neighbor->me_lfd == INVALID_HANDLE_VALUE)
? 0
: inprocess_neighbor->me_pid,
(inprocess_neighbor->me_lfd == INVALID_HANDLE_VALUE) ? OFF_T_MAX
: 1);
}
}
if (inprocess_neighbor && rc != MDBX_SUCCESS) {
inprocess_neighbor->me_flags |= MDBX_FATAL_ERROR;
return rc;
}
}
return MDBX_SUCCESS;
}
static int mdbx_robust_lock(MDBX_env *env, pthread_mutex_t *mutex) {
int rc = pthread_mutex_lock(mutex);
if (unlikely(rc != 0))

View File

@ -229,7 +229,10 @@ bailout:
static int mdbx_mutex_failed(MDBX_env *env, pthread_mutex_t *mutex,
const int rc);
int __cold mdbx_lck_init(MDBX_env *env) {
int __cold mdbx_lck_init(MDBX_env *env, int global_uniqueness_flag) {
if (global_uniqueness_flag == MDBX_RESULT_FALSE)
return MDBX_SUCCESS;
pthread_mutexattr_t ma;
int rc = pthread_mutexattr_init(&ma);
if (rc)
@ -267,13 +270,14 @@ bailout:
return rc;
}
void __cold mdbx_lck_destroy(MDBX_env *env) {
int __cold mdbx_lck_destroy(MDBX_env *env, MDBX_env *inprocess_neighbor) {
/* File locks would be released (by kernel) while the file-descriptors
* will be closed. But to avoid false-positive EDEADLK from the kernel,
* locks should be released here explicitly with properly order. */
if (env->me_lfd != INVALID_HANDLE_VALUE) {
if (env->me_lfd != INVALID_HANDLE_VALUE && !inprocess_neighbor &&
env->me_lck &&
/* try get exclusive access */
if (env->me_lck &&
mdbx_lck_op(env->me_fd, OP_SETLK,
(env->me_flags & MDBX_RDONLY) ? F_RDLCK : F_WRLCK, 0,
OFF_T_MAX) == 0 &&
@ -286,10 +290,51 @@ void __cold mdbx_lck_destroy(MDBX_env *env) {
(void)rc;
msync(env->me_lck, env->me_os_psize, MS_ASYNC);
}
(void)mdbx_lck_op(env->me_lfd, OP_SETLK, F_UNLCK, 0, OFF_T_MAX);
/* POSIX's fcntl() locks should be restored after file was closed.
* FIXME: This code should be rethinked and retested, since it will executed
* in really rare cases. For instance, this code could wait a lot, if other
* process get exclusive access immediately after the close().
*
* On the other hand, seems more reasonable to disallow multi-open feature
* by default, and describe it as "use at your own risk". Currently
* multi-open required only for libfpta's unit-tests. */
int rc = MDBX_SUCCESS;
/* close clk and restore locks */
if (env->me_lfd != INVALID_HANDLE_VALUE) {
(void)close(env->me_lfd);
env->me_lfd = INVALID_HANDLE_VALUE;
if (inprocess_neighbor) {
/* restore file-locks */
if (rc == MDBX_SUCCESS)
rc = mdbx_lck_op(inprocess_neighbor->me_lfd, OP_SETLKW, F_RDLCK, 0, 1);
if (rc == MDBX_SUCCESS)
rc = mdbx_rpid_set(inprocess_neighbor);
}
if (env->me_fd != INVALID_HANDLE_VALUE)
(void)mdbx_lck_op(env->me_fd, OP_SETLK, F_UNLCK, 0, OFF_T_MAX);
}
/* close dxb and restore lock */
if (env->me_fd != INVALID_HANDLE_VALUE) {
(void)close(env->me_fd);
env->me_fd = INVALID_HANDLE_VALUE;
if (inprocess_neighbor && rc == MDBX_SUCCESS) {
/* restore file-lock */
rc = mdbx_lck_op(
inprocess_neighbor->me_fd, OP_SETLKW,
(inprocess_neighbor->me_flags & MDBX_RDONLY) ? F_RDLCK : F_WRLCK,
(inprocess_neighbor->me_lfd == INVALID_HANDLE_VALUE)
? 0
: inprocess_neighbor->me_pid,
(inprocess_neighbor->me_lfd == INVALID_HANDLE_VALUE) ? OFF_T_MAX : 1);
}
}
if (inprocess_neighbor && rc != MDBX_SUCCESS) {
inprocess_neighbor->me_flags |= MDBX_FATAL_ERROR;
return rc;
}
return MDBX_SUCCESS;
}
static int mdbx_robust_lock(MDBX_env *env, pthread_mutex_t *mutex) {

View File

@ -352,8 +352,61 @@ int mdbx_resume_threads_after_remap(mdbx_handle_array_t *array) {
E-E = exclusive-write
*/
int mdbx_lck_init(MDBX_env *env) {
static void lck_unlock(MDBX_env *env) {
int rc;
if (env->me_lfd != INVALID_HANDLE_VALUE) {
/* double `unlock` for robustly remove overlapped shared/exclusive locks */
while (funlock(env->me_lfd, LCK_LOWER))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_lfd, LCK_UPPER))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
}
if (env->me_fd != INVALID_HANDLE_VALUE) {
/* explicitly unlock to avoid latency for other processes (windows kernel
* releases such locks via deferred queues) */
while (funlock(env->me_fd, LCK_BODY))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_fd, LCK_META))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_fd, LCK_WHOLE))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
}
}
int mdbx_lck_init(MDBX_env *env, int global_uniqueness_flag) {
(void)env;
(void)global_uniqueness_flag;
return MDBX_SUCCESS;
}
int mdbx_lck_destroy(MDBX_env *env, MDBX_env *inprocess_neighbor) {
(void)inprocess_neighbor;
lck_unlock(env);
return MDBX_SUCCESS;
}
@ -443,7 +496,7 @@ int mdbx_lck_seize(MDBX_env *env) {
mdbx_error("%s(%s) failed: errcode %u", mdbx_func_,
"lock-against-without-lck", rc);
mdbx_jitter4testing(false);
mdbx_lck_destroy(env);
lck_unlock(env);
} else {
mdbx_jitter4testing(false);
if (!funlock(env->me_fd, LCK_BODY))
@ -494,52 +547,6 @@ int mdbx_lck_downgrade(MDBX_env *env, bool complete) {
return MDBX_SUCCESS /* 7) now at S-? (used), done */;
}
void mdbx_lck_destroy(MDBX_env *env) {
int rc;
if (env->me_lfd != INVALID_HANDLE_VALUE) {
/* double `unlock` for robustly remove overlapped shared/exclusive locks */
while (funlock(env->me_lfd, LCK_LOWER))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_lfd, LCK_UPPER))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
}
if (env->me_fd != INVALID_HANDLE_VALUE) {
/* explicitly unlock to avoid latency for other processes (windows kernel
* releases such locks via deferred queues) */
while (funlock(env->me_fd, LCK_BODY))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_fd, LCK_META))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
while (funlock(env->me_fd, LCK_WHOLE))
;
rc = GetLastError();
assert(rc == ERROR_NOT_LOCKED);
(void)rc;
SetLastError(ERROR_SUCCESS);
}
}
/*----------------------------------------------------------------------------*/
/* reader checking (by pid) */
@ -574,7 +581,7 @@ int mdbx_rpid_check(MDBX_env *env, mdbx_pid_t pid) {
switch (rc) {
case ERROR_INVALID_PARAMETER:
/* pid seem invalid */
/* pid seems invalid */
return MDBX_RESULT_FALSE;
case WAIT_OBJECT_0:
/* process just exited */

View File

@ -278,7 +278,8 @@ size_t __hot mdbx_e2k_strnlen_bug_workaround(const char *s, size_t maxlen) {
typedef struct rthc_entry_t {
MDBX_reader *begin;
MDBX_reader *end;
mdbx_thread_key_t key;
mdbx_thread_key_t thr_tls_key;
bool key_valid;
} rthc_entry_t;
#if MDBX_DEBUG
@ -289,6 +290,7 @@ typedef struct rthc_entry_t {
#if defined(_WIN32) || defined(_WIN64)
static CRITICAL_SECTION rthc_critical_section;
static CRITICAL_SECTION lcklist_critical_section;
#else
int __cxa_thread_atexit_impl(void (*dtor)(void *), void *obj, void *dso_symbol)
__attribute__((__weak__));
@ -302,12 +304,13 @@ int __cxa_thread_atexit_impl(void (*dtor)(void *), void *obj,
}
#endif /* __APPLE__ */
static pthread_mutex_t mdbx_rthc_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t mdbx_rthc_cond = PTHREAD_COND_INITIALIZER;
static mdbx_thread_key_t mdbx_rthc_key;
static volatile uint32_t mdbx_rthc_pending;
static pthread_mutex_t lcklist_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t rthc_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t rthc_cond = PTHREAD_COND_INITIALIZER;
static mdbx_thread_key_t rthc_key;
static volatile uint32_t rthc_pending;
static void __cold mdbx_workaround_glibc_bug21031(void) {
static void __cold workaround_glibc_bug21031(void) {
/* Workaround for https://sourceware.org/bugzilla/show_bug.cgi?id=21031
*
* Due race between pthread_key_delete() and __nptl_deallocate_tsd()
@ -327,23 +330,23 @@ static unsigned rthc_count, rthc_limit;
static rthc_entry_t *rthc_table;
static rthc_entry_t rthc_table_static[RTHC_INITIAL_LIMIT];
static __cold void mdbx_rthc_lock(void) {
static __inline void rthc_lock(void) {
#if defined(_WIN32) || defined(_WIN64)
EnterCriticalSection(&rthc_critical_section);
#else
mdbx_ensure(nullptr, pthread_mutex_lock(&mdbx_rthc_mutex) == 0);
mdbx_ensure(nullptr, pthread_mutex_lock(&rthc_mutex) == 0);
#endif
}
static __cold void mdbx_rthc_unlock(void) {
static __inline void rthc_unlock(void) {
#if defined(_WIN32) || defined(_WIN64)
LeaveCriticalSection(&rthc_critical_section);
#else
mdbx_ensure(nullptr, pthread_mutex_unlock(&mdbx_rthc_mutex) == 0);
mdbx_ensure(nullptr, pthread_mutex_unlock(&rthc_mutex) == 0);
#endif
}
static __inline int mdbx_thread_key_create(mdbx_thread_key_t *key) {
static __inline int thread_key_create(mdbx_thread_key_t *key) {
int rc;
#if defined(_WIN32) || defined(_WIN64)
*key = TlsAlloc();
@ -355,17 +358,17 @@ static __inline int mdbx_thread_key_create(mdbx_thread_key_t *key) {
return rc;
}
static __inline void mdbx_thread_key_delete(mdbx_thread_key_t key) {
static __inline void thread_key_delete(mdbx_thread_key_t key) {
mdbx_trace("key = 0x%x", (unsigned)key);
#if defined(_WIN32) || defined(_WIN64)
mdbx_ensure(nullptr, TlsFree(key));
#else
mdbx_ensure(nullptr, pthread_key_delete(key) == 0);
mdbx_workaround_glibc_bug21031();
workaround_glibc_bug21031();
#endif
}
static __inline void *mdbx_thread_rthc_get(mdbx_thread_key_t key) {
static __inline void *thread_rthc_get(mdbx_thread_key_t key) {
#if defined(_WIN32) || defined(_WIN64)
return TlsGetValue(key);
#else
@ -373,7 +376,7 @@ static __inline void *mdbx_thread_rthc_get(mdbx_thread_key_t key) {
#endif
}
static void mdbx_thread_rthc_set(mdbx_thread_key_t key, const void *value) {
static void thread_rthc_set(mdbx_thread_key_t key, const void *value) {
#if defined(_WIN32) || defined(_WIN64)
mdbx_ensure(nullptr, TlsSetValue(key, (void *)value));
#else
@ -389,12 +392,12 @@ static void mdbx_thread_rthc_set(mdbx_thread_key_t key, const void *value) {
&thread_registration_state,
(void *)&mdbx_version /* dso_anchor */)) {
mdbx_ensure(nullptr, pthread_setspecific(
mdbx_rthc_key, &thread_registration_state) == 0);
rthc_key, &thread_registration_state) == 0);
thread_registration_state = MDBX_THREAD_RTHC_COUNTED;
const unsigned count_before = mdbx_atomic_add32(&mdbx_rthc_pending, 1);
const unsigned count_before = mdbx_atomic_add32(&rthc_pending, 1);
mdbx_ensure(nullptr, count_before < INT_MAX);
mdbx_trace("fallback to pthreads' tsd, key 0x%x, count %u",
(unsigned)mdbx_rthc_key, count_before);
(unsigned)rthc_key, count_before);
(void)count_before;
}
}
@ -407,24 +410,27 @@ __cold void mdbx_rthc_global_init(void) {
rthc_table = rthc_table_static;
#if defined(_WIN32) || defined(_WIN64)
InitializeCriticalSection(&rthc_critical_section);
InitializeCriticalSection(&lcklist_critical_section);
#else
mdbx_ensure(nullptr,
pthread_key_create(&mdbx_rthc_key, mdbx_rthc_thread_dtor) == 0);
pthread_key_create(&rthc_key, mdbx_rthc_thread_dtor) == 0);
mdbx_trace("pid %d, &mdbx_rthc_key = %p, value 0x%x", mdbx_getpid(),
&mdbx_rthc_key, (unsigned)mdbx_rthc_key);
&rthc_key, (unsigned)rthc_key);
#endif
}
/* dtor called for thread, i.e. for all mdbx's environment objects */
__cold void mdbx_rthc_thread_dtor(void *ptr) {
mdbx_rthc_lock();
rthc_lock();
mdbx_trace(">> pid %d, thread 0x%" PRIxPTR ", rthc %p", mdbx_getpid(),
(uintptr_t)mdbx_thread_self(), ptr);
const mdbx_pid_t self_pid = mdbx_getpid();
for (unsigned i = 0; i < rthc_count; ++i) {
const mdbx_thread_key_t key = rthc_table[i].key;
MDBX_reader *const rthc = mdbx_thread_rthc_get(key);
if (!rthc_table[i].key_valid)
continue;
const mdbx_thread_key_t key = rthc_table[i].thr_tls_key;
MDBX_reader *const rthc = thread_rthc_get(key);
if (rthc < rthc_table[i].begin || rthc >= rthc_table[i].end)
continue;
#if !defined(_WIN32) && !defined(_WIN64)
@ -452,7 +458,7 @@ __cold void mdbx_rthc_thread_dtor(void *ptr) {
#if defined(_WIN32) || defined(_WIN64)
mdbx_trace("<< thread 0x%" PRIxPTR ", rthc %p", (uintptr_t)mdbx_thread_self(),
ptr);
mdbx_rthc_unlock();
rthc_unlock();
#else
const char self_registration = *(char *)ptr;
*(char *)ptr = MDBX_THREAD_RTHC_ZERO;
@ -460,12 +466,12 @@ __cold void mdbx_rthc_thread_dtor(void *ptr) {
(uintptr_t)mdbx_thread_self(), ptr, mdbx_getpid(),
self_registration);
if (self_registration == MDBX_THREAD_RTHC_COUNTED)
mdbx_ensure(nullptr, mdbx_atomic_sub32(&mdbx_rthc_pending, 1) > 0);
mdbx_ensure(nullptr, mdbx_atomic_sub32(&rthc_pending, 1) > 0);
if (mdbx_rthc_pending == 0) {
if (rthc_pending == 0) {
mdbx_trace("== thread 0x%" PRIxPTR ", rthc %p, pid %d, wake",
(uintptr_t)mdbx_thread_self(), ptr, mdbx_getpid());
mdbx_ensure(nullptr, pthread_cond_broadcast(&mdbx_rthc_cond) == 0);
mdbx_ensure(nullptr, pthread_cond_broadcast(&rthc_cond) == 0);
}
mdbx_trace("<< thread 0x%" PRIxPTR ", rthc %p", (uintptr_t)mdbx_thread_self(),
@ -474,7 +480,7 @@ __cold void mdbx_rthc_thread_dtor(void *ptr) {
* instead of a call for pthread_mutex_unlock() and therefore CPU could not
* return to current DSO's code section, which may be unloaded immediately
* after the mutex got released. */
pthread_mutex_unlock(&mdbx_rthc_mutex);
pthread_mutex_unlock(&rthc_mutex);
#endif
}
@ -485,9 +491,9 @@ __cold void mdbx_rthc_global_dtor(void) {
mdbx_getpid(), &mdbx_rthc_global_dtor, &mdbx_rthc_thread_dtor,
&mdbx_rthc_remove);
mdbx_rthc_lock();
rthc_lock();
#if !defined(_WIN32) && !defined(_WIN64)
char *rthc = (char *)pthread_getspecific(mdbx_rthc_key);
char *rthc = (char *)pthread_getspecific(rthc_key);
mdbx_trace("== thread 0x%" PRIxPTR ", rthc %p, pid %d, self-status %d",
(uintptr_t)mdbx_thread_self(), rthc, mdbx_getpid(),
rthc ? *rthc : -1);
@ -495,7 +501,7 @@ __cold void mdbx_rthc_global_dtor(void) {
const char self_registration = *(char *)rthc;
*rthc = MDBX_THREAD_RTHC_ZERO;
if (self_registration == MDBX_THREAD_RTHC_COUNTED)
mdbx_ensure(nullptr, mdbx_atomic_sub32(&mdbx_rthc_pending, 1) > 0);
mdbx_ensure(nullptr, mdbx_atomic_sub32(&rthc_pending, 1) > 0);
}
struct timespec abstime;
@ -509,20 +515,21 @@ __cold void mdbx_rthc_global_dtor(void) {
abstime.tv_sec += 600;
#endif
for (unsigned left; (left = mdbx_rthc_pending) > 0;) {
for (unsigned left; (left = rthc_pending) > 0;) {
mdbx_trace("pid %d, pending %u, wait for...", mdbx_getpid(), left);
const int rc =
pthread_cond_timedwait(&mdbx_rthc_cond, &mdbx_rthc_mutex, &abstime);
const int rc = pthread_cond_timedwait(&rthc_cond, &rthc_mutex, &abstime);
if (rc && rc != EINTR)
break;
}
mdbx_thread_key_delete(mdbx_rthc_key);
thread_key_delete(rthc_key);
#endif
const mdbx_pid_t self_pid = mdbx_getpid();
for (unsigned i = 0; i < rthc_count; ++i) {
const mdbx_thread_key_t key = rthc_table[i].key;
mdbx_thread_key_delete(key);
if (!rthc_table[i].key_valid)
continue;
const mdbx_thread_key_t key = rthc_table[i].thr_tls_key;
thread_key_delete(key);
for (MDBX_reader *rthc = rthc_table[i].begin; rthc < rthc_table[i].end;
++rthc) {
mdbx_trace("== [%i] = key %zu, %p ... %p, rthc %p (%+i), "
@ -540,14 +547,15 @@ __cold void mdbx_rthc_global_dtor(void) {
if (rthc_table != rthc_table_static)
mdbx_free(rthc_table);
rthc_table = nullptr;
mdbx_rthc_unlock();
rthc_unlock();
#if defined(_WIN32) || defined(_WIN64)
DeleteCriticalSection(&lcklist_critical_section);
DeleteCriticalSection(&rthc_critical_section);
#else
/* LY: yielding a few timeslices to give a more chance
* to racing destructor(s) for completion. */
mdbx_workaround_glibc_bug21031();
workaround_glibc_bug21031();
#endif
mdbx_trace("<< pid %d\n", mdbx_getpid());
@ -555,15 +563,19 @@ __cold void mdbx_rthc_global_dtor(void) {
__cold int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDBX_reader *begin,
MDBX_reader *end) {
int rc;
if (key) {
#ifndef NDEBUG
*key = (mdbx_thread_key_t)0xBADBADBAD;
#endif /* NDEBUG */
int rc = mdbx_thread_key_create(key);
rc = thread_key_create(key);
if (rc != MDBX_SUCCESS)
return rc;
}
mdbx_rthc_lock();
mdbx_trace(">> key %zu, rthc_count %u, rthc_limit %u", (size_t)*key,
rthc_lock();
const mdbx_thread_key_t new_key = key ? *key : 0;
mdbx_trace(">> key %zu, rthc_count %u, rthc_limit %u", (size_t)new_key,
rthc_count, rthc_limit);
if (rthc_count == rthc_limit) {
rthc_entry_t *new_table =
@ -578,31 +590,33 @@ __cold int mdbx_rthc_alloc(mdbx_thread_key_t *key, MDBX_reader *begin,
rthc_table = new_table;
rthc_limit *= 2;
}
mdbx_trace("== [%i] = key %zu, %p ... %p", rthc_count, (size_t)*key, begin,
mdbx_trace("== [%i] = key %zu, %p ... %p", rthc_count, (size_t)new_key, begin,
end);
rthc_table[rthc_count].key = *key;
rthc_table[rthc_count].key_valid = key ? true : false;
rthc_table[rthc_count].thr_tls_key = key ? new_key : 0;
rthc_table[rthc_count].begin = begin;
rthc_table[rthc_count].end = end;
++rthc_count;
mdbx_trace("<< key %zu, rthc_count %u, rthc_limit %u", (size_t)*key,
mdbx_trace("<< key %zu, rthc_count %u, rthc_limit %u", (size_t)new_key,
rthc_count, rthc_limit);
mdbx_rthc_unlock();
rthc_unlock();
return MDBX_SUCCESS;
bailout:
mdbx_thread_key_delete(*key);
mdbx_rthc_unlock();
if (key)
thread_key_delete(*key);
rthc_unlock();
return rc;
}
__cold void mdbx_rthc_remove(const mdbx_thread_key_t key) {
mdbx_thread_key_delete(key);
mdbx_rthc_lock();
thread_key_delete(key);
rthc_lock();
mdbx_trace(">> key %zu, rthc_count %u, rthc_limit %u", (size_t)key,
rthc_count, rthc_limit);
for (unsigned i = 0; i < rthc_count; ++i) {
if (key == rthc_table[i].key) {
if (rthc_table[i].key_valid && key == rthc_table[i].thr_tls_key) {
const mdbx_pid_t self_pid = mdbx_getpid();
mdbx_trace("== [%i], %p ...%p, current-pid %d", i, rthc_table[i].begin,
rthc_table[i].end, self_pid);
@ -627,7 +641,147 @@ __cold void mdbx_rthc_remove(const mdbx_thread_key_t key) {
mdbx_trace("<< key %zu, rthc_count %u, rthc_limit %u", (size_t)key,
rthc_count, rthc_limit);
mdbx_rthc_unlock();
rthc_unlock();
}
//------------------------------------------------------------------------------
#define RTHC_ENVLIST_END ((MDBX_env *)((size_t)50459))
static MDBX_env *inprocess_lcklist_head = RTHC_ENVLIST_END;
static __inline void lcklist_lock(void) {
#if defined(_WIN32) || defined(_WIN64)
EnterCriticalSection(&lcklist_critical_section);
#else
mdbx_ensure(nullptr, pthread_mutex_lock(&lcklist_mutex) == 0);
#endif
}
static __inline void lcklist_unlock(void) {
#if defined(_WIN32) || defined(_WIN64)
LeaveCriticalSection(&lcklist_critical_section);
#else
mdbx_ensure(nullptr, pthread_mutex_unlock(&lcklist_mutex) == 0);
#endif
}
static uint64_t rrxmrrxmsx_0(uint64_t v) {
/* Pelle Evensen's mixer, https://bit.ly/2HOfynt */
v ^= (v << 39 | v >> 25) ^ (v << 14 | v >> 50);
v *= UINT64_C(0xA24BAED4963EE407);
v ^= (v << 40 | v >> 24) ^ (v << 15 | v >> 49);
v *= UINT64_C(0x9FB21C651E98DF25);
return v ^ v >> 28;
}
static int uniq_poke(const mdbx_mmap_t *map, const uint64_t cadabra) {
int rc;
if (map->lck) {
map->lck->mti_bait_uniqueness = cadabra;
mdbx_flush_noncoherent_cpu_writeback();
rc = MDBX_SUCCESS;
} else {
rc = mdbx_pwrite(map->fd, &cadabra, sizeof(map->lck->mti_bait_uniqueness),
offsetof(MDBX_lockinfo, mti_bait_uniqueness));
}
mdbx_trace("uniq-poke: %s, cadabra 0x016%" PRIx64 ", rc %d",
map->lck ? "mem" : "file", cadabra, rc);
return rc;
}
static int uniq_peek(const mdbx_mmap_t *map, const uint64_t cadabra) {
int rc;
uint64_t bait;
if (map->lck) {
mdbx_invalidate_mmap_noncoherent_cache(map->lck, sizeof(*map->lck));
bait = map->lck->mti_bait_uniqueness;
rc = MDBX_SUCCESS;
} else {
rc = mdbx_pread(map->fd, &bait, sizeof(map->lck->mti_bait_uniqueness),
offsetof(MDBX_lockinfo, mti_bait_uniqueness));
}
if (unlikely(!MDBX_IS_ERROR(rc)))
rc = (bait == cadabra) ? MDBX_RESULT_TRUE : MDBX_RESULT_FALSE;
mdbx_trace("uniq-peek: %s, cadabra 0x%016" PRIx64 ", bait 0x%016" PRIx64
",%s rc %d",
map->lck ? "mem" : "file", cadabra, bait,
(rc == MDBX_RESULT_TRUE) ? " found," : (rc ? " FAILED," : ""), rc);
return rc;
}
__cold static int uniq_probe(const mdbx_mmap_t *map, const mdbx_pid_t pid,
MDBX_env **found) {
if (inprocess_lcklist_head == RTHC_ENVLIST_END) {
mdbx_info("<< uniq-probe: pid %u, env-list empty, skip probing, rc %d",
(unsigned)pid, MDBX_RESULT_TRUE);
return MDBX_RESULT_TRUE;
}
const mdbx_tid_t tid = mdbx_thread_self();
size_t uit = 0;
memcpy(&uit, &tid, (sizeof(tid) < sizeof(uit)) ? sizeof(tid) : sizeof(uit));
uint64_t abra =
rrxmrrxmsx_0(mdbx_osal_monotime() + UINT64_C(5873865991930747) * uit);
for (unsigned bits = 4; bits; bits >>= 1) {
abra = abra * UINT64_C(6364136223846793005) + 1;
const uint64_t cadabra =
rrxmrrxmsx_0(abra + UINT64_C(7680760450171793) * pid) << 20 |
abra >> 44;
int err = uniq_poke(map, cadabra);
*found = nullptr;
for (MDBX_env *env = inprocess_lcklist_head;
err == MDBX_SUCCESS && env != RTHC_ENVLIST_END;
env = env->me_lcklist_next) {
err = uniq_peek(&env->me_lck_mmap, cadabra);
if (err == MDBX_RESULT_TRUE)
*found = env;
}
if (unlikely(MDBX_IS_ERROR(err))) {
mdbx_verbose("<< uniq-probe: pid %u, uit %zu, failed rc %d",
(unsigned)pid, uit, err);
return err;
}
bits += 8 & err;
if (bits == 15) {
mdbx_info("<< uniq-probe: pid %u, uit %zu, found %p", (unsigned)pid, uit,
*found);
return MDBX_RESULT_FALSE;
}
}
mdbx_info("<< uniq-probe: pid %u, uit %zu, unique", (unsigned)pid, uit);
return MDBX_RESULT_TRUE;
}
static int lcklist_detach_locked(MDBX_env *env) {
MDBX_env *dup = nullptr;
int rc = MDBX_SUCCESS;
if (env->me_lcklist_next != nullptr) {
mdbx_ensure(env, env->me_lcklist_next != nullptr);
mdbx_ensure(env, inprocess_lcklist_head != RTHC_ENVLIST_END);
for (MDBX_env **ptr = &inprocess_lcklist_head; *ptr != RTHC_ENVLIST_END;
ptr = &(*ptr)->me_lcklist_next) {
if (*ptr == env) {
*ptr = env->me_lcklist_next;
env->me_lcklist_next = nullptr;
break;
}
}
mdbx_ensure(env, env->me_lcklist_next == nullptr);
}
rc = uniq_probe(&env->me_lck_mmap, env->me_pid, &dup);
if (!dup && env->me_live_reader)
(void)mdbx_rpid_clear(env);
if (!MDBX_IS_ERROR(rc))
rc = mdbx_lck_destroy(env, dup);
return rc;
}
/*----------------------------------------------------------------------------*/
@ -1204,7 +1358,7 @@ static int __must_check_result mdbx_read_header(MDBX_env *env, MDBX_meta *meta,
uint64_t *filesize);
static int __must_check_result mdbx_sync_locked(MDBX_env *env, unsigned flags,
MDBX_meta *const pending);
static void mdbx_env_close0(MDBX_env *env);
static int mdbx_env_close0(MDBX_env *env);
static MDBX_node *mdbx_node_search(MDBX_cursor *mc, MDBX_val *key, int *exactp);
@ -3212,7 +3366,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
MDBX_reader *r = txn->mt_ro_reader;
if (likely(env->me_flags & MDBX_ENV_TXKEY)) {
mdbx_assert(env, !(env->me_flags & MDBX_NOTLS));
r = mdbx_thread_rthc_get(env->me_txkey);
r = thread_rthc_get(env->me_txkey);
if (likely(r)) {
mdbx_assert(env, r->mr_pid == env->me_pid);
mdbx_assert(env, r->mr_tid == mdbx_thread_self());
@ -3287,7 +3441,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
if (likely(env->me_flags & MDBX_ENV_TXKEY)) {
mdbx_assert(env, env->me_live_reader == env->me_pid);
mdbx_thread_rthc_set(env->me_txkey, r);
thread_rthc_set(env->me_txkey, r);
}
}
@ -6537,10 +6691,22 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
return err;
/* LY: without-lck mode (e.g. exclusive or on read-only filesystem) */
/* beginning of a locked section ---------------------------------------- */
lcklist_lock();
mdbx_assert(env, env->me_lcklist_next == nullptr);
env->me_lfd = INVALID_HANDLE_VALUE;
const int rc = mdbx_lck_seize(env);
if (MDBX_IS_ERROR(rc))
if (MDBX_IS_ERROR(rc)) {
/* Calling lcklist_detach_locked() is required to restore POSIX-filelock
* and this job will be done by mdbx_env_close0(). */
lcklist_unlock();
return rc;
}
/* insert into inprocess lck-list */
env->me_lcklist_next = inprocess_lcklist_head;
inprocess_lcklist_head = env;
lcklist_unlock();
/* end of a locked section ---------------------------------------------- */
env->me_oldest = &env->me_lckless_stub.oldest;
env->me_unsynced_timeout = &env->me_lckless_stub.unsynced_timeout;
@ -6558,59 +6724,81 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
return rc;
}
/* beginning of a locked section ------------------------------------------ */
lcklist_lock();
mdbx_assert(env, env->me_lcklist_next == nullptr);
/* Try to get exclusive lock. If we succeed, then
* nobody is using the lock region and we should initialize it. */
const int rc = mdbx_lck_seize(env);
if (MDBX_IS_ERROR(rc))
return rc;
err = mdbx_lck_seize(env);
if (MDBX_IS_ERROR(err)) {
bailout:
/* Calling lcklist_detach_locked() is required to restore POSIX-filelock
* and this job will be done by mdbx_env_close0(). */
lcklist_unlock();
return err;
}
if (err == MDBX_RESULT_TRUE) {
MDBX_env *unused_lckdup_found;
err = uniq_probe(&env->me_lck_mmap, env->me_pid, &unused_lckdup_found);
if (MDBX_IS_ERROR(err))
goto bailout;
}
const int lck_seize_rc = err;
mdbx_debug("lck-setup:%s%s%s", " with-lck",
(env->me_flags & MDBX_RDONLY) ? " readonly" : "",
(rc == MDBX_RESULT_TRUE) ? " exclusive" : " cooperative");
(lck_seize_rc == MDBX_RESULT_TRUE) ? " exclusive"
: " cooperative");
uint64_t size;
err = mdbx_filesize(env->me_lfd, &size);
if (unlikely(err != MDBX_SUCCESS))
return err;
goto bailout;
if (rc == MDBX_RESULT_TRUE) {
if (lck_seize_rc == MDBX_RESULT_TRUE) {
uint64_t wanna = mdbx_roundup2(
(env->me_maxreaders - 1) * sizeof(MDBX_reader) + sizeof(MDBX_lockinfo),
env->me_os_psize);
#ifndef NDEBUG
err = mdbx_ftruncate(env->me_lfd, size = 0);
if (unlikely(err != MDBX_SUCCESS))
return err;
goto bailout;
#endif
mdbx_jitter4testing(false);
if (size != wanna) {
err = mdbx_ftruncate(env->me_lfd, wanna);
if (unlikely(err != MDBX_SUCCESS))
return err;
goto bailout;
size = wanna;
}
} else {
if (env->me_flags & MDBX_EXCLUSIVE)
return MDBX_BUSY;
if (env->me_flags & MDBX_EXCLUSIVE) {
err = MDBX_BUSY;
goto bailout;
}
if (size > PTRDIFF_MAX || (size & (env->me_os_psize - 1)) ||
size < env->me_os_psize) {
mdbx_notice("lck-file has invalid size %" PRIu64 " bytes", size);
return MDBX_PROBLEM;
err = MDBX_PROBLEM;
goto bailout;
}
}
const size_t maxreaders =
((size_t)size - sizeof(MDBX_lockinfo)) / sizeof(MDBX_reader) + 1;
if (maxreaders > UINT16_MAX) {
if (maxreaders < 2 || maxreaders > UINT16_MAX) {
mdbx_error("lck-size too big (up to %" PRIuPTR " readers)", maxreaders);
return MDBX_PROBLEM;
err = MDBX_PROBLEM;
goto bailout;
}
env->me_maxreaders = (unsigned)maxreaders;
err = mdbx_mmap(MDBX_WRITEMAP, &env->me_lck_mmap, (size_t)size, (size_t)size);
if (unlikely(err != MDBX_SUCCESS))
return err;
goto bailout;
#ifdef MADV_DODUMP
(void)madvise(env->me_lck, size, MADV_DODUMP);
@ -6618,43 +6806,52 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
#ifdef MADV_DONTFORK
if (madvise(env->me_lck, size, MADV_DONTFORK) < 0)
return errno;
goto bailout;
#endif
#ifdef MADV_WILLNEED
if (madvise(env->me_lck, size, MADV_WILLNEED) < 0)
return errno;
goto bailout;
#endif
#ifdef MADV_RANDOM
if (madvise(env->me_lck, size, MADV_RANDOM) < 0)
return errno;
goto bailout;
#endif
if (rc == MDBX_RESULT_TRUE) {
/* LY: exlcusive mode, init lck */
if (lck_seize_rc == MDBX_RESULT_TRUE) {
/* LY: exlcusive mode, reset lck */
memset(env->me_lck, 0, (size_t)size);
err = mdbx_lck_init(env);
if (err)
return err;
env->me_lck->mti_magic_and_version = MDBX_LOCK_MAGIC;
env->me_lck->mti_os_and_format = MDBX_LOCK_FORMAT;
} else {
if (env->me_lck->mti_magic_and_version != MDBX_LOCK_MAGIC) {
mdbx_error("lock region has invalid magic/version");
return ((env->me_lck->mti_magic_and_version >> 8) != MDBX_MAGIC)
err = ((env->me_lck->mti_magic_and_version >> 8) != MDBX_MAGIC)
? MDBX_INVALID
: MDBX_VERSION_MISMATCH;
goto bailout;
}
if (env->me_lck->mti_os_and_format != MDBX_LOCK_FORMAT) {
mdbx_error("lock region has os/format 0x%" PRIx32 ", expected 0x%" PRIx32,
env->me_lck->mti_os_and_format, MDBX_LOCK_FORMAT);
return MDBX_VERSION_MISMATCH;
err = MDBX_VERSION_MISMATCH;
goto bailout;
}
}
mdbx_assert(env, !MDBX_IS_ERROR(rc));
err = mdbx_lck_init(env, lck_seize_rc);
if (MDBX_IS_ERROR(err))
goto bailout;
mdbx_ensure(env, env->me_lcklist_next == nullptr);
/* insert into inprocess lck-list */
env->me_lcklist_next = inprocess_lcklist_head;
inprocess_lcklist_head = env;
lcklist_unlock();
/* end of a locked section ------------------------------------------------ */
mdbx_assert(env, !MDBX_IS_ERROR(lck_seize_rc));
env->me_oldest = &env->me_lck->mti_oldest_reader;
env->me_unsynced_timeout = &env->me_lck->mti_unsynced_timeout;
env->me_autosync_period = &env->me_lck->mti_autosync_period;
@ -6664,7 +6861,7 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
#ifdef MDBX_OSAL_LOCK
env->me_wmutex = &env->me_lck->mti_wmutex;
#endif
return rc;
return lck_seize_rc;
}
/* Only a subset of the mdbx_env flags can be changed
@ -6856,7 +7053,7 @@ int __cold mdbx_env_open(MDBX_env *env, const char *path, unsigned flags,
bailout:
if (rc) {
mdbx_env_close0(env);
rc = mdbx_env_close0(env) ? MDBX_PANIC : rc;
env->me_flags = saved_me_flags | MDBX_FATAL_ERROR;
}
mdbx_free(lck_pathname);
@ -6864,10 +7061,15 @@ bailout:
}
/* Destroy resources from mdbx_env_open(), clear our readers & DBIs */
static void __cold mdbx_env_close0(MDBX_env *env) {
if (!(env->me_flags & MDBX_ENV_ACTIVE))
return;
static int __cold mdbx_env_close0(MDBX_env *env) {
if (!(env->me_flags & MDBX_ENV_ACTIVE)) {
mdbx_ensure(env, env->me_lcklist_next == nullptr);
return MDBX_SUCCESS;
}
env->me_flags &= ~MDBX_ENV_ACTIVE;
lcklist_lock();
const int rc = lcklist_detach_locked(env);
lcklist_unlock();
/* Doing this here since me_dbxs may not exist during mdbx_env_close */
if (env->me_dbxs) {
@ -6889,8 +7091,6 @@ static void __cold mdbx_env_close0(MDBX_env *env) {
if (env->me_flags & MDBX_ENV_TXKEY)
mdbx_rthc_remove(env->me_txkey);
if (env->me_live_reader)
(void)mdbx_rpid_clear(env);
if (env->me_map) {
mdbx_munmap(&env->me_dxb_mmap);
@ -6912,12 +7112,12 @@ static void __cold mdbx_env_close0(MDBX_env *env) {
env->me_unsynced_pages = nullptr;
env->me_autosync_threshold = nullptr;
mdbx_lck_destroy(env);
if (env->me_lfd != INVALID_HANDLE_VALUE) {
(void)mdbx_closefile(env->me_lfd);
env->me_lfd = INVALID_HANDLE_VALUE;
}
env->me_flags = 0;
return rc;
}
int __cold mdbx_env_close_ex(MDBX_env *env, int dont_sync) {
@ -6959,7 +7159,7 @@ int __cold mdbx_env_close_ex(MDBX_env *env, int dont_sync) {
mdbx_free(dp);
}
mdbx_env_close0(env);
rc = mdbx_env_close0(env) ? MDBX_PANIC : rc;
mdbx_ensure(env, mdbx_fastmutex_destroy(&env->me_dbi_lock) == MDBX_SUCCESS);
#if defined(_WIN32) || defined(_WIN64)
/* me_remap_guard don't have destructor (Slim Reader/Writer Lock) */
@ -6974,6 +7174,7 @@ int __cold mdbx_env_close_ex(MDBX_env *env, int dont_sync) {
MDBX_SUCCESS);
#endif
mdbx_ensure(env, env->me_lcklist_next == nullptr);
env->me_pid = 0;
env->me_signature = 0;
mdbx_free(env);

View File

@ -633,14 +633,39 @@ uint64_t mdbx_osal_16dot16_to_monotime(uint32_t seconds_16dot16);
#define MDBX_OSAL_LOCK_SIGN UINT32_C(0x8017)
#endif /* MDBX_OSAL_LOCK */
/// \brief Инициализация объектов синхронизации внутри текущего процесса
/// связанных с экземпляром MDBX_env.
/// \brief Инициализация объектов синхронизации связанных с экземпляром MDBX_env
/// как общик в LCK-файле, так и внутри текущего процесса.
/// \param
/// global_uniqueness_flag = true - означает что сейчас в системе нет других
/// процессов работающих с БД и LCK-файлом. Соответственно функция ДОЛЖНА
/// инициализировать разделяемые объекты синхронизации расположенные
/// в отображенном в память LCK-файле.
/// global_uniqueness_flag = false - означает что в системе есть хотя-бы
/// один другой процесс уже работающий с БД и LCK-файлом, в том числе
/// БД уже может быть открыта текущим процессом. Соответственно функция
/// НЕ должна инициализировать уже используемые разделяемые объекты
/// синхронизации расположенные в отображенном в память LCK-файле.
/// \return Код ошибки или 0 в случае успеха.
int mdbx_lck_init(MDBX_env *env);
int mdbx_lck_init(MDBX_env *env, int global_uniqueness_flag);
/// \brief Отключение от общих межпроцесных объектов и разрушение объектов
/// синхронизации внутри текущего процесса связанных с экземпляром MDBX_env.
void mdbx_lck_destroy(MDBX_env *env);
/// \param
/// inprocess_neighbor = NULL - если в текущем процессе нет других экземпляров
/// MDBX_env связанных с закрываемой БД. Соответственно функция ДОЛЖНА
/// своими средствами проверить, есть ли другие процесса работающие с БД
/// и LCK-файлом, и в зависимости от этого разрушить или сохранить бъекты
/// синхронизации расположенные в отображенном в память LCK-файле.
/// inprocess_neighbor = не-NULL - тогда он указывает на другой (любой
/// в случае нескольких) экземпляр MDBX_env работающей с БД и LCK-файлом
/// внутри текущего процесса. Соответственно, функция НЕ должна пытаться
/// захватывать эксклюзивную блокировки и/или пытаться разрушить общие
/// объекты синхронизации связанные с БД и LCK-файлом. Кроме этого,
/// реализация ДОЛЖНА обеспечить корректность работы других экземпляров
/// MDBX_env внутри процесса - например, восстановить POSIX-fcntl блокировки
/// после закрытия файловых дескрипторов.
/// \return Код ошибки (MDBX_PANIC) или 0 в случае успеха.
int mdbx_lck_destroy(MDBX_env *env, MDBX_env *inprocess_neighbor);
/// \brief Подключение к общим межпроцесным объектам блокировки с попыткой
/// захвата блокировки максимального уровня (разделяемой при недоступности