mirror of
https://github.com/isar/libmdbx.git
synced 2025-01-21 18:18:21 +08:00
mdbx: using couple of condvars/events to avoid glitches on Windows.
Change-Id: I3256a8dcbb95c78e8dea3eb31ca73f42c58d2f61
This commit is contained in:
parent
a2bdbc97dc
commit
51a016245a
22
src/core.c
22
src/core.c
@ -15356,7 +15356,7 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
|
|||||||
typedef struct mdbx_copy {
|
typedef struct mdbx_copy {
|
||||||
MDBX_env *mc_env;
|
MDBX_env *mc_env;
|
||||||
MDBX_txn *mc_txn;
|
MDBX_txn *mc_txn;
|
||||||
mdbx_condmutex_t mc_condmutex;
|
mdbx_condpair_t mc_condpair;
|
||||||
uint8_t *mc_wbuf[2];
|
uint8_t *mc_wbuf[2];
|
||||||
uint8_t *mc_over[2];
|
uint8_t *mc_over[2];
|
||||||
size_t mc_wlen[2];
|
size_t mc_wlen[2];
|
||||||
@ -15376,10 +15376,10 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
|
|||||||
uint8_t *ptr;
|
uint8_t *ptr;
|
||||||
int toggle = 0;
|
int toggle = 0;
|
||||||
|
|
||||||
mdbx_condmutex_lock(&my->mc_condmutex);
|
mdbx_condpair_lock(&my->mc_condpair);
|
||||||
while (!my->mc_error) {
|
while (!my->mc_error) {
|
||||||
while (!my->mc_new && !my->mc_error) {
|
while (!my->mc_new && !my->mc_error) {
|
||||||
int err = mdbx_condmutex_wait(&my->mc_condmutex);
|
int err = mdbx_condpair_wait(&my->mc_condpair, true);
|
||||||
if (err != MDBX_SUCCESS) {
|
if (err != MDBX_SUCCESS) {
|
||||||
my->mc_error = err;
|
my->mc_error = err;
|
||||||
goto bailout;
|
goto bailout;
|
||||||
@ -15409,10 +15409,10 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
|
|||||||
toggle ^= 1;
|
toggle ^= 1;
|
||||||
/* Return the empty buffer to provider */
|
/* Return the empty buffer to provider */
|
||||||
my->mc_new--;
|
my->mc_new--;
|
||||||
mdbx_condmutex_signal(&my->mc_condmutex);
|
mdbx_condpair_signal(&my->mc_condpair, false);
|
||||||
}
|
}
|
||||||
bailout:
|
bailout:
|
||||||
mdbx_condmutex_unlock(&my->mc_condmutex);
|
mdbx_condpair_unlock(&my->mc_condpair);
|
||||||
return (THREAD_RESULT)0;
|
return (THREAD_RESULT)0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -15421,15 +15421,15 @@ bailout:
|
|||||||
* [in] my control structure.
|
* [in] my control structure.
|
||||||
* [in] adjust (1 to hand off 1 buffer) | (MDBX_EOF when ending). */
|
* [in] adjust (1 to hand off 1 buffer) | (MDBX_EOF when ending). */
|
||||||
static int __cold mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) {
|
static int __cold mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) {
|
||||||
mdbx_condmutex_lock(&my->mc_condmutex);
|
mdbx_condpair_lock(&my->mc_condpair);
|
||||||
my->mc_new += (short)adjust;
|
my->mc_new += (short)adjust;
|
||||||
mdbx_condmutex_signal(&my->mc_condmutex);
|
mdbx_condpair_signal(&my->mc_condpair, true);
|
||||||
while (!my->mc_error && (my->mc_new & 2) /* both buffers in use */) {
|
while (!my->mc_error && (my->mc_new & 2) /* both buffers in use */) {
|
||||||
int err = mdbx_condmutex_wait(&my->mc_condmutex);
|
int err = mdbx_condpair_wait(&my->mc_condpair, false);
|
||||||
if (err != MDBX_SUCCESS)
|
if (err != MDBX_SUCCESS)
|
||||||
my->mc_error = err;
|
my->mc_error = err;
|
||||||
}
|
}
|
||||||
mdbx_condmutex_unlock(&my->mc_condmutex);
|
mdbx_condpair_unlock(&my->mc_condpair);
|
||||||
|
|
||||||
my->mc_toggle ^= (adjust & 1);
|
my->mc_toggle ^= (adjust & 1);
|
||||||
/* Both threads reset mc_wlen, to be safe from threading errors */
|
/* Both threads reset mc_wlen, to be safe from threading errors */
|
||||||
@ -15694,7 +15694,7 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
|||||||
|
|
||||||
mdbx_copy ctx;
|
mdbx_copy ctx;
|
||||||
memset(&ctx, 0, sizeof(ctx));
|
memset(&ctx, 0, sizeof(ctx));
|
||||||
rc = mdbx_condmutex_init(&ctx.mc_condmutex);
|
rc = mdbx_condpair_init(&ctx.mc_condpair);
|
||||||
if (unlikely(rc != MDBX_SUCCESS))
|
if (unlikely(rc != MDBX_SUCCESS))
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
@ -15717,7 +15717,7 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
|||||||
rc = mdbx_env_cwalk(&ctx, &root, 0);
|
rc = mdbx_env_cwalk(&ctx, &root, 0);
|
||||||
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF);
|
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF);
|
||||||
thread_err = mdbx_thread_join(thread);
|
thread_err = mdbx_thread_join(thread);
|
||||||
mdbx_condmutex_destroy(&ctx.mc_condmutex);
|
mdbx_condpair_destroy(&ctx.mc_condpair);
|
||||||
}
|
}
|
||||||
if (unlikely(thread_err != MDBX_SUCCESS))
|
if (unlikely(thread_err != MDBX_SUCCESS))
|
||||||
return thread_err;
|
return thread_err;
|
||||||
|
122
src/osal.c
122
src/osal.c
@ -362,106 +362,98 @@ char *mdbx_strdup(const char *str) {
|
|||||||
|
|
||||||
/*----------------------------------------------------------------------------*/
|
/*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_init(mdbx_condmutex_t *condmutex) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_init(mdbx_condpair_t *condpair) {
|
||||||
|
int rc;
|
||||||
|
memset(condpair, 0, sizeof(mdbx_condpair_t));
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
int rc = MDBX_SUCCESS;
|
if ((condpair->mutex = CreateMutexW(NULL, FALSE, NULL)) == NULL) {
|
||||||
condmutex->event = NULL;
|
|
||||||
condmutex->mutex = CreateMutexW(NULL, FALSE, NULL);
|
|
||||||
if (!condmutex->mutex)
|
|
||||||
return GetLastError();
|
|
||||||
|
|
||||||
condmutex->event = CreateEventW(NULL, TRUE, FALSE, NULL);
|
|
||||||
if (!condmutex->event) {
|
|
||||||
rc = GetLastError();
|
rc = GetLastError();
|
||||||
(void)CloseHandle(condmutex->mutex);
|
goto bailout_mutex;
|
||||||
condmutex->mutex = NULL;
|
|
||||||
}
|
}
|
||||||
return rc;
|
if ((condpair->event[0] = CreateEventW(NULL, FALSE, FALSE, NULL)) == NULL) {
|
||||||
|
rc = GetLastError();
|
||||||
|
goto bailout_event;
|
||||||
|
}
|
||||||
|
if ((condpair->event[1] = CreateEventW(NULL, FALSE, FALSE, NULL)) != NULL)
|
||||||
|
return MDBX_SUCCESS;
|
||||||
|
|
||||||
|
rc = GetLastError();
|
||||||
|
(void)CloseHandle(condpair->event[0]);
|
||||||
|
bailout_event:
|
||||||
|
(void)CloseHandle(condpair->mutex);
|
||||||
#else
|
#else
|
||||||
memset(condmutex, 0, sizeof(mdbx_condmutex_t));
|
rc = pthread_mutex_init(&condpair->mutex, NULL);
|
||||||
int rc = pthread_mutex_init(&condmutex->mutex, NULL);
|
if (unlikely(rc != 0))
|
||||||
if (rc == 0) {
|
goto bailout_mutex;
|
||||||
rc = pthread_cond_init(&condmutex->cond, NULL);
|
rc = pthread_cond_init(&condpair->cond[0], NULL);
|
||||||
if (rc != 0)
|
if (unlikely(rc != 0))
|
||||||
(void)pthread_mutex_destroy(&condmutex->mutex);
|
goto bailout_cond;
|
||||||
}
|
rc = pthread_cond_init(&condpair->cond[1], NULL);
|
||||||
return rc;
|
if (likely(rc == 0))
|
||||||
|
return MDBX_SUCCESS;
|
||||||
|
|
||||||
|
(void)pthread_cond_destroy(&condpair->cond[0]);
|
||||||
|
bailout_cond:
|
||||||
|
(void)pthread_mutex_destroy(&condpair->mutex);
|
||||||
#endif
|
#endif
|
||||||
|
bailout_mutex:
|
||||||
|
memset(condpair, 0, sizeof(mdbx_condpair_t));
|
||||||
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool is_allzeros(const void *ptr, size_t bytes) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_destroy(mdbx_condpair_t *condpair) {
|
||||||
const uint8_t *u8 = ptr;
|
|
||||||
for (size_t i = 0; i < bytes; ++i)
|
|
||||||
if (u8[i] != 0)
|
|
||||||
return false;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex) {
|
|
||||||
int rc = MDBX_EINVAL;
|
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
if (condmutex->event) {
|
int rc = CloseHandle(condpair->mutex) ? MDBX_SUCCESS : GetLastError();
|
||||||
rc = CloseHandle(condmutex->event) ? MDBX_SUCCESS : GetLastError();
|
rc = CloseHandle(condpair->event[0]) ? rc : GetLastError();
|
||||||
if (rc == MDBX_SUCCESS)
|
rc = CloseHandle(condpair->event[1]) ? rc : GetLastError();
|
||||||
condmutex->event = NULL;
|
|
||||||
}
|
|
||||||
if (condmutex->mutex) {
|
|
||||||
rc = CloseHandle(condmutex->mutex) ? MDBX_SUCCESS : GetLastError();
|
|
||||||
if (rc == MDBX_SUCCESS)
|
|
||||||
condmutex->mutex = NULL;
|
|
||||||
}
|
|
||||||
#else
|
#else
|
||||||
if (!is_allzeros(&condmutex->cond, sizeof(condmutex->cond))) {
|
int err, rc = pthread_mutex_destroy(&condpair->mutex);
|
||||||
rc = pthread_cond_destroy(&condmutex->cond);
|
rc = (err = pthread_cond_destroy(&condpair->cond[0])) ? err : rc;
|
||||||
if (rc == 0)
|
rc = (err = pthread_cond_destroy(&condpair->cond[1])) ? err : rc;
|
||||||
memset(&condmutex->cond, 0, sizeof(condmutex->cond));
|
|
||||||
}
|
|
||||||
if (!is_allzeros(&condmutex->mutex, sizeof(condmutex->mutex))) {
|
|
||||||
rc = pthread_mutex_destroy(&condmutex->mutex);
|
|
||||||
if (rc == 0)
|
|
||||||
memset(&condmutex->mutex, 0, sizeof(condmutex->mutex));
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
memset(condpair, 0, sizeof(mdbx_condpair_t));
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_lock(mdbx_condpair_t *condpair) {
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
DWORD code = WaitForSingleObject(condmutex->mutex, INFINITE);
|
DWORD code = WaitForSingleObject(condpair->mutex, INFINITE);
|
||||||
return waitstatus2errcode(code);
|
return waitstatus2errcode(code);
|
||||||
#else
|
#else
|
||||||
return pthread_mutex_lock(&condmutex->mutex);
|
return pthread_mutex_lock(&condpair->mutex);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_unlock(mdbx_condpair_t *condpair) {
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
return ReleaseMutex(condmutex->mutex) ? MDBX_SUCCESS : GetLastError();
|
return ReleaseMutex(condpair->mutex) ? MDBX_SUCCESS : GetLastError();
|
||||||
#else
|
#else
|
||||||
return pthread_mutex_unlock(&condmutex->mutex);
|
return pthread_mutex_unlock(&condpair->mutex);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_signal(mdbx_condpair_t *condpair,
|
||||||
|
bool part) {
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
return SetEvent(condmutex->event) ? MDBX_SUCCESS : GetLastError();
|
return SetEvent(condpair->event[part]) ? MDBX_SUCCESS : GetLastError();
|
||||||
#else
|
#else
|
||||||
return pthread_cond_signal(&condmutex->cond);
|
return pthread_cond_signal(&condpair->cond[part]);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex) {
|
MDBX_INTERNAL_FUNC int mdbx_condpair_wait(mdbx_condpair_t *condpair,
|
||||||
|
bool part) {
|
||||||
#if defined(_WIN32) || defined(_WIN64)
|
#if defined(_WIN32) || defined(_WIN64)
|
||||||
DWORD code =
|
DWORD code = SignalObjectAndWait(condpair->mutex, condpair->event[part],
|
||||||
SignalObjectAndWait(condmutex->mutex, condmutex->event, INFINITE, FALSE);
|
INFINITE, FALSE);
|
||||||
if (code == WAIT_OBJECT_0) {
|
if (code == WAIT_OBJECT_0) {
|
||||||
code = WaitForSingleObject(condmutex->mutex, INFINITE);
|
code = WaitForSingleObject(condpair->mutex, INFINITE);
|
||||||
if (code == WAIT_OBJECT_0)
|
if (code == WAIT_OBJECT_0)
|
||||||
return ResetEvent(condmutex->event) ? MDBX_SUCCESS : GetLastError();
|
return MDBX_SUCCESS;
|
||||||
}
|
}
|
||||||
return waitstatus2errcode(code);
|
return waitstatus2errcode(code);
|
||||||
#else
|
#else
|
||||||
return pthread_cond_wait(&condmutex->cond, &condmutex->mutex);
|
return pthread_cond_wait(&condpair->cond[part], &condpair->mutex);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
21
src/osal.h
21
src/osal.h
@ -160,8 +160,8 @@ typedef unsigned mdbx_thread_key_t;
|
|||||||
#define THREAD_RESULT DWORD
|
#define THREAD_RESULT DWORD
|
||||||
typedef struct {
|
typedef struct {
|
||||||
HANDLE mutex;
|
HANDLE mutex;
|
||||||
HANDLE event;
|
HANDLE event[2];
|
||||||
} mdbx_condmutex_t;
|
} mdbx_condpair_t;
|
||||||
typedef CRITICAL_SECTION mdbx_fastmutex_t;
|
typedef CRITICAL_SECTION mdbx_fastmutex_t;
|
||||||
|
|
||||||
#if MDBX_AVOID_CRT
|
#if MDBX_AVOID_CRT
|
||||||
@ -222,8 +222,8 @@ typedef pthread_key_t mdbx_thread_key_t;
|
|||||||
#define THREAD_RESULT void *
|
#define THREAD_RESULT void *
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
pthread_cond_t cond;
|
pthread_cond_t cond[2];
|
||||||
} mdbx_condmutex_t;
|
} mdbx_condpair_t;
|
||||||
typedef pthread_mutex_t mdbx_fastmutex_t;
|
typedef pthread_mutex_t mdbx_fastmutex_t;
|
||||||
#define mdbx_malloc malloc
|
#define mdbx_malloc malloc
|
||||||
#define mdbx_calloc calloc
|
#define mdbx_calloc calloc
|
||||||
@ -542,12 +542,13 @@ MDBX_INTERNAL_FUNC int mdbx_memalign_alloc(size_t alignment, size_t bytes,
|
|||||||
MDBX_INTERNAL_FUNC void mdbx_memalign_free(void *ptr);
|
MDBX_INTERNAL_FUNC void mdbx_memalign_free(void *ptr);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_init(mdbx_condmutex_t *condmutex);
|
MDBX_INTERNAL_FUNC int mdbx_condpair_init(mdbx_condpair_t *condpair);
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex);
|
MDBX_INTERNAL_FUNC int mdbx_condpair_lock(mdbx_condpair_t *condpair);
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex);
|
MDBX_INTERNAL_FUNC int mdbx_condpair_unlock(mdbx_condpair_t *condpair);
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex);
|
MDBX_INTERNAL_FUNC int mdbx_condpair_signal(mdbx_condpair_t *condpair,
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex);
|
bool part);
|
||||||
MDBX_INTERNAL_FUNC int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex);
|
MDBX_INTERNAL_FUNC int mdbx_condpair_wait(mdbx_condpair_t *condpair, bool part);
|
||||||
|
MDBX_INTERNAL_FUNC int mdbx_condpair_destroy(mdbx_condpair_t *condpair);
|
||||||
|
|
||||||
MDBX_INTERNAL_FUNC int mdbx_fastmutex_init(mdbx_fastmutex_t *fastmutex);
|
MDBX_INTERNAL_FUNC int mdbx_fastmutex_init(mdbx_fastmutex_t *fastmutex);
|
||||||
MDBX_INTERNAL_FUNC int mdbx_fastmutex_acquire(mdbx_fastmutex_t *fastmutex);
|
MDBX_INTERNAL_FUNC int mdbx_fastmutex_acquire(mdbx_fastmutex_t *fastmutex);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user