mirror of
https://github.com/isar/libmdbx.git
synced 2025-01-04 18:54:13 +08:00
mdbx: rework reader_check0() and mutex recovery.
This commit is contained in:
parent
41c51fdac2
commit
55226499a8
@ -829,3 +829,6 @@ static __inline size_t roundup2(size_t value, size_t granularity) {
|
|||||||
assert(is_power2(granularity));
|
assert(is_power2(granularity));
|
||||||
return (value + granularity - 1) & ~(granularity - 1);
|
return (value + granularity - 1) & ~(granularity - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define MDBX_IS_ERROR(rc) \
|
||||||
|
((rc) != MDBX_RESULT_TRUE && (rc) != MDBX_RESULT_FALSE)
|
||||||
|
@ -121,17 +121,18 @@ int mdbx_rdt_lock(MDB_env *env) {
|
|||||||
|
|
||||||
void mdbx_rdt_unlock(MDB_env *env) {
|
void mdbx_rdt_unlock(MDB_env *env) {
|
||||||
int rc = mdbx_robust_unlock(env, &env->me_txns->mti_rmutex);
|
int rc = mdbx_robust_unlock(env, &env->me_txns->mti_rmutex);
|
||||||
if (unlikely(rc != 0))
|
if (unlikely(MDBX_IS_ERROR(rc)))
|
||||||
mdbx_panic("%s() failed: errcode %d\n", mdbx_func_, rc);
|
mdbx_panic("%s() failed: errcode %d\n", mdbx_func_, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
int mdbx_txn_lock(MDB_env *env) {
|
int mdbx_txn_lock(MDB_env *env) {
|
||||||
return mdbx_robust_lock(env, &env->me_txns->mti_wmutex);
|
int rc = mdbx_robust_lock(env, &env->me_txns->mti_wmutex);
|
||||||
|
return MDBX_IS_ERROR(rc) ? rc : MDB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mdbx_txn_unlock(MDB_env *env) {
|
void mdbx_txn_unlock(MDB_env *env) {
|
||||||
int rc = mdbx_robust_unlock(env, &env->me_txns->mti_wmutex);
|
int rc = mdbx_robust_unlock(env, &env->me_txns->mti_wmutex);
|
||||||
if (unlikely(rc != 0))
|
if (unlikely(MDBX_IS_ERROR(rc)))
|
||||||
mdbx_panic("%s() failed: errcode %d\n", mdbx_func_, rc);
|
mdbx_panic("%s() failed: errcode %d\n", mdbx_func_, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,42 +206,46 @@ static int mdbx_lck_op(mdbx_filehandle_t fd, int op, int lck, off_t offset) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if !__GLIBC_PREREQ(2, 12) && !defined(pthread_mutex_consistent)
|
||||||
|
#define pthread_mutex_consistent(mutex) pthread_mutex_consistent_np(mutex)
|
||||||
|
#endif
|
||||||
|
|
||||||
static int __cold mdbx_mutex_failed(MDB_env *env, mdbx_mutex_t *mutex, int rc) {
|
static int __cold mdbx_mutex_failed(MDB_env *env, mdbx_mutex_t *mutex, int rc) {
|
||||||
#if MDB_USE_ROBUST
|
#if MDB_USE_ROBUST
|
||||||
if (unlikely(rc == EOWNERDEAD)) {
|
if (rc == EOWNERDEAD) {
|
||||||
/* We own the mutex. Clean up after dead previous owner. */
|
/* We own the mutex. Clean up after dead previous owner. */
|
||||||
rc = MDB_SUCCESS;
|
|
||||||
int rlocked = (mutex == &env->me_txns->mti_rmutex);
|
int rlocked = (mutex == &env->me_txns->mti_rmutex);
|
||||||
|
rc = MDB_SUCCESS;
|
||||||
if (!rlocked) {
|
if (!rlocked) {
|
||||||
/* env is hosed if the dead thread was ours */
|
if (unlikely(env->me_txn)) {
|
||||||
if (env->me_txn) {
|
/* env is hosed if the dead thread was ours */
|
||||||
env->me_flags |= MDB_FATAL_ERROR;
|
env->me_flags |= MDB_FATAL_ERROR;
|
||||||
env->me_txn = NULL;
|
env->me_txn = NULL;
|
||||||
rc = MDB_PANIC;
|
rc = MDB_PANIC;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mdbx_debug("%cmutex owner died, %s", (rlocked ? 'r' : 'w'),
|
mdbx_notice("%cmutex owner died, %s", (rlocked ? 'r' : 'w'),
|
||||||
(rc ? "this process' env is hosed" : "recovering"));
|
(rc ? "this process' env is hosed" : "recovering"));
|
||||||
int rc2 = mdbx_reader_check0(env, rlocked, NULL);
|
|
||||||
if (rc2 == 0)
|
int check_rc = mdbx_reader_check0(env, rlocked, NULL);
|
||||||
#if __GLIBC_PREREQ(2, 12)
|
int mreco_rc = pthread_mutex_consistent(mutex);
|
||||||
rc2 = pthread_mutex_consistent(mutex);
|
check_rc = (mreco_rc == 0) ? check_rc : mreco_rc;
|
||||||
#else
|
|
||||||
rc2 = pthread_mutex_consistent_np(mutex);
|
if (unlikely(mreco_rc))
|
||||||
#endif
|
mdbx_error("mutex recovery failed, %s", mdbx_strerror(mreco_rc));
|
||||||
if (rc || (rc = rc2)) {
|
|
||||||
mdbx_debug("mutex recovery failed, %s", mdbx_strerror(rc));
|
rc = (rc == MDB_SUCCESS) ? check_rc : rc;
|
||||||
|
if (MDBX_IS_ERROR(rc))
|
||||||
pthread_mutex_unlock(mutex);
|
pthread_mutex_unlock(mutex);
|
||||||
}
|
return rc;
|
||||||
}
|
}
|
||||||
#endif /* MDB_USE_ROBUST */
|
#endif /* MDB_USE_ROBUST */
|
||||||
|
|
||||||
if (unlikely(rc)) {
|
mdbx_error("lock mutex failed, %s", mdbx_strerror(rc));
|
||||||
mdbx_debug("lock mutex failed, %s", mdbx_strerror(rc));
|
if (rc != EDEADLK) {
|
||||||
if (rc != EDEADLK) {
|
env->me_flags |= MDB_FATAL_ERROR;
|
||||||
env->me_flags |= MDB_FATAL_ERROR;
|
rc = MDB_PANIC;
|
||||||
rc = MDB_PANIC;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
49
src/mdbx.c
49
src/mdbx.c
@ -2129,8 +2129,9 @@ static int mdbx_txn_renew0(MDB_txn *txn, unsigned flags) {
|
|||||||
mdbx_tid_t tid = mdbx_thread_self();
|
mdbx_tid_t tid = mdbx_thread_self();
|
||||||
|
|
||||||
rc = mdbx_rdt_lock(env);
|
rc = mdbx_rdt_lock(env);
|
||||||
if (unlikely(rc != MDB_SUCCESS))
|
if (unlikely(MDBX_IS_ERROR(rc)))
|
||||||
return rc;
|
return rc;
|
||||||
|
rc = MDB_SUCCESS;
|
||||||
|
|
||||||
if (unlikely(env->me_live_reader != pid)) {
|
if (unlikely(env->me_live_reader != pid)) {
|
||||||
rc = mdbx_rpid_set(env);
|
rc = mdbx_rpid_set(env);
|
||||||
@ -9129,11 +9130,10 @@ int __cold mdbx_reader_check0(MDB_env *env, int rdt_locked, int *dead) {
|
|||||||
mdbx_pid_t *pids = alloca((snap_nreaders + 1) * sizeof(mdbx_pid_t));
|
mdbx_pid_t *pids = alloca((snap_nreaders + 1) * sizeof(mdbx_pid_t));
|
||||||
pids[0] = 0;
|
pids[0] = 0;
|
||||||
|
|
||||||
unsigned i;
|
|
||||||
int rc = MDBX_RESULT_FALSE, count = 0;
|
int rc = MDBX_RESULT_FALSE, count = 0;
|
||||||
MDB_reader *mr = env->me_txns->mti_readers;
|
MDB_reader *mr = env->me_txns->mti_readers;
|
||||||
|
|
||||||
for (i = 0; i < snap_nreaders; i++) {
|
for (unsigned i = 0; i < snap_nreaders; i++) {
|
||||||
const mdbx_pid_t pid = mr[i].mr_pid;
|
const mdbx_pid_t pid = mr[i].mr_pid;
|
||||||
if (pid == 0)
|
if (pid == 0)
|
||||||
continue;
|
continue;
|
||||||
@ -9151,33 +9151,32 @@ int __cold mdbx_reader_check0(MDB_env *env, int rdt_locked, int *dead) {
|
|||||||
|
|
||||||
/* stale reader found */
|
/* stale reader found */
|
||||||
if (!rdt_locked) {
|
if (!rdt_locked) {
|
||||||
rdt_locked = -1;
|
|
||||||
rc = mdbx_rdt_lock(env);
|
rc = mdbx_rdt_lock(env);
|
||||||
if (rc != MDB_SUCCESS) {
|
if (MDBX_IS_ERROR(rc))
|
||||||
if (rc != MDBX_RESULT_TRUE)
|
break;
|
||||||
break; /* lock failed */
|
|
||||||
/* recovered after mutex owner died */
|
rdt_locked = -1;
|
||||||
snap_nreaders = 0; /* the above checked all readers */
|
if (rc == MDBX_RESULT_TRUE)
|
||||||
} else {
|
/* the above checked all readers */
|
||||||
/* a other process may have clean and reused slot, recheck */
|
break;
|
||||||
if (mr[i].mr_pid != pid)
|
|
||||||
continue;
|
/* a other process may have clean and reused slot, recheck */
|
||||||
rc = mdbx_rpid_check(env, pid);
|
if (mr[i].mr_pid != pid)
|
||||||
if (rc != MDBX_RESULT_FALSE) {
|
continue;
|
||||||
if (rc != MDBX_RESULT_TRUE)
|
|
||||||
break; /* mdbx_rpid_check() failed */
|
rc = mdbx_rpid_check(env, pid);
|
||||||
/* the race with other process, slot reused */
|
if (MDBX_IS_ERROR(rc))
|
||||||
rc = MDBX_RESULT_FALSE;
|
break;
|
||||||
continue;
|
|
||||||
}
|
if (rc != MDBX_RESULT_FALSE) {
|
||||||
|
/* the race with other process, slot reused */
|
||||||
|
rc = MDBX_RESULT_FALSE;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(mr[i].mr_pid == pid);
|
|
||||||
|
|
||||||
/* clean it */
|
/* clean it */
|
||||||
unsigned j;
|
for (unsigned j = i; j < snap_nreaders; j++) {
|
||||||
for (j = i; j < snap_nreaders; j++) {
|
|
||||||
if (mr[j].mr_pid == pid) {
|
if (mr[j].mr_pid == pid) {
|
||||||
mdbx_debug("clear stale reader pid %u txn %zd", (unsigned)pid,
|
mdbx_debug("clear stale reader pid %u txn %zd", (unsigned)pid,
|
||||||
mr[j].mr_txnid);
|
mr[j].mr_txnid);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user