mdbx: mdbx_condmutex_t instead of mutex/condvar pair.

This commit is contained in:
Leo Yuriev 2017-05-23 18:40:21 +03:00
parent 2f97939efd
commit 8828e90ff9
4 changed files with 99 additions and 75 deletions

View File

@ -32,7 +32,7 @@
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
/* rthc */ /* rthc */
static mdbx_mutex_t mdbx_rthc_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t mdbx_rthc_mutex = PTHREAD_MUTEX_INITIALIZER;
void mdbx_rthc_lock(void) { void mdbx_rthc_lock(void) {
mdbx_ensure(NULL, pthread_mutex_lock(&mdbx_rthc_mutex) == 0); mdbx_ensure(NULL, pthread_mutex_lock(&mdbx_rthc_mutex) == 0);
@ -273,7 +273,8 @@ int mdbx_lck_seize(MDB_env *env) {
#define pthread_mutex_consistent(mutex) pthread_mutex_consistent_np(mutex) #define pthread_mutex_consistent(mutex) pthread_mutex_consistent_np(mutex)
#endif #endif
static int __cold mdbx_mutex_failed(MDB_env *env, mdbx_mutex_t *mutex, int rc) { static int __cold mdbx_mutex_failed(MDB_env *env, pthread_mutex_t *mutex,
int rc) {
#if MDB_USE_ROBUST #if MDB_USE_ROBUST
if (rc == EOWNERDEAD) { if (rc == EOWNERDEAD) {
/* We own the mutex. Clean up after dead previous owner. */ /* We own the mutex. Clean up after dead previous owner. */

View File

@ -8137,8 +8137,7 @@ int mdbx_put(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data,
typedef struct mdbx_copy { typedef struct mdbx_copy {
MDB_env *mc_env; MDB_env *mc_env;
MDB_txn *mc_txn; MDB_txn *mc_txn;
mdbx_mutex_t mc_mutex; mdbx_condmutex_t mc_condmutex;
mdbx_cond_t mc_cond; /* Condition variable for mc_new */
char *mc_wbuf[2]; char *mc_wbuf[2];
char *mc_over[2]; char *mc_over[2];
int mc_wlen[2]; int mc_wlen[2];
@ -8158,10 +8157,10 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
char *ptr; char *ptr;
int toggle = 0, wsize; int toggle = 0, wsize;
mdbx_mutex_lock(&my->mc_mutex); mdbx_condmutex_lock(&my->mc_condmutex);
while (!my->mc_error) { while (!my->mc_error) {
while (!my->mc_new) while (!my->mc_new)
mdbx_cond_wait(&my->mc_cond, &my->mc_mutex); mdbx_condmutex_wait(&my->mc_condmutex);
if (my->mc_new == 0 + MDB_EOF) /* 0 buffers, just EOF */ if (my->mc_new == 0 + MDB_EOF) /* 0 buffers, just EOF */
break; break;
wsize = my->mc_wlen[toggle]; wsize = my->mc_wlen[toggle];
@ -8184,9 +8183,9 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
toggle ^= 1; toggle ^= 1;
/* Return the empty buffer to provider */ /* Return the empty buffer to provider */
my->mc_new--; my->mc_new--;
mdbx_cond_signal(&my->mc_cond); mdbx_condmutex_signal(&my->mc_condmutex);
} }
mdbx_mutex_unlock(&my->mc_mutex); mdbx_condmutex_unlock(&my->mc_condmutex);
return (THREAD_RESULT)0; return (THREAD_RESULT)0;
} }
@ -8195,12 +8194,12 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
* [in] my control structure. * [in] my control structure.
* [in] adjust (1 to hand off 1 buffer) | (MDB_EOF when ending). */ * [in] adjust (1 to hand off 1 buffer) | (MDB_EOF when ending). */
static int __cold mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) { static int __cold mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) {
mdbx_mutex_lock(&my->mc_mutex); mdbx_condmutex_lock(&my->mc_condmutex);
my->mc_new += adjust; my->mc_new += adjust;
mdbx_cond_signal(&my->mc_cond); mdbx_condmutex_signal(&my->mc_condmutex);
while (my->mc_new & 2) /* both buffers in use */ while (my->mc_new & 2) /* both buffers in use */
mdbx_cond_wait(&my->mc_cond, &my->mc_mutex); mdbx_condmutex_wait(&my->mc_condmutex);
mdbx_mutex_unlock(&my->mc_mutex); mdbx_condmutex_unlock(&my->mc_condmutex);
my->mc_toggle ^= (adjust & 1); my->mc_toggle ^= (adjust & 1);
/* Both threads reset mc_wlen, to be safe from threading errors */ /* Both threads reset mc_wlen, to be safe from threading errors */
@ -8376,10 +8375,8 @@ static int __cold mdbx_env_compact(MDB_env *env, mdbx_filehandle_t fd) {
int rc; int rc;
memset(&my, 0, sizeof(my)); memset(&my, 0, sizeof(my));
if ((rc = mdbx_mutex_init(&my.mc_mutex)) != 0) if ((rc = mdbx_condmutex_init(&my.mc_condmutex)) != 0)
return rc; return rc;
if ((rc = mdbx_cond_init(&my.mc_cond)) != 0)
goto done2;
rc = mdbx_memalign_alloc(env->me_os_psize, MDB_WBUF * 2, rc = mdbx_memalign_alloc(env->me_os_psize, MDB_WBUF * 2,
(void **)&my.mc_wbuf[0]); (void **)&my.mc_wbuf[0]);
if (rc != MDB_SUCCESS) if (rc != MDB_SUCCESS)
@ -8457,9 +8454,7 @@ finish:
done: done:
mdbx_memalign_free(my.mc_wbuf[0]); mdbx_memalign_free(my.mc_wbuf[0]);
mdbx_cond_destroy(&my.mc_cond); mdbx_condmutex_destroy(&my.mc_condmutex);
done2:
mdbx_mutex_destroy(&my.mc_mutex);
return rc ? rc : my.mc_error; return rc ? rc : my.mc_error;
} }

View File

@ -164,77 +164,104 @@ void mdbx_memalign_free(void *ptr) {
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
int mdbx_mutex_init(mdbx_mutex_t *mutex) { int mdbx_condmutex_init(mdbx_condmutex_t *condmutex) {
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
*mutex = CreateMutex(NULL, FALSE, NULL); int rc = MDB_SUCCESS;
return *mutex ? MDB_SUCCESS : mdbx_get_errno_checked(); condmutex->event = NULL;
condmutex->mutex = CreateMutex(NULL, FALSE, NULL);
if (!condmutex->mutex)
return mdbx_get_errno_checked();
condmutex->event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!condmutex->event) {
rc = mdbx_get_errno_checked();
(void)CloseHandle(condmutex->mutex);
condmutex->mutex = NULL;
}
return rc;
#else #else
return pthread_mutex_init(mutex, NULL); memset(condmutex, 0, sizeof(mdbx_condmutex_t));
int rc = pthread_mutex_init(&condmutex->mutex, NULL);
if (rc == 0) {
rc = pthread_cond_init(&condmutex->cond, NULL);
if (rc != 0)
(void)pthread_mutex_destroy(&condmutex->mutex);
}
return rc;
#endif #endif
} }
int mdbx_mutex_destroy(mdbx_mutex_t *mutex) { static bool is_allzeros(const void *ptr, size_t bytes) {
#if defined(_WIN32) || defined(_WIN64) const uint8_t *u8 = ptr;
return CloseHandle(*mutex) ? MDB_SUCCESS : mdbx_get_errno_checked(); for (size_t i = 0; i < bytes; ++i)
#else if (u8[i] != 0)
return pthread_mutex_destroy(mutex); return false;
#endif return true;
} }
int mdbx_mutex_lock(mdbx_mutex_t *mutex) { int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex) {
int rc = MDBX_EINVAL;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
DWORD code = WaitForSingleObject(*mutex, INFINITE); if (condmutex->event) {
rc = CloseHandle(condmutex->event) ? MDB_SUCCESS : mdbx_get_errno_checked();
if (rc == MDB_SUCCESS)
condmutex->event = NULL;
}
if (condmutex->mutex) {
rc = CloseHandle(condmutex->mutex) ? MDB_SUCCESS : mdbx_get_errno_checked();
if (rc == MDB_SUCCESS)
condmutex->mutex = NULL;
}
#else
if (!is_allzeros(&condmutex->cond, sizeof(condmutex->cond))) {
rc = pthread_cond_destroy(&condmutex->cond);
if (rc == 0)
memset(&condmutex->cond, 0, sizeof(condmutex->cond));
}
if (!is_allzeros(&condmutex->mutex, sizeof(condmutex->mutex))) {
rc = pthread_mutex_destroy(&condmutex->mutex);
if (rc == 0)
memset(&condmutex->mutex, 0, sizeof(condmutex->mutex));
}
#endif
return rc;
}
int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex) {
#if defined(_WIN32) || defined(_WIN64)
DWORD code = WaitForSingleObject(condmutex->mutex, INFINITE);
return waitstatus2errcode(code); return waitstatus2errcode(code);
#else #else
return pthread_mutex_lock(mutex); return pthread_mutex_lock(&condmutex->mutex);
#endif #endif
} }
int mdbx_mutex_unlock(mdbx_mutex_t *mutex) { int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex) {
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
return ReleaseMutex(*mutex) ? MDB_SUCCESS : mdbx_get_errno_checked(); return ReleaseMutex(condmutex->mutex) ? MDB_SUCCESS
: mdbx_get_errno_checked();
#else #else
return pthread_mutex_unlock(mutex); return pthread_mutex_unlock(&condmutex->mutex);
#endif #endif
} }
/*----------------------------------------------------------------------------*/ int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex) {
int mdbx_cond_init(mdbx_cond_t *cond) {
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
*cond = CreateEvent(NULL, FALSE, FALSE, NULL); return SetEvent(condmutex->event) ? MDB_SUCCESS : mdbx_get_errno_checked();
return *cond ? MDB_SUCCESS : mdbx_get_errno_checked();
#else #else
return pthread_cond_init(cond, NULL); return pthread_cond_signal(&condmutex->cond);
#endif #endif
} }
#ifndef mdbx_cond_destroy int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex) {
int mdbx_cond_destroy(mdbx_cond_t *cond) {
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32) || defined(_WIN64)
return CloseHandle(*cond) ? MDB_SUCCESS : mdbx_get_errno_checked(); DWORD code =
#else SignalObjectAndWait(condmutex->mutex, condmutex->event, INFINITE, FALSE);
return pthread_cond_destroy(cond);
#endif
}
#endif /* mdbx_cond_destroy */
int mdbx_cond_signal(mdbx_cond_t *cond) {
#if defined(_WIN32) || defined(_WIN64)
return SetEvent(*cond) ? MDB_SUCCESS : mdbx_get_errno_checked();
#else
return pthread_cond_signal(cond);
#endif
}
int mdbx_cond_wait(mdbx_cond_t *cond, mdbx_mutex_t *mutex) {
#if defined(_WIN32) || defined(_WIN64)
DWORD code = SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE);
if (code == WAIT_OBJECT_0) if (code == WAIT_OBJECT_0)
code = WaitForSingleObject(*mutex, INFINITE); code = WaitForSingleObject(condmutex->mutex, INFINITE);
return waitstatus2errcode(code); return waitstatus2errcode(code);
#else #else
return pthread_cond_wait(cond, mutex); return pthread_cond_wait(&condmutex->cond, &condmutex->mutex);
#endif #endif
} }

View File

@ -57,8 +57,6 @@
#include <winnt.h> #include <winnt.h>
#define HAVE_SYS_STAT_H #define HAVE_SYS_STAT_H
#define HAVE_SYS_TYPES_H #define HAVE_SYS_TYPES_H
typedef HANDLE mdbx_mutex_t;
typedef HANDLE mdbx_cond_t;
typedef HANDLE mdbx_thread_t; typedef HANDLE mdbx_thread_t;
typedef unsigned mdbx_thread_key_t; typedef unsigned mdbx_thread_key_t;
typedef SSIZE_T ssize_t; typedef SSIZE_T ssize_t;
@ -66,6 +64,10 @@ typedef SSIZE_T ssize_t;
#define HIGH_DWORD(v) ((DWORD)((sizeof(v) > 4) ? ((uint64_t)(v) >> 32) : 0)) #define HIGH_DWORD(v) ((DWORD)((sizeof(v) > 4) ? ((uint64_t)(v) >> 32) : 0))
#define THREAD_CALL WINAPI #define THREAD_CALL WINAPI
#define THREAD_RESULT DWORD #define THREAD_RESULT DWORD
typedef struct {
HANDLE mutex;
HANDLE event;
} mdbx_condmutex_t;
#else #else
#include <pthread.h> #include <pthread.h>
#include <sys/file.h> #include <sys/file.h>
@ -74,13 +76,15 @@ typedef SSIZE_T ssize_t;
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <unistd.h> #include <unistd.h>
typedef pthread_mutex_t mdbx_mutex_t;
typedef pthread_cond_t mdbx_cond_t;
typedef pthread_t mdbx_thread_t; typedef pthread_t mdbx_thread_t;
typedef pthread_key_t mdbx_thread_key_t; typedef pthread_key_t mdbx_thread_key_t;
#define INVALID_HANDLE_VALUE (-1) #define INVALID_HANDLE_VALUE (-1)
#define THREAD_CALL #define THREAD_CALL
#define THREAD_RESULT void * #define THREAD_RESULT void *
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
} mdbx_condmutex_t;
#endif /* Platform */ #endif /* Platform */
#ifndef SSIZE_MAX #ifndef SSIZE_MAX
@ -384,15 +388,12 @@ static __inline int __mdbx_get_errno_checked(const char *file, unsigned line) {
int mdbx_memalign_alloc(size_t alignment, size_t bytes, void **result); int mdbx_memalign_alloc(size_t alignment, size_t bytes, void **result);
void mdbx_memalign_free(void *ptr); void mdbx_memalign_free(void *ptr);
int mdbx_mutex_init(mdbx_mutex_t *mutex); int mdbx_condmutex_init(mdbx_condmutex_t *condmutex);
int mdbx_mutex_destroy(mdbx_mutex_t *mutex); int mdbx_condmutex_lock(mdbx_condmutex_t *condmutex);
int mdbx_mutex_lock(mdbx_mutex_t *mutex); int mdbx_condmutex_unlock(mdbx_condmutex_t *condmutex);
int mdbx_mutex_unlock(mdbx_mutex_t *mutex); int mdbx_condmutex_signal(mdbx_condmutex_t *condmutex);
int mdbx_condmutex_wait(mdbx_condmutex_t *condmutex);
int mdbx_cond_init(mdbx_cond_t *cond); int mdbx_condmutex_destroy(mdbx_condmutex_t *condmutex);
int mdbx_cond_destroy(mdbx_cond_t *cond);
int mdbx_cond_signal(mdbx_cond_t *cond);
int mdbx_cond_wait(mdbx_cond_t *cond, mdbx_mutex_t *mutex);
int mdbx_pwritev(mdbx_filehandle_t fd, struct iovec *iov, int iovcnt, int mdbx_pwritev(mdbx_filehandle_t fd, struct iovec *iov, int iovcnt,
off_t offset, size_t expected_written); off_t offset, size_t expected_written);