mdbx: add mdbx_thread_register() and mdbx_thread_unregister().

Change-Id: I605bc75a20631e781043fafcc26f5e59cb40adaa
This commit is contained in:
Leonid Yuriev 2020-07-30 01:17:03 +03:00
parent b91918b027
commit 0a01b46112
3 changed files with 184 additions and 65 deletions

View File

@ -4,7 +4,7 @@ ChangeLog
## v0.9.x (in the development): ## v0.9.x (in the development):
- Since v0.9 usage of custom comparators and the `mdbx_dbi_open_ex()` are deprecated. - Since v0.9 usage of custom comparators and the `mdbx_dbi_open_ex()` are deprecated.
- Support for Doxygen & [online API reference](https://erthink.github.io/libmdbx/). - Support for Doxygen & [online API reference](https://erthink.github.io/libmdbx/).
- TODO: API for explicit threads (de)registration. - Functions for explicit (de)registration of reader threads.
- TODO: Native bindings for C++. - TODO: Native bindings for C++.
- TODO: Packages for AltLinux, Fedora/RHEL, Debian/Ubuntu. - TODO: Packages for AltLinux, Fedora/RHEL, Debian/Ubuntu.

38
mdbx.h
View File

@ -3605,8 +3605,7 @@ LIBMDBX_API int mdbx_reader_list(const MDBX_env *env,
* \param [out] dead Number of stale slots that were cleared. * \param [out] dead Number of stale slots that were cleared.
* *
* \returns A non-zero error value on failure and 0 on success, * \returns A non-zero error value on failure and 0 on success,
* or \ref MDBX_RESULT_TRUE if a dead reader(s) found or mutex was recovered. * or \ref MDBX_RESULT_TRUE if a dead reader(s) found or mutex was recovered. */
*/
LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead); LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead);
/** Returns a lag of the reading for the given transaction. /** Returns a lag of the reading for the given transaction.
@ -3624,6 +3623,41 @@ LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead);
MDBX_DEPRECATED LIBMDBX_API int mdbx_txn_straggler(const MDBX_txn *txn, MDBX_DEPRECATED LIBMDBX_API int mdbx_txn_straggler(const MDBX_txn *txn,
int *percent); int *percent);
/** Registers the current thread as a reader for the environment.
 * \ingroup c_extra
 *
 * To perform read operations without blocking, a reader slot must be assigned
 * to each thread. Assigning a slot requires a short-term lock acquisition,
 * which is normally performed automatically. This function allows you to
 * assign the reader slot in advance and thus avoid acquiring that lock when
 * the first read transaction is started from the current thread.
 * \see mdbx_thread_unregister()
 *
 * \note Threads are registered automatically the first time a read transaction
 *       starts. Therefore, there is no need to use this function, except in
 *       special cases.
 *
 * \note In the MDBX_NOTLS mode reader slots are not tied to threads, so this
 *       function fails with MDBX_EINVAL.
 *
 * \param [in] env   An environment handle returned by \ref mdbx_env_create().
 *
 * \returns A non-zero error value on failure and 0 on success,
 * or \ref MDBX_RESULT_TRUE if the thread is already registered. */
LIBMDBX_API int mdbx_thread_register(MDBX_env *env);
/** Unregisters the current thread as a reader for the environment.
 * \ingroup c_extra
 *
 * To perform read operations without blocking, a reader slot must be assigned
 * to each thread. However, the assigned reader slot remains occupied until
 * the thread ends or the environment closes. This function allows you to
 * explicitly release the assigned reader slot.
 * \see mdbx_thread_register()
 *
 * \note The slot cannot be released while the current thread still has an
 *       active read transaction; MDBX_BUSY is returned in that case.
 *
 * \param [in] env   An environment handle returned by \ref mdbx_env_create().
 *
 * \returns A non-zero error value on failure and 0 on success, or
 * \ref MDBX_RESULT_TRUE if the thread is not registered or is already
 * unregistered (this includes the MDBX_NOTLS mode, in which threads are never
 * registered). */
LIBMDBX_API int mdbx_thread_unregister(MDBX_env *env);
/** A lack-of-space callback function to resolve issues with a laggard readers. /** A lack-of-space callback function to resolve issues with a laggard readers.
* \ingroup c_err * \ingroup c_err
* *

View File

@ -5902,6 +5902,146 @@ static void mdbx_txn_valgrind(MDBX_env *env, MDBX_txn *txn) {
} }
#endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */ #endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */
/* Pair returned by bind_rslot(): a status code plus the bound reader slot.
 * NOTE: the field order matters, since bind_rslot() initializes this struct
 * positionally as {err, rslot}. */
typedef struct {
  int err;            /* MDBX_SUCCESS or an error code */
  MDBX_reader *rslot; /* the claimed reader slot; nullptr when binding failed */
} bind_rslot_result;
/* Claims a free slot in the shared reader table for the current process and
 * (unless MDBX_NOTLS) for the thread identified by `tid`.
 *
 * This helper is shared by mdbx_thread_register() and mdbx_txn_renew0(), so it
 * must work both in the thread-bound mode (MDBX_ENV_TXKEY) and in the
 * MDBX_NOTLS mode. For that reason no assumption about these flags is
 * asserted here; the flags are examined where they matter below.
 *
 * The reader-table lock is taken via mdbx_rdt_lock() for the duration of the
 * search/claim and is released on every return path.
 *
 * \param env  Environment with a non-null env->me_lck.
 * \param tid  Id of the calling thread; stored into the slot's mr_tid
 *             unless MDBX_NOTLS is set (then 0 is stored).
 *
 * \returns {MDBX_SUCCESS, slot} on success. On failure `rslot` is nullptr and
 *          `err` is one of:
 *           - an error from mdbx_rdt_lock() or mdbx_rpid_set();
 *           - MDBX_PANIC if the environment has MDBX_FATAL_ERROR set;
 *           - MDBX_EPERM if env->me_map is null;
 *           - MDBX_READERS_FULL if the table is full and
 *             mdbx_reader_check0() freed nothing, or the error it returned. */
static bind_rslot_result bind_rslot(MDBX_env *env, const uintptr_t tid) {
  mdbx_assert(env, env->me_lck);
  mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC);
  mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT);

  bind_rslot_result result = {mdbx_rdt_lock(env), nullptr};
  if (unlikely(MDBX_IS_ERROR(result.err)))
    return result;

  if (unlikely(env->me_flags & MDBX_FATAL_ERROR)) {
    mdbx_rdt_unlock(env);
    result.err = MDBX_PANIC;
    return result;
  }

  if (unlikely(!env->me_map)) {
    mdbx_rdt_unlock(env);
    result.err = MDBX_EPERM;
    return result;
  }

  if (unlikely(env->me_live_reader != env->me_pid)) {
    result.err = mdbx_rpid_set(env);
    if (unlikely(result.err != MDBX_SUCCESS)) {
      mdbx_rdt_unlock(env);
      return result;
    }
    env->me_live_reader = env->me_pid;
  }

  /* From here on the only non-success outcome is a full reader table.
   * mdbx_rdt_lock() may have returned a non-error, non-zero status,
   * so normalize it. */
  result.err = MDBX_SUCCESS;

  unsigned slot, nreaders;
  while (1) {
    nreaders = env->me_lck->mti_numreaders;
    for (slot = 0; slot < nreaders; slot++)
      if (env->me_lck->mti_readers[slot].mr_pid == 0)
        break;

    if (likely(slot < env->me_maxreaders))
      break;

    /* The table is full: try to reclaim slots of dead readers and rescan.
     * The cleanup status is kept in a local so that MDBX_RESULT_TRUE
     * ("dead readers were cleared") never leaks out as the bind result. */
    const int cleanup_rc = mdbx_reader_check0(env, true, NULL);
    if (cleanup_rc != MDBX_RESULT_TRUE) {
      mdbx_rdt_unlock(env);
      result.err =
          (cleanup_rc == MDBX_SUCCESS) ? MDBX_READERS_FULL : cleanup_rc;
      return result;
    }
  }

  result.rslot = &env->me_lck->mti_readers[slot];
  /* Claim the reader slot, carefully since other code
   * uses the reader table un-mutexed: First reset the
   * slot, next publish it in lck->mti_numreaders. After
   * that, it is safe for mdbx_env_close() to touch it.
   * When it will be closed, we can finally claim it. */
  result.rslot->mr_pid = 0;
  safe64_reset(&result.rslot->mr_txnid, true);
  if (slot == nreaders)
    env->me_lck->mti_numreaders = ++nreaders;
  result.rslot->mr_tid = (env->me_flags & MDBX_NOTLS) ? 0 : tid;
  result.rslot->mr_pid = env->me_pid;
  mdbx_rdt_unlock(env);

  /* Remember the slot in thread-local storage only when a TLS key exists. */
  if (likely(env->me_flags & MDBX_ENV_TXKEY)) {
    mdbx_assert(env, env->me_live_reader == env->me_pid);
    thread_rthc_set(env->me_txkey, result.rslot);
  }
  return result;
}
/* Explicitly binds a reader slot to the calling thread.
 *
 * Returns MDBX_SUCCESS when a slot has been bound, MDBX_RESULT_TRUE when the
 * thread already owns one, or an error:
 *  - whatever check_env() reports for a bad handle;
 *  - MDBX_EINVAL / MDBX_EPERM when there is no lock table (env->me_lck);
 *  - MDBX_EINVAL when the environment has no TLS key (MDBX_NOTLS mode);
 *  - MDBX_BAD_RSLOT when the slot found in TLS belongs to another process;
 *  - MDBX_TXN_OVERLAPPING when this thread owns env->me_txn0;
 *  - any error produced by bind_rslot(). */
__cold int mdbx_thread_register(MDBX_env *env) {
  const int env_status = check_env(env);
  if (unlikely(env_status != MDBX_SUCCESS))
    return env_status;

  if (unlikely(env->me_lck == NULL))
    return (env->me_flags & MDBX_EXCLUSIVE) ? MDBX_EINVAL : MDBX_EPERM;

  if (unlikely(!(env->me_flags & MDBX_ENV_TXKEY))) {
    mdbx_assert(env, !env->me_lck || (env->me_flags & MDBX_NOTLS));
    return MDBX_EINVAL /* MDBX_NOTLS mode */;
  }

  mdbx_assert(env, (env->me_flags & (MDBX_NOTLS | MDBX_ENV_TXKEY |
                                     MDBX_EXCLUSIVE)) == MDBX_ENV_TXKEY);

  MDBX_reader *const bound = thread_rthc_get(env->me_txkey);
  if (likely(bound == NULL)) {
    /* Nothing bound yet: refuse if this thread owns the env's me_txn0,
     * otherwise claim a fresh slot. */
    const uintptr_t self = mdbx_thread_self();
    if (env->me_txn0 && unlikely(env->me_txn0->mt_owner == self))
      return MDBX_TXN_OVERLAPPING;
    return bind_rslot(env, self).err;
  }

  /* A slot is already stored in TLS for this thread: validate its owner. */
  mdbx_assert(env, bound->mr_pid == env->me_pid);
  mdbx_assert(env, bound->mr_tid == mdbx_thread_self());
  if (likely(bound->mr_pid == env->me_pid))
    return MDBX_RESULT_TRUE /* already registered */;
  return MDBX_BAD_RSLOT;
}
/* Releases the reader slot bound to the calling thread, if any.
 *
 * Returns MDBX_SUCCESS when the slot has been released, MDBX_RESULT_TRUE when
 * there was nothing to release (no lock table, no TLS key, or no slot stored
 * in TLS for this thread), or an error:
 *  - whatever check_env() reports for a bad handle;
 *  - MDBX_BAD_RSLOT when the slot's pid/tid do not match the caller;
 *  - MDBX_BUSY when the slot still carries a valid transaction id. */
__cold int mdbx_thread_unregister(MDBX_env *env) {
  int rc = check_env(env);
  if (unlikely(rc != MDBX_SUCCESS))
    return rc;

  /* Without a lock table there are no reader slots at all. */
  if (unlikely(!env->me_lck))
    return MDBX_RESULT_TRUE;

  if (unlikely((env->me_flags & MDBX_ENV_TXKEY) == 0)) {
    mdbx_assert(env, !env->me_lck || (env->me_flags & MDBX_NOTLS));
    return MDBX_RESULT_TRUE /* MDBX_NOTLS mode */;
  }

  mdbx_assert(env, (env->me_flags & (MDBX_NOTLS | MDBX_ENV_TXKEY |
                                     MDBX_EXCLUSIVE)) == MDBX_ENV_TXKEY);
  /* Look up the slot previously stored in TLS for this thread. */
  MDBX_reader *r = thread_rthc_get(env->me_txkey);
  if (unlikely(r == NULL))
    return MDBX_RESULT_TRUE /* not registered */;

  /* The slot must belong to this very process and thread. */
  mdbx_assert(env, r->mr_pid == env->me_pid);
  mdbx_assert(env, r->mr_tid == mdbx_thread_self());
  if (unlikely(r->mr_pid != env->me_pid || r->mr_tid != mdbx_thread_self()))
    return MDBX_BAD_RSLOT;

  if (unlikely(r->mr_txnid.inconsistent < SAFE64_INVALID_THRESHOLD))
    return MDBX_BUSY /* transaction is still active */;

  /* Release sequence, order matters: mark the slot free (mr_pid == 0 is what
   * bind_rslot() scans for), then, after a compiler barrier, raise the
   * readers-refresh flag, and finally drop the TLS reference to the slot.
   * NOTE(review): the flag presumably tells other code to rescan the reader
   * table; its consumers are outside this view. */
  r->mr_pid = 0;
  mdbx_compiler_barrier();
  env->me_lck->mti_readers_refresh_flag = true;
  thread_rthc_set(env->me_txkey, nullptr);
  return MDBX_SUCCESS;
}
/* Common code for mdbx_txn_begin() and mdbx_txn_renew(). */ /* Common code for mdbx_txn_begin() and mdbx_txn_renew(). */
static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) { static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
MDBX_env *env = txn->mt_env; MDBX_env *env = txn->mt_env;
@ -5951,70 +6091,13 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
r->mr_txnid.inconsistent < SAFE64_INVALID_THRESHOLD)) r->mr_txnid.inconsistent < SAFE64_INVALID_THRESHOLD))
return MDBX_BAD_RSLOT; return MDBX_BAD_RSLOT;
} else if (env->me_lck) { } else if (env->me_lck) {
unsigned slot, nreaders; bind_rslot_result brs = bind_rslot(env, tid);
mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC); if (unlikely(brs.err != MDBX_SUCCESS))
mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT); return brs.err;
r = brs.rslot;
rc = mdbx_rdt_lock(env);
if (unlikely(MDBX_IS_ERROR(rc)))
return rc;
if (unlikely(env->me_flags & MDBX_FATAL_ERROR)) {
mdbx_rdt_unlock(env);
return MDBX_PANIC;
}
#if defined(_WIN32) || defined(_WIN64)
if (unlikely(!env->me_map)) {
mdbx_rdt_unlock(env);
return MDBX_EPERM;
}
#endif /* Windows */
rc = MDBX_SUCCESS;
if (unlikely(env->me_live_reader != env->me_pid)) {
rc = mdbx_rpid_set(env);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_rdt_unlock(env);
return rc;
}
env->me_live_reader = env->me_pid;
}
while (1) {
nreaders = env->me_lck->mti_numreaders;
for (slot = 0; slot < nreaders; slot++)
if (env->me_lck->mti_readers[slot].mr_pid == 0)
break;
if (likely(slot < env->me_maxreaders))
break;
rc = mdbx_reader_check0(env, true, NULL);
if (rc != MDBX_RESULT_TRUE) {
mdbx_rdt_unlock(env);
return (rc == MDBX_SUCCESS) ? MDBX_READERS_FULL : rc;
}
}
r = &env->me_lck->mti_readers[slot];
/* Claim the reader slot, carefully since other code
* uses the reader table un-mutexed: First reset the
* slot, next publish it in lck->mti_numreaders. After
* that, it is safe for mdbx_env_close() to touch it.
* When it will be closed, we can finally claim it. */
r->mr_pid = 0;
safe64_reset(&r->mr_txnid, true);
if (slot == nreaders)
env->me_lck->mti_numreaders = ++nreaders;
r->mr_tid = (env->me_flags & MDBX_NOTLS) ? 0 : tid;
r->mr_pid = env->me_pid;
mdbx_rdt_unlock(env);
if (likely(env->me_flags & MDBX_ENV_TXKEY)) {
mdbx_assert(env, env->me_live_reader == env->me_pid);
thread_rthc_set(env->me_txkey, r);
}
} }
/* Seek & fetch the last meta */
while (1) { while (1) {
MDBX_meta *const meta = mdbx_meta_head(env); MDBX_meta *const meta = mdbx_meta_head(env);
mdbx_jitter4testing(false); mdbx_jitter4testing(false);
@ -17121,6 +17204,7 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
mdbx_debug("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN, mdbx_debug("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN,
(size_t)pid, lck->mti_readers[j].mr_txnid.inconsistent); (size_t)pid, lck->mti_readers[j].mr_txnid.inconsistent);
lck->mti_readers[j].mr_pid = 0; lck->mti_readers[j].mr_pid = 0;
mdbx_compiler_barrier();
lck->mti_readers_refresh_flag = true; lck->mti_readers_refresh_flag = true;
count++; count++;
} }
@ -17246,6 +17330,7 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
asleep->mr_tid = 0; asleep->mr_tid = 0;
asleep->mr_pid = 0; asleep->mr_pid = 0;
} }
mdbx_compiler_barrier();
lck->mti_readers_refresh_flag = true; lck->mti_readers_refresh_flag = true;
mdbx_flush_incoherent_cpu_writeback(); mdbx_flush_incoherent_cpu_writeback();
} }