mdbx: rework auto-sync by volume feature.

Change-Id: I0a34a65a974f28c6f0a950c11d55a43cfcfcab22
This commit is contained in:
Leonid Yuriev 2019-08-23 11:40:30 +03:00
parent 6f8238e1e9
commit 68e0076ca3
3 changed files with 79 additions and 54 deletions

7
mdbx.h
View File

@ -1581,14 +1581,15 @@ LIBMDBX_API char *mdbx_dkey(const MDBX_val *key, char *const buf,
LIBMDBX_API int mdbx_env_close_ex(MDBX_env *env, int dont_sync);
/* Set threshold to force flush the data buffers to disk,
/* Sets threshold to force flush the data buffers to disk,
* even with the MDBX_NOSYNC, MDBX_NOMETASYNC and MDBX_MAPASYNC flags set
* in the environment.
* in the environment. The value affects all processes which operate on the given
* DB until the last process closes the DB or a new value is set.
*
* Data is always written to disk when mdbx_txn_commit() is called,
* but the operating system may keep it buffered. MDBX always flushes
* the OS buffers upon commit as well, unless the environment was
* opened with MDBX_NOSYNC or in part MDBX_NOMETASYNC.
* opened with MDBX_NOSYNC, MDBX_MAPASYNC or in part MDBX_NOMETASYNC.
*
* The default is 0, which means no threshold is checked and no additional
* flush will be made.

View File

@ -810,11 +810,17 @@ struct MDBX_env {
unsigned me_maxkey_limit; /* max size of a key */
mdbx_pid_t me_live_reader; /* have liveness lock in reader table */
void *me_userctx; /* User-settable context */
size_t me_sync_pending; /* Total dirty/non-sync'ed bytes
* since the last mdbx_env_sync() */
size_t me_sync_threshold; /* Treshold of above to force synchronous flush */
volatile pgno_t *me_unsynced_pages;
volatile pgno_t *me_autosync_threshold;
MDBX_oom_func *me_oom_func; /* Callback for kicking laggard readers */
txnid_t me_oldest_stub;
struct {
#ifdef MDBX_OSAL_LOCK
MDBX_OSAL_LOCK wmutex;
#endif
txnid_t oldest;
pgno_t autosync_pending;
pgno_t autosync_threshold;
} me_lckless_stub;
#if MDBX_DEBUG
MDBX_assert_func *me_assert_func; /* Callback for assertion failures */
#endif
@ -835,7 +841,6 @@ struct MDBX_env {
/* Workaround for LockFileEx and WriteFile multithread bug */
CRITICAL_SECTION me_windowsbug_lock;
#else
mdbx_fastmutex_t me_lckless_wmutex;
mdbx_fastmutex_t me_remap_guard;
#endif
};

View File

@ -2175,7 +2175,7 @@ static txnid_t mdbx_find_oldest(MDBX_txn *txn) {
MDBX_lockinfo *const lck = env->me_lck;
if (unlikely(lck == NULL /* exclusive mode */))
return env->me_oldest_stub = edge;
return env->me_lckless_stub.oldest = edge;
const txnid_t last_oldest = lck->mti_oldest_reader;
mdbx_tassert(txn, edge >= last_oldest);
@ -3022,15 +3022,14 @@ __cold static int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock) {
}
const MDBX_meta *head = mdbx_meta_head(env);
if (!META_IS_STEADY(head) || env->me_sync_pending) {
if (force || (env->me_sync_threshold &&
env->me_sync_pending >= env->me_sync_threshold))
pgno_t unsynced_pages = *env->me_unsynced_pages;
if (!META_IS_STEADY(head) || unsynced_pages) {
const pgno_t autosync_threshold = *env->me_autosync_threshold;
if (force || (autosync_threshold && unsynced_pages >= autosync_threshold))
flags &= MDBX_WRITEMAP /* clear flags for full steady sync */;
if (outside_txn) {
if (env->me_sync_pending >
pgno2bytes(env, 16 /* FIXME: define threshold */) &&
if (unsynced_pages > /* FIXME: define threshold */ 16 &&
(flags & (MDBX_NOSYNC | MDBX_MAPASYNC)) == 0) {
mdbx_assert(env, ((flags ^ env->me_flags) & MDBX_WRITEMAP) == 0);
const size_t usedbytes = pgno_align2os_bytes(env, head->mm_geo.next);
@ -3048,19 +3047,19 @@ __cold static int mdbx_env_sync_ex(MDBX_env *env, int force, int nonblock) {
if (unlikely(rc != MDBX_SUCCESS))
return rc;
/* LY: head may be changed. */
/* LY: head and unsynced_pages may be changed. */
head = mdbx_meta_head(env);
unsynced_pages = *env->me_unsynced_pages;
}
env->me_txn0->mt_txnid = meta_txnid(env, head, false);
mdbx_find_oldest(env->me_txn0);
}
if (!META_IS_STEADY(head) ||
((flags & (MDBX_NOSYNC | MDBX_MAPASYNC)) == 0 &&
env->me_sync_pending)) {
mdbx_debug("meta-head %" PRIaPGNO ", %s, sync_pending %" PRIuPTR,
((flags & (MDBX_NOSYNC | MDBX_MAPASYNC)) == 0 && unsynced_pages)) {
mdbx_debug("meta-head %" PRIaPGNO ", %s, sync_pending %" PRIaPGNO,
container_of(head, MDBX_page, mp_data)->mp_pgno,
mdbx_durable_str(head), env->me_sync_pending);
mdbx_durable_str(head), unsynced_pages);
MDBX_meta meta = *head;
int rc = mdbx_sync_locked(env, flags | MDBX_SHRINK_ALLOWED, &meta);
if (unlikely(rc != MDBX_SUCCESS)) {
@ -4627,8 +4626,7 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) {
}
dp->mp_flags &= ~P_DIRTY;
dp->mp_validator = 0 /* TODO */;
env->me_sync_pending +=
IS_OVERFLOW(dp) ? pgno2bytes(env, dp->mp_pages) : env->me_psize;
*env->me_unsynced_pages += IS_OVERFLOW(dp) ? dp->mp_pages : 1;
}
goto done;
}
@ -4649,8 +4647,9 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) {
dp->mp_flags &= ~P_DIRTY;
dp->mp_validator = 0 /* TODO */;
pos = pgno2bytes(env, pgno);
size = IS_OVERFLOW(dp) ? pgno2bytes(env, dp->mp_pages) : env->me_psize;
env->me_sync_pending += size;
const unsigned npages = IS_OVERFLOW(dp) ? dp->mp_pages : 1;
*env->me_unsynced_pages += npages;
size = pgno2bytes(env, npages);
}
/* Write up to MDBX_COMMIT_PAGES dirty pages at a time. */
if (pos != next_pos || n == MDBX_COMMIT_PAGES || wsize + size > MAX_WRITE) {
@ -5385,12 +5384,15 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
mdbx_assert(env,
pending < METAPAGE(env, 0) || pending > METAPAGE(env, NUM_METAS));
mdbx_assert(env, (env->me_flags & (MDBX_RDONLY | MDBX_FATAL_ERROR)) == 0);
mdbx_assert(env, !META_IS_STEADY(head) || env->me_sync_pending != 0);
mdbx_assert(env, !META_IS_STEADY(head) || *env->me_unsynced_pages != 0);
mdbx_assert(env, pending->mm_geo.next <= pending->mm_geo.now);
const size_t usedbytes = pgno_align2os_bytes(env, pending->mm_geo.next);
if (env->me_sync_threshold && env->me_sync_pending >= env->me_sync_threshold)
if (flags & (MDBX_NOSYNC | MDBX_MAPASYNC)) {
/* Check auto-sync conditions */
const pgno_t autosync_threshold = *env->me_autosync_threshold;
if (autosync_threshold && *env->me_unsynced_pages >= autosync_threshold)
flags &= MDBX_WRITEMAP | MDBX_SHRINK_ALLOWED; /* force steady */
}
/* LY: check conditions to shrink datafile */
const pgno_t backlog_gap =
@ -5422,36 +5424,41 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
}
/* LY: step#1 - sync previously written/updated data-pages */
int rc = MDBX_RESULT_TRUE;
if (env->me_sync_pending && (flags & MDBX_NOSYNC) == 0) {
int rc = *env->me_unsynced_pages ? MDBX_RESULT_TRUE /* carry non-steady */
: MDBX_RESULT_FALSE /* carry steady */;
if (rc != MDBX_RESULT_FALSE && (flags & MDBX_NOSYNC) == 0) {
mdbx_assert(env, ((flags ^ env->me_flags) & MDBX_WRITEMAP) == 0);
MDBX_meta *const steady = mdbx_meta_steady(env);
MDBX_meta *const recent_steady_meta = mdbx_meta_steady(env);
if (flags & MDBX_WRITEMAP) {
const size_t usedbytes = pgno_align2os_bytes(env, pending->mm_geo.next);
rc = mdbx_msync(&env->me_dxb_mmap, 0, usedbytes, flags & MDBX_MAPASYNC);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
rc = MDBX_RESULT_TRUE /* carry non-steady */;
if ((flags & MDBX_MAPASYNC) == 0) {
if (unlikely(pending->mm_geo.next > steady->mm_geo.now)) {
if (unlikely(pending->mm_geo.next > recent_steady_meta->mm_geo.now)) {
rc = mdbx_filesync(env->me_fd, MDBX_SYNC_SIZE);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
}
env->me_sync_pending = 0;
rc = MDBX_RESULT_FALSE /* carry steady */;
}
} else {
rc = mdbx_filesync(env->me_fd, (pending->mm_geo.next > steady->mm_geo.now)
rc = mdbx_filesync(env->me_fd,
(pending->mm_geo.next > recent_steady_meta->mm_geo.now)
? MDBX_SYNC_DATA | MDBX_SYNC_SIZE
: MDBX_SYNC_DATA);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
env->me_sync_pending = 0;
}
}
/* Steady or Weak */
if (env->me_sync_pending == 0) {
if (rc == MDBX_RESULT_FALSE /* carry steady */) {
pending->mm_datasync_sign = mdbx_meta_sign(pending);
*env->me_unsynced_pages = 0;
} else {
assert(rc == MDBX_RESULT_TRUE /* carry non-steady */);
pending->mm_datasync_sign =
(flags & MDBX_UTTERLY_NOSYNC) == MDBX_UTTERLY_NOSYNC
? MDBX_DATASIGN_NONE
@ -5694,7 +5701,7 @@ int __cold mdbx_env_create(MDBX_env **penv) {
mdbx_fastmutex_destroy(&env->me_dbi_lock);
goto bailout;
}
rc = mdbx_fastmutex_init(&env->me_lckless_wmutex);
rc = mdbx_fastmutex_init(&env->me_lckless_stub.wmutex);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_fastmutex_destroy(&env->me_remap_guard);
mdbx_fastmutex_destroy(&env->me_dbi_lock);
@ -6019,7 +6026,7 @@ mdbx_env_set_geometry(MDBX_env *env, intptr_t size_lower, intptr_t size_now,
goto bailout;
head = /* base address could be changed */ mdbx_meta_head(env);
}
env->me_sync_pending += env->me_psize;
*env->me_unsynced_pages += 1;
mdbx_meta_set_txnid(env, &meta, mdbx_meta_txnid_stable(env, head) + 1);
rc = mdbx_sync_locked(env, env->me_flags, &meta);
}
@ -6381,7 +6388,7 @@ static int __cold mdbx_setup_dxb(MDBX_env *env, const int lck_rc) {
mdbx_ensure(env, mdbx_meta_eq(env, &meta, head));
mdbx_meta_set_txnid(env, &meta, txnid + 1);
env->me_sync_pending += env->me_psize;
*env->me_unsynced_pages += 1;
err = mdbx_sync_locked(env, env->me_flags | MDBX_SHRINK_ALLOWED, &meta);
if (err) {
mdbx_info("error %d, while updating meta.geo: "
@ -6422,10 +6429,12 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
if (MDBX_IS_ERROR(rc))
return rc;
env->me_oldest = &env->me_oldest_stub;
env->me_oldest = &env->me_lckless_stub.oldest;
env->me_unsynced_pages = &env->me_lckless_stub.autosync_pending;
env->me_autosync_threshold = &env->me_lckless_stub.autosync_threshold;
env->me_maxreaders = UINT_MAX;
#ifdef MDBX_OSAL_LOCK
env->me_wmutex = &env->me_lckless_wmutex;
env->me_wmutex = &env->me_lckless_stub.wmutex;
#endif
mdbx_debug("lck-setup:%s%s%s", " lck-less",
(env->me_flags & MDBX_RDONLY) ? " readonly" : "",
@ -6531,6 +6540,8 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname,
mdbx_assert(env, !MDBX_IS_ERROR(rc));
env->me_oldest = &env->me_lck->mti_oldest_reader;
env->me_unsynced_pages = &env->me_lck->mti_unsynced_pages;
env->me_autosync_threshold = &env->me_lck->mti_autosync_threshold;
#ifdef MDBX_OSAL_LOCK
env->me_wmutex = &env->me_lck->mti_wmutex;
#endif
@ -6777,6 +6788,8 @@ static void __cold mdbx_env_close0(MDBX_env *env) {
if (env->me_lck)
mdbx_munmap(&env->me_lck_mmap);
env->me_oldest = nullptr;
env->me_unsynced_pages = nullptr;
env->me_autosync_threshold = nullptr;
mdbx_lck_destroy(env);
if (env->me_lfd != INVALID_HANDLE_VALUE) {
@ -6831,12 +6844,15 @@ int __cold mdbx_env_close_ex(MDBX_env *env, int dont_sync) {
/* me_remap_guard doesn't have a destructor (Slim Reader/Writer Lock) */
DeleteCriticalSection(&env->me_windowsbug_lock);
#else
mdbx_ensure(env,
mdbx_fastmutex_destroy(&env->me_lckless_wmutex) == MDBX_SUCCESS);
mdbx_ensure(env,
mdbx_fastmutex_destroy(&env->me_remap_guard) == MDBX_SUCCESS);
#endif /* Windows */
#ifdef MDBX_OSAL_LOCK
mdbx_ensure(env, mdbx_fastmutex_destroy(&env->me_lckless_stub.wmutex) ==
MDBX_SUCCESS);
#endif
env->me_pid = 0;
env->me_signature = 0;
mdbx_free(env);
@ -12646,7 +12662,6 @@ int mdbx_drop(MDBX_txn *txn, MDBX_dbi dbi, int del) {
txn->mt_dbs[dbi].md_entries = 0;
txn->mt_dbs[dbi].md_root = P_INVALID;
txn->mt_dbs[dbi].md_seq = 0;
txn->mt_flags |= MDBX_TXN_DIRTY;
}
@ -13003,8 +13018,14 @@ int __cold mdbx_env_set_syncbytes(MDBX_env *env, size_t bytes) {
if (unlikely(env->me_signature != MDBX_ME_SIGNATURE))
return MDBX_EBADSIGN;
env->me_sync_threshold = bytes;
return env->me_map ? mdbx_env_sync(env, false) : MDBX_SUCCESS;
if (unlikely(env->me_flags & (MDBX_RDONLY | MDBX_FATAL_ERROR)))
return MDBX_EACCESS;
if (unlikely(!env->me_map))
return MDBX_EPERM;
*env->me_autosync_threshold = bytes2pgno(env, bytes + env->me_psize - 1);
return bytes ? mdbx_env_sync(env, false) : MDBX_SUCCESS;
}
int __cold mdbx_env_set_oomfunc(MDBX_env *env, MDBX_oom_func *oomfunc) {
@ -13349,11 +13370,9 @@ int mdbx_canary_put(MDBX_txn *txn, const mdbx_canary *canary) {
txn->mt_canary.v = txn->mt_txnid;
if ((txn->mt_flags & MDBX_TXN_DIRTY) == 0) {
MDBX_env *env = txn->mt_env;
txn->mt_flags |= MDBX_TXN_DIRTY;
env->me_sync_pending += env->me_psize;
*txn->mt_env->me_unsynced_pages += 1;
}
return MDBX_SUCCESS;
}