From 51a016245aa53fa00fdc76390870b6542d201ca9 Mon Sep 17 00:00:00 2001 From: Leonid Yuriev Date: Thu, 21 May 2020 22:15:30 +0300 Subject: [PATCH] mdbx: using couple of condvars/events to avoid glitches on Windows. Change-Id: I3256a8dcbb95c78e8dea3eb31ca73f42c58d2f61 --- src/core.c | 22 +++++----- src/osal.c | 122 +++++++++++++++++++++++++---------------------------- src/osal.h | 21 ++++----- 3 files changed, 79 insertions(+), 86 deletions(-) diff --git a/src/core.c b/src/core.c index 87a4b7e0..a3a599b2 100644 --- a/src/core.c +++ b/src/core.c @@ -15356,7 +15356,7 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data, typedef struct mdbx_copy { MDBX_env *mc_env; MDBX_txn *mc_txn; - mdbx_condmutex_t mc_condmutex; + mdbx_condpair_t mc_condpair; uint8_t *mc_wbuf[2]; uint8_t *mc_over[2]; size_t mc_wlen[2]; @@ -15376,10 +15376,10 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { uint8_t *ptr; int toggle = 0; - mdbx_condmutex_lock(&my->mc_condmutex); + mdbx_condpair_lock(&my->mc_condpair); while (!my->mc_error) { while (!my->mc_new && !my->mc_error) { - int err = mdbx_condmutex_wait(&my->mc_condmutex); + int err = mdbx_condpair_wait(&my->mc_condpair, true); if (err != MDBX_SUCCESS) { my->mc_error = err; goto bailout; @@ -15409,10 +15409,10 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { toggle ^= 1; /* Return the empty buffer to provider */ my->mc_new--; - mdbx_condmutex_signal(&my->mc_condmutex); + mdbx_condpair_signal(&my->mc_condpair, false); } bailout: - mdbx_condmutex_unlock(&my->mc_condmutex); + mdbx_condpair_unlock(&my->mc_condpair); return (THREAD_RESULT)0; } @@ -15421,15 +15421,15 @@ bailout: * [in] my control structure. * [in] adjust (1 to hand off 1 buffer) | (MDBX_EOF when ending). */ static int __cold mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) { - mdbx_condmutex_lock(&my->mc_condmutex); + mdbx_condpair_lock(&my->mc_condpair); my->mc_new += (short)adjust; - mdbx_condmutex_signal(&my->mc_condmutex); + mdbx_condpair_signal(&my->mc_condpair, true); while (!my->mc_error && (my->mc_new & 2) /* both buffers in use */) { - int err = mdbx_condmutex_wait(&my->mc_condmutex); + int err = mdbx_condpair_wait(&my->mc_condpair, false); if (err != MDBX_SUCCESS) my->mc_error = err; } - mdbx_condmutex_unlock(&my->mc_condmutex); + mdbx_condpair_unlock(&my->mc_condpair); my->mc_toggle ^= (adjust & 1); /* Both threads reset mc_wlen, to be safe from threading errors */ @@ -15694,7 +15694,7 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn, mdbx_copy ctx; memset(&ctx, 0, sizeof(ctx)); - rc = mdbx_condmutex_init(&ctx.mc_condmutex); + rc = mdbx_condpair_init(&ctx.mc_condpair); if (unlikely(rc != MDBX_SUCCESS)) return rc; @@ -15717,7 +15717,7 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn, rc = mdbx_env_cwalk(&ctx, &root, 0); mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF); thread_err = mdbx_thread_join(thread); - mdbx_condmutex_destroy(&ctx.mc_condmutex); + mdbx_condpair_destroy(&ctx.mc_condpair); } if (unlikely(thread_err != MDBX_SUCCESS)) return thread_err; diff --git a/src/osal.c b/src/osal.c index f6a0641b..b790029e 100644 --- a/src/osal.c +++ b/src/osal.c @@ -362,106 +362,98 @@ char *mdbx_strdup(const char *str) { /*----------------------------------------------------------------------------*/ -MDBX_INTERNAL_FUNC int mdbx_condmutex_init(mdbx_condmutex_t *condmutex) { +MDBX_INTERNAL_FUNC int mdbx_condpair_init(mdbx_condpair_t *condpair) { + int rc; + memset(condpair, 0, sizeof(mdbx_condpair_t)); #if defined(_WIN32) || defined(_WIN64) - int rc = MDBX_SUCCESS; - condmutex->event = NULL; - condmutex->mutex = CreateMutexW(NULL, FALSE, NULL); - if (!condmutex->mutex) - return GetLastError(); - - condmutex->event = CreateEventW(NULL, TRUE, FALSE, NULL); - if (!condmutex->event) { + if ((condpair->mutex = CreateMutexW(NULL, FALSE, NULL)) == NULL) { rc = GetLastError(); - (void)CloseHandle(condmutex->mutex); - condmutex->mutex = NULL; + goto bailout_mutex; } - return rc; + if ((condpair->event[0] = CreateEventW(NULL, FALSE, FALSE, NULL)) == NULL) { + rc = GetLastError(); + goto bailout_event; + } + if ((condpair->event[1] = CreateEventW(NULL, FALSE, FALSE, NULL)) != NULL) + return MDBX_SUCCESS; + + rc = GetLastError(); + (void)CloseHandle(condpair->event[0]); +bailout_event: + (void)CloseHandle(condpair->mutex); #else - memset(condmutex, 0, sizeof(mdbx_condmutex_t)); - int rc = pthread_mutex_init(&condmutex->mutex, NULL); - if (rc == 0) { - rc = pthread_cond_init(&condmutex->cond, NULL); - if (rc != 0) - (void)pthread_mutex_destroy(&condmutex->mutex); - } - return rc; + rc = pthread_mutex_init(&condpair->mutex, NULL); + if (unlikely(rc != 0)) + goto bailout_mutex; + rc = pthread_cond_init(&condpair->cond[0], NULL); + if (unlikely(rc != 0)) + goto bailout_cond; + rc = pthread_cond_init(&condpair->cond[1], NULL); + if (likely(rc == 0)) + return MDBX_SUCCESS; + + (void)pthread_cond_destroy(&condpair->cond[0]); +bailout_cond: + (void)pthread_mutex_destroy(&condpair->mutex); #endif +bailout_mutex: + memset(condpair, 0, sizeof(mdbx_condpair_t)); + return rc; } -static bool is_allzeros(const void *ptr, size_t bytes) { - const uint8_t *u8 = ptr; - for (size_t i = 0; i < bytes; ++i) - if (u8[i] != 0) - return false; - return true; -} - -MDBX_INTERNAL_FUNC int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex) { - int rc = MDBX_EINVAL; +MDBX_INTERNAL_FUNC int mdbx_condpair_destroy(mdbx_condpair_t *condpair) { #if defined(_WIN32) || defined(_WIN64) - if (condmutex->event) { - rc = CloseHandle(condmutex->event) ? MDBX_SUCCESS : GetLastError(); - if (rc == MDBX_SUCCESS) - condmutex->event = NULL; - } - if (condmutex->mutex) { - rc = CloseHandle(condmutex->mutex) ? MDBX_SUCCESS : GetLastError(); - if (rc == MDBX_SUCCESS) - condmutex->mutex = NULL; - } + int rc = CloseHandle(condpair->mutex) ? MDBX_SUCCESS : GetLastError(); + rc = CloseHandle(condpair->event[0]) ? rc : GetLastError(); + rc = CloseHandle(condpair->event[1]) ? rc : GetLastError(); #else - if (!is_allzeros(&condmutex->cond, sizeof(condmutex->cond))) { - rc = pthread_cond_destroy(&condmutex->cond); - if (rc == 0) - memset(&condmutex->cond, 0, sizeof(condmutex->cond)); - } - if (!is_allzeros(&condmutex->mutex, sizeof(condmutex->mutex))) { - rc = pthread_mutex_destroy(&condmutex->mutex); - if (rc == 0) - memset(&condmutex->mutex, 0, sizeof(condmutex->mutex)); - } + int err, rc = pthread_mutex_destroy(&condpair->mutex); + rc = (err = pthread_cond_destroy(&condpair->cond[0])) ? err : rc; + rc = (err = pthread_cond_destroy(&condpair->cond[1])) ? err : rc; #endif + memset(condpair, 0, sizeof(mdbx_condpair_t)); return rc; } -MDBX_INTERNAL_FUNC int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex) { +MDBX_INTERNAL_FUNC int mdbx_condpair_lock(mdbx_condpair_t *condpair) { #if defined(_WIN32) || defined(_WIN64) - DWORD code = WaitForSingleObject(condmutex->mutex, INFINITE); + DWORD code = WaitForSingleObject(condpair->mutex, INFINITE); return waitstatus2errcode(code); #else - return pthread_mutex_lock(&condmutex->mutex); + return pthread_mutex_lock(&condpair->mutex); #endif } -MDBX_INTERNAL_FUNC int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex) { +MDBX_INTERNAL_FUNC int mdbx_condpair_unlock(mdbx_condpair_t *condpair) { #if defined(_WIN32) || defined(_WIN64) - return ReleaseMutex(condmutex->mutex) ? MDBX_SUCCESS : GetLastError(); + return ReleaseMutex(condpair->mutex) ? MDBX_SUCCESS : GetLastError(); #else - return pthread_mutex_unlock(&condmutex->mutex); + return pthread_mutex_unlock(&condpair->mutex); #endif } -MDBX_INTERNAL_FUNC int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex) { +MDBX_INTERNAL_FUNC int mdbx_condpair_signal(mdbx_condpair_t *condpair, + bool part) { #if defined(_WIN32) || defined(_WIN64) - return SetEvent(condmutex->event) ? MDBX_SUCCESS : GetLastError(); + return SetEvent(condpair->event[part]) ? MDBX_SUCCESS : GetLastError(); #else - return pthread_cond_signal(&condmutex->cond); + return pthread_cond_signal(&condpair->cond[part]); #endif } -MDBX_INTERNAL_FUNC int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex) { +MDBX_INTERNAL_FUNC int mdbx_condpair_wait(mdbx_condpair_t *condpair, + bool part) { #if defined(_WIN32) || defined(_WIN64) - DWORD code = - SignalObjectAndWait(condmutex->mutex, condmutex->event, INFINITE, FALSE); + DWORD code = SignalObjectAndWait(condpair->mutex, condpair->event[part], + INFINITE, FALSE); if (code == WAIT_OBJECT_0) { - code = WaitForSingleObject(condmutex->mutex, INFINITE); + code = WaitForSingleObject(condpair->mutex, INFINITE); if (code == WAIT_OBJECT_0) - return ResetEvent(condmutex->event) ? MDBX_SUCCESS : GetLastError(); + return MDBX_SUCCESS; } return waitstatus2errcode(code); #else - return pthread_cond_wait(&condmutex->cond, &condmutex->mutex); + return pthread_cond_wait(&condpair->cond[part], &condpair->mutex); #endif } diff --git a/src/osal.h b/src/osal.h index ddead087..85e84ebd 100644 --- a/src/osal.h +++ b/src/osal.h @@ -160,8 +160,8 @@ typedef unsigned mdbx_thread_key_t; #define THREAD_RESULT DWORD typedef struct { HANDLE mutex; - HANDLE event; -} mdbx_condmutex_t; + HANDLE event[2]; +} mdbx_condpair_t; typedef CRITICAL_SECTION mdbx_fastmutex_t; #if MDBX_AVOID_CRT @@ -222,8 +222,8 @@ typedef pthread_key_t mdbx_thread_key_t; #define THREAD_RESULT void * typedef struct { pthread_mutex_t mutex; - pthread_cond_t cond; -} mdbx_condmutex_t; + pthread_cond_t cond[2]; +} mdbx_condpair_t; typedef pthread_mutex_t mdbx_fastmutex_t; #define mdbx_malloc malloc #define mdbx_calloc calloc @@ -542,12 +542,13 @@ MDBX_INTERNAL_FUNC int mdbx_memalign_alloc(size_t alignment, size_t bytes, MDBX_INTERNAL_FUNC void mdbx_memalign_free(void *ptr); #endif -MDBX_INTERNAL_FUNC int mdbx_condmutex_init(mdbx_condmutex_t *condmutex); -MDBX_INTERNAL_FUNC int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex); -MDBX_INTERNAL_FUNC int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex); -MDBX_INTERNAL_FUNC int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex); -MDBX_INTERNAL_FUNC int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex); -MDBX_INTERNAL_FUNC int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex); +MDBX_INTERNAL_FUNC int mdbx_condpair_init(mdbx_condpair_t *condpair); +MDBX_INTERNAL_FUNC int mdbx_condpair_lock(mdbx_condpair_t *condpair); +MDBX_INTERNAL_FUNC int mdbx_condpair_unlock(mdbx_condpair_t *condpair); +MDBX_INTERNAL_FUNC int mdbx_condpair_signal(mdbx_condpair_t *condpair, + bool part); +MDBX_INTERNAL_FUNC int mdbx_condpair_wait(mdbx_condpair_t *condpair, bool part); +MDBX_INTERNAL_FUNC int mdbx_condpair_destroy(mdbx_condpair_t *condpair); MDBX_INTERNAL_FUNC int mdbx_fastmutex_init(mdbx_fastmutex_t *fastmutex); MDBX_INTERNAL_FUNC int mdbx_fastmutex_acquire(mdbx_fastmutex_t *fastmutex);