mdbx: fix/rework mdbx_reader_check().

This commit is contained in:
Leo Yuriev 2017-04-21 18:26:32 +03:00
parent 1b490fda24
commit c2087f186e
4 changed files with 79 additions and 50 deletions

View File

@ -171,6 +171,12 @@ int mdbx_rpid_clear(MDB_env *env) {
return mdbx_lck_op(env->me_lfd, F_SETLKW, F_UNLCK, env->me_pid); return mdbx_lck_op(env->me_lfd, F_SETLKW, F_UNLCK, env->me_pid);
} }
/* Checks reader by pid.
*
* Returns:
* MDBX_RESULT_TRUE, if pid is live (unable to acquire lock)
* MDBX_RESULT_FALSE, if pid is dead (lock acquired)
* or otherwise the errcode. */
int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid) { int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid) {
int rc = mdbx_lck_op(env->me_lfd, F_GETLK, F_WRLCK, pid); int rc = mdbx_lck_op(env->me_lfd, F_GETLK, F_WRLCK, pid);
if (rc == 0) if (rc == 0)
@ -205,7 +211,7 @@ static int __cold mdbx_mutex_failed(MDB_env *env, mdbx_mutex_t *mutex, int rc) {
int rlocked, rc2; int rlocked, rc2;
/* We own the mutex. Clean up after dead previous owner. */ /* We own the mutex. Clean up after dead previous owner. */
rc = MDB_SUCCESS; rc = MDBX_RESULT_TRUE;
rlocked = (mutex == &env->me_txns->mti_rmutex); rlocked = (mutex == &env->me_txns->mti_rmutex);
if (!rlocked) { if (!rlocked) {
/* Keep mtb.mti_txnid updated, otherwise next writer can /* Keep mtb.mti_txnid updated, otherwise next writer can

View File

@ -314,6 +314,12 @@ int mdbx_rpid_clear(MDB_env *env) {
return MDB_SUCCESS; return MDB_SUCCESS;
} }
/* Checks reader by pid.
*
* Returns:
* MDBX_RESULT_TRUE, if pid is live (unable to acquire lock)
* MDBX_RESULT_FALSE, if pid is dead (lock acquired)
* or otherwise the errcode. */
int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid) { int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid) {
(void)env; (void)env;
HANDLE hProcess = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, FALSE, pid); HANDLE hProcess = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, FALSE, pid);

View File

@ -2197,11 +2197,14 @@ static int mdbx_txn_renew0(MDB_txn *txn, unsigned flags) {
env->me_live_reader = pid; env->me_live_reader = pid;
} }
retry:
nr = env->me_txns->mti_numreaders; nr = env->me_txns->mti_numreaders;
for (i = 0; i < nr; i++) for (i = 0; i < nr; i++)
if (env->me_txns->mti_readers[i].mr_pid == 0) if (env->me_txns->mti_readers[i].mr_pid == 0)
break; break;
if (unlikely(i == env->me_maxreaders)) { if (unlikely(i == env->me_maxreaders)) {
if (mdbx_reader_check0(env, 1, NULL))
goto retry;
mdbx_rdt_unlock(env); mdbx_rdt_unlock(env);
return MDB_READERS_FULL; return MDB_READERS_FULL;
} }
@ -9195,11 +9198,8 @@ int __cold mdbx_reader_check(MDB_env *env, int *dead) {
return mdbx_reader_check0(env, 0, dead); return mdbx_reader_check0(env, 0, dead);
} }
int __cold mdbx_reader_check0(MDB_env *env, int rlocked, int *dead) { int __cold mdbx_reader_check0(MDB_env *env, int rdt_locked, int *dead) {
assert(rlocked >= 0); assert(rdt_locked >= 0);
unsigned i, j;
mdbx_pid_t *pids, pid;
int rc = MDB_SUCCESS, count = 0;
if (unlikely(env->me_pid != mdbx_getpid())) { if (unlikely(env->me_pid != mdbx_getpid())) {
env->me_flags |= MDB_FATAL_ERROR; env->me_flags |= MDB_FATAL_ERROR;
@ -9207,59 +9207,69 @@ int __cold mdbx_reader_check0(MDB_env *env, int rlocked, int *dead) {
} }
unsigned snap_nreaders = env->me_txns->mti_numreaders; unsigned snap_nreaders = env->me_txns->mti_numreaders;
pids = malloc((snap_nreaders + 1) * sizeof(mdbx_pid_t)); mdbx_pid_t *pids = alloca((snap_nreaders + 1) * sizeof(mdbx_pid_t));
if (!pids)
return ENOMEM;
pids[0] = 0; pids[0] = 0;
unsigned i;
int rc = MDBX_RESULT_FALSE, count = 0;
MDB_reader *mr = env->me_txns->mti_readers; MDB_reader *mr = env->me_txns->mti_readers;
for (i = 0; i < snap_nreaders; i++) { for (i = 0; i < snap_nreaders; i++) {
pid = mr[i].mr_pid; const mdbx_pid_t pid = mr[i].mr_pid;
if (pid && pid != env->me_pid) { if (pid == 0)
if (mdbx_pid_insert(pids, pid) == 0) { continue;
if (pid != env->me_pid)
continue;
if (mdbx_pid_insert(pids, pid) != 0)
continue;
rc = mdbx_rpid_check(env, pid);
if (rc == MDBX_RESULT_TRUE)
continue; /* reader is live */
if (rc != MDBX_RESULT_FALSE)
break; /* mdbx_rpid_check() failed */
/* stale reader found */
if (!rdt_locked) {
rdt_locked = -1;
rc = mdbx_rdt_lock(env);
if (rc != MDB_SUCCESS) {
if (rc != MDBX_RESULT_TRUE)
break; /* lock failed */
/* recovered after mutex owner died */
snap_nreaders = 0; /* the above checked all readers */
} else {
/* a other process may have clean and reused slot, recheck */
if (mr[i].mr_pid != pid)
continue;
rc = mdbx_rpid_check(env, pid); rc = mdbx_rpid_check(env, pid);
if (rc == MDBX_RESULT_FALSE) { if (rc != MDBX_RESULT_FALSE) {
/* stale reader found */ if (rc != MDBX_RESULT_TRUE)
j = i; break; /* mdbx_rpid_check() failed */
if (!rlocked) { /* the race with other process, slot reused */
rlocked = -1; rc = MDBX_RESULT_FALSE;
rc = mdbx_rdt_lock(env); continue;
if (rc != MDB_SUCCESS) { }
if (rc != MDBX_RESULT_TRUE) { }
break; /* lock failed */ }
} else {
/* recovered after mutex owner died */ assert(mr[i].mr_pid == pid);
snap_nreaders = 0; /* the above checked all readers */
} /* clean it */
} else { unsigned j;
/* a other process may have clean and reused slot, recheck */ for (j = i; j < snap_nreaders; j++) {
rc = mdbx_rpid_check(env, pid); if (mr[j].mr_pid == pid) {
if (rc != MDBX_RESULT_FALSE) { mdbx_debug("clear stale reader pid %u txn %zd", (unsigned)pid,
if (rc != MDBX_RESULT_TRUE) mr[j].mr_txnid);
break; /* mdbx_rpid_check() failed */ mr[j].mr_pid = 0;
/* the race with other process, slot reused */ count++;
rc = MDB_SUCCESS;
continue;
}
}
}
for (; j < snap_nreaders; j++) {
if (mr[j].mr_pid == pid) {
mdbx_debug("clear stale reader pid %u txn %zd", (unsigned)pid,
mr[j].mr_txnid);
mr[j].mr_pid = 0;
count++;
}
}
} else if (rc != MDBX_RESULT_TRUE)
break; /* mdbx_rpid_check() failed */
} }
} }
} }
if (rlocked < 0) if (rdt_locked < 0)
mdbx_rdt_unlock(env); mdbx_rdt_unlock(env);
free(pids);
if (dead) if (dead)
*dead = count; *dead = count;

View File

@ -421,6 +421,13 @@ void mdbx_txn_unlock(MDB_env *env);
int mdbx_rpid_set(MDB_env *env); int mdbx_rpid_set(MDB_env *env);
int mdbx_rpid_clear(MDB_env *env); int mdbx_rpid_clear(MDB_env *env);
/* Checks reader by pid.
*
* Returns:
* MDBX_RESULT_TRUE, if pid is live (unable to acquire lock)
* MDBX_RESULT_FALSE, if pid is dead (lock acquired)
* or otherwise the errcode. */
int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid); int mdbx_rpid_check(MDB_env *env, mdbx_pid_t pid);
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/