mdbx: rework/simplify mdbx_env_sync_internal().

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2021-12-04 06:56:44 +03:00
parent 8ef8733ddc
commit da855b13a3

View File

@@ -6977,119 +6977,123 @@ fail:
__cold static int mdbx_env_sync_internal(MDBX_env *env, bool force, __cold static int mdbx_env_sync_internal(MDBX_env *env, bool force,
bool nonblock) { bool nonblock) {
unsigned flags = env->me_flags & ~MDBX_NOMETASYNC; bool locked = false;
if (unlikely(flags & (MDBX_RDONLY | MDBX_FATAL_ERROR)))
return MDBX_EACCESS;
int rc = MDBX_RESULT_TRUE /* means "nothing to sync" */; int rc = MDBX_RESULT_TRUE /* means "nothing to sync" */;
bool need_unlock = false;
if (nonblock &&
atomic_load32(&env->me_lck->mti_unsynced_pages, mo_AcquireRelease) == 0)
goto fastpath;
const bool outside_txn = (env->me_txn0->mt_owner != mdbx_thread_self()); retry:;
if (outside_txn) { unsigned flags = env->me_flags & ~MDBX_NOMETASYNC;
int err = mdbx_txn_lock(env, nonblock); if (unlikely(flags & (MDBX_RDONLY | MDBX_FATAL_ERROR))) {
if (unlikely(err != MDBX_SUCCESS)) rc = MDBX_EACCESS;
return err; goto bailout;
need_unlock = true;
} }
const MDBX_meta *head = mdbx_meta_head(env); const pgno_t unsynced_pages =
pgno_t unsynced_pages =
atomic_load32(&env->me_lck->mti_unsynced_pages, mo_Relaxed); atomic_load32(&env->me_lck->mti_unsynced_pages, mo_Relaxed);
if (!META_IS_STEADY(head) || unsynced_pages) { const MDBX_meta *head = mdbx_meta_head(env);
const pgno_t autosync_threshold = const txnid_t head_txnid = mdbx_meta_txnid_fluid(env, head);
atomic_load32(&env->me_lck->mti_autosync_threshold, mo_Relaxed); const uint32_t synched_meta_txnid_u32 =
const uint64_t autosync_period = atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed);
atomic_load64(&env->me_lck->mti_autosync_period, mo_Relaxed); if (unsynced_pages == 0 && synched_meta_txnid_u32 == (uint32_t)head_txnid &&
if (force || (autosync_threshold && unsynced_pages >= autosync_threshold) || META_IS_STEADY(head))
(autosync_period && goto bailout;
mdbx_osal_monotime() -
atomic_load64(&env->me_lck->mti_sync_timestamp, mo_Relaxed) >=
autosync_period))
flags &= MDBX_WRITEMAP /* clear flags for full steady sync */;
if (outside_txn) { const pgno_t autosync_threshold =
atomic_load32(&env->me_lck->mti_autosync_threshold, mo_Relaxed);
const uint64_t autosync_period =
atomic_load64(&env->me_lck->mti_autosync_period, mo_Relaxed);
if (force || (autosync_threshold && unsynced_pages >= autosync_threshold) ||
(autosync_period &&
mdbx_osal_monotime() -
atomic_load64(&env->me_lck->mti_sync_timestamp, mo_Relaxed) >=
autosync_period))
flags &= MDBX_WRITEMAP /* clear flags for full steady sync */;
const bool inside_txn = (env->me_txn0->mt_owner == mdbx_thread_self());
if (!inside_txn) {
if (!locked) {
int err;
unsigned wops = 0;
/* pre-sync to avoid latency for writer */
if (unsynced_pages > /* FIXME: define threshold */ 16 && if (unsynced_pages > /* FIXME: define threshold */ 16 &&
(flags & MDBX_SAFE_NOSYNC) == 0) { (flags & MDBX_SAFE_NOSYNC) == 0) {
mdbx_assert(env, ((flags ^ env->me_flags) & MDBX_WRITEMAP) == 0); mdbx_assert(env, ((flags ^ env->me_flags) & MDBX_WRITEMAP) == 0);
const size_t usedbytes = pgno_align2os_bytes(env, head->mm_geo.next); if (flags & MDBX_WRITEMAP) {
/* Acquire guard to avoid collision with remap */
#if defined(_WIN32) || defined(_WIN64)
mdbx_srwlock_AcquireShared(&env->me_remap_guard);
#else
err = mdbx_fastmutex_acquire(&env->me_remap_guard);
if (unlikely(err != MDBX_SUCCESS))
return err;
#endif
const size_t usedbytes = pgno_align2os_bytes(env, head->mm_geo.next);
err = mdbx_msync(&env->me_dxb_mmap, 0, usedbytes, MDBX_SYNC_DATA);
#if defined(_WIN32) || defined(_WIN64)
mdbx_srwlock_ReleaseShared(&env->me_remap_guard);
#else
int unlock_err = mdbx_fastmutex_release(&env->me_remap_guard);
if (unlikely(unlock_err != MDBX_SUCCESS) && err == MDBX_SUCCESS)
err = unlock_err;
#endif
} else
err = mdbx_fsync(env->me_lazy_fd, MDBX_SYNC_DATA);
#if MDBX_ENABLE_PGOP_STAT
env->me_lck->mti_pgop_stat.wops.weak += 1;
#endif /* MDBX_ENABLE_PGOP_STAT */
mdbx_txn_unlock(env);
/* LY: pre-sync without holding lock to reduce latency for writer(s) */
int err =
(flags & MDBX_WRITEMAP)
? mdbx_msync(&env->me_dxb_mmap, 0, usedbytes, MDBX_SYNC_DATA)
: mdbx_fsync(env->me_lazy_fd, MDBX_SYNC_DATA);
if (unlikely(err != MDBX_SUCCESS)) if (unlikely(err != MDBX_SUCCESS))
return err; return err;
err = mdbx_txn_lock(env, nonblock); /* pre-sync done */
if (unlikely(err != MDBX_SUCCESS)) wops = 1;
return err; rc = MDBX_SUCCESS /* means "some data was synced" */;
/* LY: head and unsynced_pages may be changed. */
head = mdbx_meta_head(env);
unsynced_pages =
atomic_load32(&env->me_lck->mti_unsynced_pages, mo_Relaxed);
} }
env->me_txn0->mt_txnid = meta_txnid(env, head, false);
mdbx_find_oldest(env->me_txn0);
rc = MDBX_RESULT_FALSE /* means "some data was synced" */;
}
if (!META_IS_STEADY(head) || err = mdbx_txn_lock(env, nonblock);
((flags & MDBX_SAFE_NOSYNC) == 0 && unsynced_pages)) { if (unlikely(err != MDBX_SUCCESS))
mdbx_debug("meta-head %" PRIaPGNO ", %s, sync_pending %" PRIaPGNO,
data_page(head)->mp_pgno, mdbx_durable_str(head),
unsynced_pages);
MDBX_meta meta = *head;
int err = mdbx_sync_locked(env, flags | MDBX_SHRINK_ALLOWED, &meta);
if (unlikely(err != MDBX_SUCCESS)) {
if (need_unlock)
mdbx_txn_unlock(env);
return err; return err;
}
rc = MDBX_RESULT_FALSE /* means "some data was synced" */; locked = true;
#if MDBX_ENABLE_PGOP_STAT
env->me_lck->mti_pgop_stat.wops.weak += wops;
#endif /* MDBX_ENABLE_PGOP_STAT */
goto retry;
} }
env->me_txn0->mt_txnid = head_txnid;
mdbx_assert(env, head_txnid == meta_txnid(env, head, false));
mdbx_assert(env, head_txnid == mdbx_recent_committed_txnid(env));
mdbx_find_oldest(env->me_txn0);
}
mdbx_assert(env, inside_txn || locked);
if (!META_IS_STEADY(head) ||
((flags & MDBX_SAFE_NOSYNC) == 0 && unsynced_pages)) {
mdbx_debug("meta-head %" PRIaPGNO ", %s, sync_pending %" PRIaPGNO,
data_page(head)->mp_pgno, mdbx_durable_str(head),
unsynced_pages);
MDBX_meta meta = *head;
rc = mdbx_sync_locked(env, flags | MDBX_SHRINK_ALLOWED, &meta);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
} }
fastpath:
/* LY: sync meta-pages if MDBX_NOMETASYNC enabled /* LY: sync meta-pages if MDBX_NOMETASYNC enabled
* and someone was not synced above. */ * and someone was not synced above. */
if (rc == MDBX_RESULT_TRUE && (env->me_flags & MDBX_NOMETASYNC) != 0) { if (atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed) !=
const txnid_t head_txnid = mdbx_recent_committed_txnid(env); (uint32_t)head_txnid) {
if (atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed) !=
(uint32_t)head_txnid) {
#if MDBX_ENABLE_PGOP_STAT #if MDBX_ENABLE_PGOP_STAT
if (need_unlock) env->me_lck->mti_pgop_stat.wops.weak += 1;
env->me_lck->mti_pgop_stat.wops.weak += 1;
#if MDBX_64BIT_ATOMIC
else {
MDBX_atomic_uint64_t *wops = &env->me_lck->mti_pgop_stat.wops;
while (unlikely(!atomic_cas64(wops, wops->weak, wops->weak + 1)))
atomic_yield();
}
#else
/* loose the env->me_lck->mti_pgop_stat.wops.weak increment */
#endif /* MDBX_64BIT_ATOMIC */
#endif /* MDBX_ENABLE_PGOP_STAT */ #endif /* MDBX_ENABLE_PGOP_STAT */
rc = (flags & MDBX_WRITEMAP) rc = (flags & MDBX_WRITEMAP)
? mdbx_msync(&env->me_dxb_mmap, 0, ? mdbx_msync(&env->me_dxb_mmap, 0,
pgno_align2os_bytes(env, NUM_METAS), pgno_align2os_bytes(env, NUM_METAS),
MDBX_SYNC_DATA | MDBX_SYNC_IODQ) MDBX_SYNC_DATA | MDBX_SYNC_IODQ)
: mdbx_fsync(env->me_lazy_fd, MDBX_SYNC_DATA | MDBX_SYNC_IODQ); : mdbx_fsync(env->me_lazy_fd, MDBX_SYNC_DATA | MDBX_SYNC_IODQ);
if (likely(rc == MDBX_SUCCESS)) if (likely(rc == MDBX_SUCCESS))
atomic_store32(&env->me_lck->mti_meta_sync_txnid, (uint32_t)head_txnid, atomic_store32(&env->me_lck->mti_meta_sync_txnid, (uint32_t)head_txnid,
mo_Relaxed); mo_Relaxed);
}
} }
if (need_unlock)
bailout:
if (locked)
mdbx_txn_unlock(env); mdbx_txn_unlock(env);
return rc; return rc;
} }
@@ -10911,11 +10915,9 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
goto undo; goto undo;
} }
} }
if (flags & MDBX_NOMETASYNC) env->me_lck->mti_meta_sync_txnid.weak =
env->me_lck->mti_unsynced_pages.weak += 1; (uint32_t)unaligned_peek_u64(4, pending->mm_txnid_a) -
else ((flags & MDBX_NOMETASYNC) ? UINT32_MAX / 3 : 0);
env->me_lck->mti_meta_sync_txnid.weak =
(uint32_t)unaligned_peek_u64(4, pending->mm_txnid_a);
/* LY: shrink datafile if needed */ /* LY: shrink datafile if needed */
if (unlikely(shrink)) { if (unlikely(shrink)) {
@@ -19471,7 +19473,7 @@ __cold static int fetch_envinfo_ex(const MDBX_env *env, const MDBX_txn *txn,
const pgno_t unsynced_pages = const pgno_t unsynced_pages =
atomic_load32(&env->me_lck->mti_unsynced_pages, mo_Relaxed) + atomic_load32(&env->me_lck->mti_unsynced_pages, mo_Relaxed) +
(atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed) != (atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed) !=
(uint32_t)arg->mi_last_pgno); (uint32_t)arg->mi_recent_txnid);
arg->mi_mapsize = env->me_dxb_mmap.limit; arg->mi_mapsize = env->me_dxb_mmap.limit;