From 0a01b4611286b1da4867f5edb9cb221b073713c0 Mon Sep 17 00:00:00 2001 From: Leonid Yuriev Date: Thu, 30 Jul 2020 01:17:03 +0300 Subject: [PATCH] mdbx: add mdbx_thread_register() and mdbx_thread_unregister(). Change-Id: I605bc75a20631e781043fafcc26f5e59cb40adaa --- ChangeLog.md | 2 +- mdbx.h | 38 +++++++++- src/core.c | 209 ++++++++++++++++++++++++++++++++++++--------------- 3 files changed, 184 insertions(+), 65 deletions(-) diff --git a/ChangeLog.md b/ChangeLog.md index d4045880..10c8d22a 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -4,7 +4,7 @@ ChangeLog ## v0.9.x (in the development): - Since v0.9 usage of custom comparators and the `mdbx_dbi_open_ex()` are deprecated. - Support for Doxygen & [online API reference](https://erthink.github.io/libmdbx/). -- TODO: API for explicit threads (de)registration. +- Functions to explicit reader threads (de)registration. - TODO: Native bindings for C++. - TODO: Packages for AltLinux, Fedora/RHEL, Debian/Ubuntu. diff --git a/mdbx.h b/mdbx.h index fbeed109..1cbd7a76 100644 --- a/mdbx.h +++ b/mdbx.h @@ -3605,8 +3605,7 @@ LIBMDBX_API int mdbx_reader_list(const MDBX_env *env, * \param [out] dead Number of stale slots that were cleared. * * \returns A non-zero error value on failure and 0 on success, - * or \ref MDBX_RESULT_TRUE if a dead reader(s) found or mutex was recovered. - */ + * or \ref MDBX_RESULT_TRUE if a dead reader(s) found or mutex was recovered. */ LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead); /** Returns a lag of the reading for the given transaction. @@ -3624,6 +3623,41 @@ LIBMDBX_API int mdbx_reader_check(MDBX_env *env, int *dead); MDBX_DEPRECATED LIBMDBX_API int mdbx_txn_straggler(const MDBX_txn *txn, int *percent); +/** Registers the current thread as a reader for the environment. + * \ingroup c_extra + * + * To perform read operations without blocking, a reader slot must be assigned + * for each thread. However, this assignment requires a short-term lock + * acquisition which is performed automatically. This function allows you to + * assign the reader slot in advance and thus avoid capturing the blocker when + * the read transaction starts firstly from current thread. + * \see mdbx_thread_unregister() + * + * \note Threads are registered automatically the first time a read transaction + * starts. Therefore, there is no need to use this function, except in + * special cases. + * + * \param [in] env An environment handle returned by \ref mdbx_env_create(). + * + * \returns A non-zero error value on failure and 0 on success, + * or \ref MDBX_RESULT_TRUE if thread is already registered. */ +LIBMDBX_API int mdbx_thread_register(MDBX_env *env); + +/** Unregisters the current thread as a reader for the environment. + * \ingroup c_extra + * + * To perform read operations without blocking, a reader slot must be assigned + * for each thread. However, the assigned reader slot will remain occupied until + * the thread ends or the environment closes. This function allows you to + * explicitly release the assigned reader slot. + * \see mdbx_thread_register() + * + * \param [in] env An environment handle returned by \ref mdbx_env_create(). + * + * \returns A non-zero error value on failure and 0 on success, or + * \ref MDBX_RESULT_TRUE if thread is not registered or already undegistered. */ +LIBMDBX_API int mdbx_thread_unregister(MDBX_env *env); + /** A lack-of-space callback function to resolve issues with a laggard readers. * \ingroup c_err * diff --git a/src/core.c b/src/core.c index c3b2bbcb..f5037e69 100644 --- a/src/core.c +++ b/src/core.c @@ -5902,6 +5902,146 @@ static void mdbx_txn_valgrind(MDBX_env *env, MDBX_txn *txn) { } #endif /* MDBX_USE_VALGRIND || __SANITIZE_ADDRESS__ */ +typedef struct { + int err; + MDBX_reader *rslot; +} bind_rslot_result; + +static bind_rslot_result bind_rslot(MDBX_env *env, const uintptr_t tid) { + mdbx_assert(env, env->me_lck); + mdbx_assert(env, (env->me_flags & (MDBX_NOTLS | MDBX_ENV_TXKEY)) == + MDBX_ENV_TXKEY); + mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC); + mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT); + + bind_rslot_result result = {mdbx_rdt_lock(env), nullptr}; + if (unlikely(MDBX_IS_ERROR(result.err))) + return result; + if (unlikely(env->me_flags & MDBX_FATAL_ERROR)) { + mdbx_rdt_unlock(env); + result.err = MDBX_PANIC; + return result; + } + if (unlikely(!env->me_map)) { + mdbx_rdt_unlock(env); + result.err = MDBX_EPERM; + return result; + } + + if (unlikely(env->me_live_reader != env->me_pid)) { + result.err = mdbx_rpid_set(env); + if (unlikely(result.err != MDBX_SUCCESS)) { + mdbx_rdt_unlock(env); + return result; + } + env->me_live_reader = env->me_pid; + } + + result.err = MDBX_SUCCESS; + unsigned slot, nreaders; + while (1) { + nreaders = env->me_lck->mti_numreaders; + for (slot = 0; slot < nreaders; slot++) + if (env->me_lck->mti_readers[slot].mr_pid == 0) + break; + + if (likely(slot < env->me_maxreaders)) + break; + + result.err = mdbx_reader_check0(env, true, NULL); + if (result.err != MDBX_RESULT_TRUE) { + mdbx_rdt_unlock(env); + result.err = + (result.err == MDBX_SUCCESS) ? MDBX_READERS_FULL : result.err; + return result; + } + } + + result.rslot = &env->me_lck->mti_readers[slot]; + /* Claim the reader slot, carefully since other code + * uses the reader table un-mutexed: First reset the + * slot, next publish it in lck->mti_numreaders. After + * that, it is safe for mdbx_env_close() to touch it. + * When it will be closed, we can finally claim it. */ + result.rslot->mr_pid = 0; + safe64_reset(&result.rslot->mr_txnid, true); + if (slot == nreaders) + env->me_lck->mti_numreaders = ++nreaders; + result.rslot->mr_tid = (env->me_flags & MDBX_NOTLS) ? 0 : tid; + result.rslot->mr_pid = env->me_pid; + mdbx_rdt_unlock(env); + + if (likely(env->me_flags & MDBX_ENV_TXKEY)) { + mdbx_assert(env, env->me_live_reader == env->me_pid); + thread_rthc_set(env->me_txkey, result.rslot); + } + return result; +} + +__cold int mdbx_thread_register(MDBX_env *env) { + int rc = check_env(env); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + if (unlikely(!env->me_lck)) + return (env->me_flags & MDBX_EXCLUSIVE) ? MDBX_EINVAL : MDBX_EPERM; + + if (unlikely((env->me_flags & MDBX_ENV_TXKEY) == 0)) { + mdbx_assert(env, !env->me_lck || (env->me_flags & MDBX_NOTLS)); + return MDBX_EINVAL /* MDBX_NOTLS mode */; + } + + mdbx_assert(env, (env->me_flags & (MDBX_NOTLS | MDBX_ENV_TXKEY | + MDBX_EXCLUSIVE)) == MDBX_ENV_TXKEY); + MDBX_reader *r = thread_rthc_get(env->me_txkey); + if (unlikely(r != NULL)) { + mdbx_assert(env, r->mr_pid == env->me_pid); + mdbx_assert(env, r->mr_tid == mdbx_thread_self()); + if (unlikely(r->mr_pid != env->me_pid)) + return MDBX_BAD_RSLOT; + return MDBX_RESULT_TRUE /* already registered */; + } + + const uintptr_t tid = mdbx_thread_self(); + if (env->me_txn0 && unlikely(env->me_txn0->mt_owner == tid)) + return MDBX_TXN_OVERLAPPING; + return bind_rslot(env, tid).err; +} + +__cold int mdbx_thread_unregister(MDBX_env *env) { + int rc = check_env(env); + if (unlikely(rc != MDBX_SUCCESS)) + return rc; + + if (unlikely(!env->me_lck)) + return MDBX_RESULT_TRUE; + + if (unlikely((env->me_flags & MDBX_ENV_TXKEY) == 0)) { + mdbx_assert(env, !env->me_lck || (env->me_flags & MDBX_NOTLS)); + return MDBX_RESULT_TRUE /* MDBX_NOTLS mode */; + } + + mdbx_assert(env, (env->me_flags & (MDBX_NOTLS | MDBX_ENV_TXKEY | + MDBX_EXCLUSIVE)) == MDBX_ENV_TXKEY); + MDBX_reader *r = thread_rthc_get(env->me_txkey); + if (unlikely(r == NULL)) + return MDBX_RESULT_TRUE /* not registered */; + + mdbx_assert(env, r->mr_pid == env->me_pid); + mdbx_assert(env, r->mr_tid == mdbx_thread_self()); + if (unlikely(r->mr_pid != env->me_pid || r->mr_tid != mdbx_thread_self())) + return MDBX_BAD_RSLOT; + + if (unlikely(r->mr_txnid.inconsistent < SAFE64_INVALID_THRESHOLD)) + return MDBX_BUSY /* transaction is still active */; + + r->mr_pid = 0; + mdbx_compiler_barrier(); + env->me_lck->mti_readers_refresh_flag = true; + thread_rthc_set(env->me_txkey, nullptr); + return MDBX_SUCCESS; +} + /* Common code for mdbx_txn_begin() and mdbx_txn_renew(). */ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) { MDBX_env *env = txn->mt_env; @@ -5951,70 +6091,13 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) { r->mr_txnid.inconsistent < SAFE64_INVALID_THRESHOLD)) return MDBX_BAD_RSLOT; } else if (env->me_lck) { - unsigned slot, nreaders; - mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC); - mdbx_assert(env, env->me_lck->mti_os_and_format == MDBX_LOCK_FORMAT); - - rc = mdbx_rdt_lock(env); - if (unlikely(MDBX_IS_ERROR(rc))) - return rc; - if (unlikely(env->me_flags & MDBX_FATAL_ERROR)) { - mdbx_rdt_unlock(env); - return MDBX_PANIC; - } -#if defined(_WIN32) || defined(_WIN64) - if (unlikely(!env->me_map)) { - mdbx_rdt_unlock(env); - return MDBX_EPERM; - } -#endif /* Windows */ - rc = MDBX_SUCCESS; - - if (unlikely(env->me_live_reader != env->me_pid)) { - rc = mdbx_rpid_set(env); - if (unlikely(rc != MDBX_SUCCESS)) { - mdbx_rdt_unlock(env); - return rc; - } - env->me_live_reader = env->me_pid; - } - - while (1) { - nreaders = env->me_lck->mti_numreaders; - for (slot = 0; slot < nreaders; slot++) - if (env->me_lck->mti_readers[slot].mr_pid == 0) - break; - - if (likely(slot < env->me_maxreaders)) - break; - - rc = mdbx_reader_check0(env, true, NULL); - if (rc != MDBX_RESULT_TRUE) { - mdbx_rdt_unlock(env); - return (rc == MDBX_SUCCESS) ? MDBX_READERS_FULL : rc; - } - } - - r = &env->me_lck->mti_readers[slot]; - /* Claim the reader slot, carefully since other code - * uses the reader table un-mutexed: First reset the - * slot, next publish it in lck->mti_numreaders. After - * that, it is safe for mdbx_env_close() to touch it. - * When it will be closed, we can finally claim it. */ - r->mr_pid = 0; - safe64_reset(&r->mr_txnid, true); - if (slot == nreaders) - env->me_lck->mti_numreaders = ++nreaders; - r->mr_tid = (env->me_flags & MDBX_NOTLS) ? 0 : tid; - r->mr_pid = env->me_pid; - mdbx_rdt_unlock(env); - - if (likely(env->me_flags & MDBX_ENV_TXKEY)) { - mdbx_assert(env, env->me_live_reader == env->me_pid); - thread_rthc_set(env->me_txkey, r); - } + bind_rslot_result brs = bind_rslot(env, tid); + if (unlikely(brs.err != MDBX_SUCCESS)) + return brs.err; + r = brs.rslot; } + /* Seek & fetch the last meta */ while (1) { MDBX_meta *const meta = mdbx_meta_head(env); mdbx_jitter4testing(false); @@ -17121,6 +17204,7 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) { mdbx_debug("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN, (size_t)pid, lck->mti_readers[j].mr_txnid.inconsistent); lck->mti_readers[j].mr_pid = 0; + mdbx_compiler_barrier(); lck->mti_readers_refresh_flag = true; count++; } @@ -17246,6 +17330,7 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) { asleep->mr_tid = 0; asleep->mr_pid = 0; } + mdbx_compiler_barrier(); lck->mti_readers_refresh_flag = true; mdbx_flush_incoherent_cpu_writeback(); }