mdbx: fix/rework C11 atomics usage to avoid performance regression.

Resolve https://github.com/erthink/libmdbx/issues/160

Change-Id: Ic14ef8c9f1e4fb844952f51e81b58268d7939cfe
This commit is contained in:
Leonid Yuriev 2021-02-06 21:16:56 +03:00
parent 016f644815
commit c89f30e485

View File

@ -803,15 +803,14 @@ static __always_inline memory_order mo_c11_load(enum MDBX_memory_order fence) {
}
#endif /* MDBX_HAVE_C11ATOMICS */
static __maybe_unused __always_inline void mdbx_memory_fence(bool checkpoint,
bool write) {
static __maybe_unused __always_inline void
mdbx_memory_fence(enum MDBX_memory_order order, bool write) {
#ifdef MDBX_HAVE_C11ATOMICS
atomic_thread_fence(
checkpoint ? memory_order_seq_cst
: (write ? memory_order_release : memory_order_acquire));
atomic_thread_fence(write ? mo_c11_store(order) : mo_c11_load(order));
#else /* MDBX_HAVE_C11ATOMICS */
mdbx_compiler_barrier();
if (checkpoint || (write && MDBX_CPU_WRITEBACK_INCOHERENT))
if (write &&
order > (MDBX_CPU_WRITEBACK_INCOHERENT ? mo_Relaxed : mo_AcquireRelease))
mdbx_memory_barrier();
#endif /* MDBX_HAVE_C11ATOMICS */
}
@ -827,8 +826,7 @@ atomic_store32(MDBX_atomic_uint32_t *p, const uint32_t value,
if (order != mo_Relaxed)
mdbx_compiler_barrier();
p->weak = value;
if (order != mo_Relaxed)
mdbx_memory_fence(order == mo_SequentialConsistency, true);
mdbx_memory_fence(order, true);
#endif /* MDBX_HAVE_C11ATOMICS */
return value;
}
@ -840,8 +838,7 @@ atomic_load32(const MDBX_atomic_uint32_t *p, enum MDBX_memory_order order) {
assert(atomic_is_lock_free(MDBX_c11a_ro(uint32_t, p)));
return atomic_load_explicit(MDBX_c11a_ro(uint32_t, p), mo_c11_load(order));
#else /* MDBX_HAVE_C11ATOMICS */
if (order != mo_Relaxed)
mdbx_memory_fence(order == mo_SequentialConsistency, false);
mdbx_memory_fence(order, false);
const uint32_t value = p->weak;
if (order != mo_Relaxed)
mdbx_compiler_barrier();
@ -861,12 +858,11 @@ static __always_inline uint64_t atomic_store64(MDBX_atomic_uint64_t *p,
if (order != mo_Relaxed)
mdbx_compiler_barrier();
p->weak = value;
if (order != mo_Relaxed)
mdbx_memory_fence(order == mo_SequentialConsistency, true);
mdbx_memory_fence(order, true);
#endif /* MDBX_HAVE_C11ATOMICS */
#else /* !MDBX_64BIT_ATOMIC */
atomic_store32(&p->low, (uint32_t)value,
(order == mo_Relaxed) ? mo_Relaxed : mo_AcquireRelease);
mdbx_compiler_barrier();
atomic_store32(&p->low, (uint32_t)value, mo_Relaxed);
mdbx_jitter4testing(true);
atomic_store32(&p->high, (uint32_t)(value >> 32), order);
mdbx_jitter4testing(true);
@ -882,20 +878,21 @@ static __always_inline uint64_t atomic_load64(const MDBX_atomic_uint64_t *p,
assert(atomic_is_lock_free(MDBX_c11a_ro(uint64_t, p)));
return atomic_load_explicit(MDBX_c11a_ro(uint64_t, p), mo_c11_load(order));
#else /* MDBX_HAVE_C11ATOMICS */
if (order != mo_Relaxed)
mdbx_memory_fence(order == mo_SequentialConsistency, false);
mdbx_memory_fence(order, false);
const uint64_t value = p->weak;
if (order != mo_Relaxed)
mdbx_compiler_barrier();
return value;
#endif /* MDBX_HAVE_C11ATOMICS */
#else /* !MDBX_64BIT_ATOMIC */
mdbx_compiler_barrier();
uint64_t value = (uint64_t)atomic_load32(&p->high, order) << 32;
mdbx_jitter4testing(true);
value |= atomic_load32(&p->low, (order == mo_Relaxed) ? mo_Relaxed
: mo_AcquireRelease);
mdbx_jitter4testing(true);
for (;;) {
mdbx_compiler_barrier();
uint64_t again = (uint64_t)atomic_load32(&p->high, order) << 32;
mdbx_jitter4testing(true);
again |= atomic_load32(&p->low, (order == mo_Relaxed) ? mo_Relaxed
@ -1039,7 +1036,7 @@ static __always_inline void safe64_reset(MDBX_atomic_uint64_t *p,
atomic_add32(&p->low, 1) /* avoid ABA in safe64_reset_compare() */;
atomic_store32(
&p->high, UINT32_MAX,
mo_AcquireRelease) /* atomically make >= SAFE64_INVALID_THRESHOLD */;
mo_Relaxed) /* atomically make >= SAFE64_INVALID_THRESHOLD */;
atomic_add32(&p->low, 1) /* avoid ABA in safe64_reset_compare() */;
} else
#elif MDBX_64BIT_ATOMIC
@ -1089,8 +1086,9 @@ static __always_inline void safe64_write(MDBX_atomic_uint64_t *p,
#if MDBX_64BIT_ATOMIC
atomic_store64(p, v, mo_AcquireRelease);
#else /* MDBX_64BIT_ATOMIC */
mdbx_compiler_barrier();
/* update low-part but still value >= SAFE64_INVALID_THRESHOLD */
atomic_store32(&p->low, (uint32_t)v, mo_AcquireRelease);
atomic_store32(&p->low, (uint32_t)v, mo_Relaxed);
assert(p->weak >= SAFE64_INVALID_THRESHOLD);
mdbx_jitter4testing(true);
/* update high-part from SAFE64_INVALID_THRESHOLD to actual value */
@ -5156,8 +5154,7 @@ static txnid_t mdbx_find_oldest(const MDBX_txn *txn) {
MDBX_lockinfo *const lck = env->me_lck;
if (unlikely(lck == NULL /* exclusive mode */))
return atomic_store64(&env->me_lckless_stub.oldest, edge,
mo_AcquireRelease);
return atomic_store64(&env->me_lckless_stub.oldest, edge, mo_Relaxed);
const txnid_t last_oldest =
atomic_load64(&lck->mti_oldest_reader, mo_AcquireRelease);
@ -5173,8 +5170,7 @@ static txnid_t mdbx_find_oldest(const MDBX_txn *txn) {
return last_oldest;
txnid_t oldest = edge;
atomic_store32(&lck->mti_readers_refresh_flag, nothing_changed,
mo_AcquireRelease);
atomic_store32(&lck->mti_readers_refresh_flag, nothing_changed, mo_Relaxed);
const unsigned snap_nreaders =
atomic_load32(&lck->mti_numreaders, mo_AcquireRelease);
for (unsigned i = 0; i < snap_nreaders; ++i) {
@ -5192,7 +5188,7 @@ static txnid_t mdbx_find_oldest(const MDBX_txn *txn) {
if (oldest != last_oldest) {
mdbx_notice("update oldest %" PRIaTXN " -> %" PRIaTXN, last_oldest, oldest);
mdbx_tassert(txn, oldest >= lck->mti_oldest_reader.weak);
atomic_store64(&lck->mti_oldest_reader, oldest, mo_AcquireRelease);
atomic_store64(&lck->mti_oldest_reader, oldest, mo_Relaxed);
}
return oldest;
}
@ -5629,8 +5625,7 @@ __cold static int mdbx_wipe_steady(MDBX_env *env, const txnid_t last_steady) {
if (likely(env->me_lck))
/* force oldest refresh */
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true,
mo_AcquireRelease);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true, mo_Relaxed);
return MDBX_SUCCESS;
}
@ -6810,8 +6805,9 @@ __cold int mdbx_thread_unregister(const MDBX_env *env) {
if (unlikely(r->mr_txnid.weak < SAFE64_INVALID_THRESHOLD))
return MDBX_BUSY /* transaction is still active */;
atomic_store32(&r->mr_pid, 0, mo_AcquireRelease);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true, mo_Relaxed);
atomic_store32(&r->mr_pid, 0, mo_Relaxed);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true,
mo_AcquireRelease);
thread_rthc_set(env->me_txkey, nullptr);
return MDBX_SUCCESS;
}
@ -6942,7 +6938,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, const unsigned flags) {
atomic_store64(&r->mr_snapshot_pages_retired,
unaligned_peek_u64(4, meta->mm_pages_retired),
mo_Relaxed);
atomic_store64(&r->mr_txnid, txn->mt_txnid, mo_AcquireRelease);
atomic_store64(&r->mr_txnid, txn->mt_txnid, mo_Relaxed);
mdbx_jitter4testing(false);
mdbx_assert(env, r->mr_pid.weak == mdbx_getpid());
mdbx_assert(
@ -6950,7 +6946,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, const unsigned flags) {
((env->me_flags & MDBX_NOTLS) ? 0 : mdbx_thread_self()));
mdbx_assert(env, r->mr_txnid.weak == txn->mt_txnid);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true,
mo_AcquireRelease);
mo_Relaxed);
}
}
@ -7600,7 +7596,7 @@ static void dbi_update(MDBX_txn *txn, int keep) {
char *ptr = env->me_dbxs[i].md_name.iov_base;
if (ptr) {
env->me_dbxs[i].md_name.iov_len = 0;
mdbx_memory_fence(false, true);
mdbx_memory_fence(mo_AcquireRelease, true);
mdbx_assert(env, env->me_dbflags[i] == 0);
env->me_dbiseqs[i]++;
env->me_dbxs[i].md_name.iov_base = NULL;
@ -7733,14 +7729,14 @@ static int mdbx_txn_end(MDBX_txn *txn, const unsigned mode) {
atomic_store32(&slot->mr_snapshot_pages_used, 0, mo_Relaxed);
safe64_reset(&slot->mr_txnid, false);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, true,
mo_AcquireRelease);
mo_Relaxed);
} else {
mdbx_assert(env, slot->mr_pid.weak == env->me_pid);
mdbx_assert(env, slot->mr_txnid.weak >= SAFE64_INVALID_THRESHOLD);
}
if (mode & MDBX_END_SLOT) {
if ((env->me_flags & MDBX_ENV_TXKEY) == 0)
atomic_store32(&slot->mr_pid, 0, mo_AcquireRelease);
atomic_store32(&slot->mr_pid, 0, mo_Relaxed);
txn->to.reader = NULL;
}
}
@ -10018,13 +10014,12 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
: largest_bytes + madvise_threshold));
const pgno_t discard_edge_pgno = bytes2pgno(env, discard_edge_bytes);
const pgno_t prev_discarded_pgno =
atomic_load32(env->me_discarded_tail, mo_AcquireRelease);
atomic_load32(env->me_discarded_tail, mo_Relaxed);
if (prev_discarded_pgno >=
discard_edge_pgno + bytes2pgno(env, madvise_threshold)) {
mdbx_notice("open-MADV_%s %u..%u", "DONTNEED", prev_discarded_pgno,
largest_pgno);
atomic_store32(env->me_discarded_tail, discard_edge_pgno,
mo_AcquireRelease);
atomic_store32(env->me_discarded_tail, discard_edge_pgno, mo_Relaxed);
const size_t prev_discarded_bytes =
ceil_powerof2(pgno2bytes(env, prev_discarded_pgno), env->me_os_psize);
mdbx_ensure(env, prev_discarded_bytes > discard_edge_bytes);
@ -10082,7 +10077,7 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
/* LY: step#1 - sync previously written/updated data-pages */
rc = MDBX_RESULT_FALSE /* carry steady */;
if (atomic_load32(env->me_unsynced_pages, mo_AcquireRelease)) {
if (atomic_load32(env->me_unsynced_pages, mo_Relaxed)) {
mdbx_assert(env, ((flags ^ env->me_flags) & MDBX_WRITEMAP) == 0);
enum mdbx_syncmode_bits mode_bits = MDBX_SYNC_NONE;
if ((flags & MDBX_SAFE_NOSYNC) == 0) {
@ -10108,7 +10103,7 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
if (rc == MDBX_RESULT_FALSE /* carry steady */) {
atomic_store64(env->me_sync_timestamp, mdbx_osal_monotime(), mo_Relaxed);
unaligned_poke_u64(4, pending->mm_datasync_sign, mdbx_meta_sign(pending));
atomic_store32(env->me_unsynced_pages, 0, mo_AcquireRelease);
atomic_store32(env->me_unsynced_pages, 0, mo_Relaxed);
} else {
assert(rc == MDBX_RESULT_TRUE /* carry non-steady */);
unaligned_poke_u64(4, pending->mm_datasync_sign, MDBX_DATASIGN_WEAK);
@ -10267,8 +10262,7 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
if (likely(env->me_lck))
/* toggle oldest refresh */
atomic_store32(&env->me_lck->mti_readers_refresh_flag, false,
mo_AcquireRelease);
atomic_store32(&env->me_lck->mti_readers_refresh_flag, false, mo_Relaxed);
return MDBX_SUCCESS;
@ -11152,7 +11146,7 @@ static __cold int mdbx_setup_dxb(MDBX_env *env, const int lck_rc) {
}
atomic_store32(env->me_discarded_tail, bytes2pgno(env, used_aligned2os_bytes),
mo_AcquireRelease);
mo_Relaxed);
if (used_aligned2os_bytes < env->me_dxb_mmap.current) {
#if defined(MADV_REMOVE)
if (lck_rc && (env->me_flags & MDBX_WRITEMAP) != 0 &&
@ -18783,7 +18777,7 @@ static int mdbx_dbi_close_locked(MDBX_env *env, MDBX_dbi dbi) {
env->me_dbflags[dbi] = 0;
env->me_dbiseqs[dbi]++;
env->me_dbxs[dbi].md_name.iov_len = 0;
mdbx_memory_fence(false, true);
mdbx_memory_fence(mo_AcquireRelease, true);
env->me_dbxs[dbi].md_name.iov_base = NULL;
mdbx_free(ptr);
@ -19220,7 +19214,7 @@ mdbx_cleanup_dead_readers(MDBX_env *env, int rdt_locked, int *dead) {
if (lck->mti_readers[j].mr_pid.weak == pid) {
mdbx_debug("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN,
(size_t)pid, lck->mti_readers[j].mr_txnid.weak);
atomic_store32(&lck->mti_readers[j].mr_pid, 0, mo_AcquireRelease);
atomic_store32(&lck->mti_readers[j].mr_pid, 0, mo_Relaxed);
atomic_store32(&lck->mti_readers_refresh_flag, true, mo_AcquireRelease);
count++;
}
@ -19229,7 +19223,7 @@ mdbx_cleanup_dead_readers(MDBX_env *env, int rdt_locked, int *dead) {
if (likely(!MDBX_IS_ERROR(rc)))
atomic_store64(&lck->mti_reader_check_timestamp, mdbx_osal_monotime(),
mo_AcquireRelease);
mo_Relaxed);
if (rdt_locked < 0)
mdbx_rdt_unlock(env);
@ -19316,7 +19310,7 @@ static txnid_t __cold mdbx_kick_longlived_readers(MDBX_env *env,
mdbx_notice("hsr-kick: update oldest %" PRIaTXN " -> %" PRIaTXN,
env->me_oldest->weak, oldest);
mdbx_assert(env, env->me_oldest->weak <= oldest);
return atomic_store64(env->me_oldest, oldest, mo_AcquireRelease);
return atomic_store64(env->me_oldest, oldest, mo_Relaxed);
}
if (!env->me_hsr_callback)
@ -19350,7 +19344,7 @@ static txnid_t __cold mdbx_kick_longlived_readers(MDBX_env *env,
atomic_store64(&asleep->mr_tid, 0, mo_Relaxed);
atomic_store32(&asleep->mr_pid, 0, mo_Relaxed);
}
atomic_store32(&lck->mti_readers_refresh_flag, true, mo_AcquireRelease);
atomic_store32(&lck->mti_readers_refresh_flag, true, mo_Relaxed);
}
}