mdbx: rework env_sync() for MDBX_NOMETASYNC, add mdbx_env_sync_poll().

Change-Id: I4d212c663853b00e221d17cb8483353231497b48
This commit is contained in:
Leonid Yuriev 2019-09-29 23:51:43 +03:00
parent 91ee841fc2
commit 83da954725
3 changed files with 90 additions and 29 deletions

21
mdbx.h
View File

@ -1614,8 +1614,11 @@ LIBMDBX_API int mdbx_env_info(MDBX_env *env, MDBX_envinfo *info, size_t bytes);
* provide polling mode for lazy/asynchronous sync in conjunction with
* mdbx_env_set_syncbytes() and/or mdbx_env_set_syncperiod().
*
* Legacy mdbx_env_sync() correspond to calling mdbx_env_sync_ex() with the
* argument nonblock=false.
* The mdbx_env_sync() is shortcut to calling mdbx_env_sync_ex() with
* try force=true and nonblock=false arguments.
*
* The mdbx_env_sync_poll() is shortcut to calling mdbx_env_sync_ex() with
* the force=false and nonblock=true arguments.
*
* NOTE: This call is not valid if the environment was opened with MDBX_RDONLY.
*
@ -1628,13 +1631,15 @@ LIBMDBX_API int mdbx_env_info(MDBX_env *env, MDBX_envinfo *info, size_t bytes);
* [in] nonblock Don't wait if write transaction is running by other thread.
*
* Returns A non-zero error value on failure and MDBX_RESULT_TRUE or 0 on
* success. The MDBX_RESULT_TRUE means some data was flushed to disk,
* success. The MDBX_RESULT_TRUE means no data pending for flush to disk,
* and 0 otherwise. Some possible errors are:
* - MDBX_EACCES = the environment is read-only.
* - MDBX_BUSY = the environment is used by other thread and nonblock=true.
* - MDBX_EINVAL = an invalid parameter was specified.
* - MDBX_EIO = an error occurred during synchronization. */
LIBMDBX_API int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock);
LIBMDBX_API int mdbx_env_sync(MDBX_env *env, int force);
LIBMDBX_API int mdbx_env_sync(MDBX_env *env);
LIBMDBX_API int mdbx_env_sync_poll(MDBX_env *env);
/* Sets threshold to force flush the data buffers to disk,
* even of MDBX_NOSYNC, MDBX_NOMETASYNC and MDBX_MAPASYNC flags
@ -1649,12 +1654,12 @@ LIBMDBX_API int mdbx_env_sync(MDBX_env *env, int force);
* The default is 0, than mean no any threshold checked, and no additional
* flush will be made.
*
* [in] env An environment handle returned by mdbx_env_create()
* [in] bytes The size in bytes of summary changes when a synchronous
* flush would be made.
* [in] env An environment handle returned by mdbx_env_create().
* [in] threshold The size in bytes of summary changes when a synchronous
* flush would be made.
*
* Returns A non-zero error value on failure and 0 on success. */
LIBMDBX_API int mdbx_env_set_syncbytes(MDBX_env *env, size_t bytes);
LIBMDBX_API int mdbx_env_set_syncbytes(MDBX_env *env, size_t threshold);
/* Sets relative period since the last unsteay commit to force flush the data
* buffers to disk, even of MDBX_NOSYNC, MDBX_NOMETASYNC and MDBX_MAPASYNC flags

View File

@ -2561,13 +2561,35 @@ static __hot MDBX_meta *mdbx_meta_head(const MDBX_env *env) {
return mdbx_meta_mostrecent(prefer_last, env);
}
static __hot txnid_t mdbx_recent_committed_txnid(const MDBX_env *env) {
while (true) {
const MDBX_meta *head = mdbx_meta_head(env);
const txnid_t recent = mdbx_meta_txnid_fluid(env, head);
mdbx_compiler_barrier();
if (likely(head == mdbx_meta_head(env) &&
recent == mdbx_meta_txnid_fluid(env, head)))
return recent;
}
}
static __hot txnid_t mdbx_recent_steady_txnid(const MDBX_env *env) {
while (true) {
const MDBX_meta *head = mdbx_meta_steady(env);
const txnid_t recent = mdbx_meta_txnid_fluid(env, head);
mdbx_compiler_barrier();
if (likely(head == mdbx_meta_steady(env) &&
recent == mdbx_meta_txnid_fluid(env, head)))
return recent;
}
}
static __hot txnid_t mdbx_reclaiming_detent(const MDBX_env *env) {
if (F_ISSET(env->me_flags, MDBX_UTTERLY_NOSYNC))
return likely(env->me_txn0->mt_owner == mdbx_thread_self())
? env->me_txn0->mt_txnid - 1
: mdbx_meta_txnid_fluid(env, mdbx_meta_head(env));
: mdbx_recent_committed_txnid(env);
return mdbx_meta_txnid_stable(env, mdbx_meta_steady(env));
return mdbx_recent_steady_txnid(env);
}
static const char *mdbx_durable_str(const MDBX_meta *const meta) {
@ -3435,16 +3457,22 @@ __cold int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock) {
if (unlikely(flags & (MDBX_RDONLY | MDBX_FATAL_ERROR)))
return MDBX_EACCESS;
const bool outside_txn =
(!env->me_txn0 || env->me_txn0->mt_owner != mdbx_thread_self());
if (unlikely(!env->me_map))
return MDBX_EPERM;
int rc = MDBX_RESULT_TRUE /* means "nothing to sync" */;
bool need_unlock = false;
if (nonblock && *env->me_unsynced_pages == 0)
goto fastpath;
const bool outside_txn = (env->me_txn0->mt_owner != mdbx_thread_self());
if (outside_txn) {
int err = mdbx_txn_lock(env, nonblock);
if (unlikely(err != MDBX_SUCCESS))
return err;
need_unlock = true;
}
int rc = MDBX_RESULT_FALSE /* means "nothing to sync" */;
const MDBX_meta *head = mdbx_meta_head(env);
pgno_t unsynced_pages = *env->me_unsynced_pages;
if (!META_IS_STEADY(head) || unsynced_pages) {
@ -3479,7 +3507,7 @@ __cold int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock) {
}
env->me_txn0->mt_txnid = meta_txnid(env, head, false);
mdbx_find_oldest(env->me_txn0);
rc = MDBX_RESULT_TRUE /* means "some data was synced" */;
rc = MDBX_RESULT_FALSE /* means "some data was synced" */;
}
if (!META_IS_STEADY(head) ||
@ -3490,28 +3518,39 @@ __cold int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock) {
MDBX_meta meta = *head;
int err = mdbx_sync_locked(env, flags | MDBX_SHRINK_ALLOWED, &meta);
if (unlikely(err != MDBX_SUCCESS)) {
if (outside_txn)
if (need_unlock)
mdbx_txn_unlock(env);
return err;
}
rc = MDBX_RESULT_TRUE /* means "some data was synced" */;
rc = MDBX_RESULT_FALSE /* means "some data was synced" */;
}
}
fastpath:
/* LY: sync meta-pages if MDBX_NOMETASYNC enabled
* and someone was not synced above. */
if (rc == MDBX_RESULT_FALSE && (env->me_flags & MDBX_NOMETASYNC) != 0)
rc = (flags & MDBX_WRITEMAP)
? mdbx_msync(&env->me_dxb_mmap, 0, pgno2bytes(env, NUM_METAS),
false)
: mdbx_filesync(env->me_fd, MDBX_SYNC_DATA | MDBX_SYNC_IODQ);
if (outside_txn)
if (rc == MDBX_RESULT_TRUE && (env->me_flags & MDBX_NOMETASYNC) != 0) {
const txnid_t head_txnid = mdbx_recent_committed_txnid(env);
if (*env->me_meta_sync_txnid != (uint32_t)head_txnid) {
rc = (flags & MDBX_WRITEMAP)
? mdbx_msync(&env->me_dxb_mmap, 0, pgno2bytes(env, NUM_METAS),
false)
: mdbx_filesync(env->me_fd, MDBX_SYNC_DATA | MDBX_SYNC_IODQ);
if (likely(rc == MDBX_SUCCESS))
*env->me_meta_sync_txnid = (uint32_t)head_txnid;
}
}
if (need_unlock)
mdbx_txn_unlock(env);
return rc;
}
__cold int mdbx_env_sync(MDBX_env *env, int force) {
return mdbx_env_sync_ex(env, force, false);
__cold int mdbx_env_sync(MDBX_env *env) {
return mdbx_env_sync_ex(env, true, false);
}
__cold int mdbx_env_sync_poll(MDBX_env *env) {
return mdbx_env_sync_ex(env, false, true);
}
/* Back up parent txn's cursors, then grab the originals for tracking */
@ -6184,6 +6223,7 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
if (rc != MDBX_SUCCESS)
goto undo;
}
*env->me_meta_sync_txnid = (uint32_t)pending->mm_txnid_a.inconsistent;
}
/* LY: shrink datafile if needed */
@ -7127,6 +7167,7 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
env->me_unsynced_pages = &env->me_lckless_stub.autosync_pending;
env->me_autosync_threshold = &env->me_lckless_stub.autosync_threshold;
env->me_discarded_tail = &env->me_lckless_stub.discarded_tail;
env->me_meta_sync_txnid = &env->me_lckless_stub.meta_sync_txnid;
env->me_maxreaders = UINT_MAX;
#ifdef MDBX_OSAL_LOCK
env->me_wmutex = &env->me_lckless_stub.wmutex;
@ -7278,6 +7319,7 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
env->me_unsynced_pages = &env->me_lck->mti_unsynced_pages;
env->me_autosync_threshold = &env->me_lck->mti_autosync_threshold;
env->me_discarded_tail = &env->me_lck->mti_discarded_tail;
env->me_meta_sync_txnid = &env->me_lck->mti_meta_sync_txnid;
#ifdef MDBX_OSAL_LOCK
env->me_wmutex = &env->me_lck->mti_wmutex;
#endif
@ -13938,7 +13980,7 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
return mdbx_find_oldest(env->me_txn);
}
int __cold mdbx_env_set_syncbytes(MDBX_env *env, size_t bytes) {
int __cold mdbx_env_set_syncbytes(MDBX_env *env, size_t threshold) {
if (unlikely(!env))
return MDBX_EINVAL;
@ -13951,8 +13993,13 @@ int __cold mdbx_env_set_syncbytes(MDBX_env *env, size_t bytes) {
if (unlikely(!env->me_map))
return MDBX_EPERM;
*env->me_autosync_threshold = bytes2pgno(env, bytes + env->me_psize - 1);
return bytes ? mdbx_env_sync(env, false) : MDBX_SUCCESS;
*env->me_autosync_threshold = bytes2pgno(env, threshold + env->me_psize - 1);
if (threshold) {
int err = mdbx_env_sync_poll(env);
if (unlikely(MDBX_IS_ERROR(err)))
return err;
}
return MDBX_SUCCESS;
}
int __cold mdbx_env_set_syncperiod(MDBX_env *env, unsigned seconds_16dot16) {
@ -13969,7 +14016,12 @@ int __cold mdbx_env_set_syncperiod(MDBX_env *env, unsigned seconds_16dot16) {
return MDBX_EPERM;
*env->me_autosync_period = mdbx_osal_16dot16_to_monotime(seconds_16dot16);
return seconds_16dot16 ? mdbx_env_sync(env, false) : MDBX_SUCCESS;
if (seconds_16dot16) {
int err = mdbx_env_sync_poll(env);
if (unlikely(MDBX_IS_ERROR(err)))
return err;
}
return MDBX_SUCCESS;
}
int __cold mdbx_env_set_oomfunc(MDBX_env *env, MDBX_oom_func *oomfunc) {

View File

@ -521,7 +521,9 @@ typedef struct MDBX_lockinfo {
* zero means no-threshold, i.e. auto-sync is disabled. */
volatile pgno_t mti_autosync_threshold;
uint32_t reserved_pad;
/* Low 32-bit of txnid with which meta-pages was synced,
* i.e. for sync-polling in the MDBX_NOMETASYNC mode. */
volatile uint32_t mti_meta_sync_txnid;
/* Period for timed auto-sync feature, i.e. at the every steady checkpoint
* the mti_unsynced_timeout sets to the current_time + mti_autosync_period.
@ -938,6 +940,7 @@ struct MDBX_env {
volatile pgno_t *me_unsynced_pages;
volatile pgno_t *me_autosync_threshold;
volatile pgno_t *me_discarded_tail;
volatile uint32_t *me_meta_sync_txnid;
MDBX_oom_func *me_oom_func; /* Callback for kicking laggard readers */
struct {
#ifdef MDBX_OSAL_LOCK
@ -949,6 +952,7 @@ struct MDBX_env {
pgno_t autosync_pending;
pgno_t autosync_threshold;
pgno_t discarded_tail;
uint32_t meta_sync_txnid;
} me_lckless_stub;
#if MDBX_DEBUG
MDBX_assert_func *me_assert_func; /* Callback for assertion failures */