lmdb: auto-sync with kbytes threshold (aka checkpoint by kbytes).

Change-Id: If29a3d70e775a65d832f578f0c3edd3ea41dcc67
This commit is contained in:
Leo Yuriev 2014-11-30 12:19:27 +03:00
parent c9489da920
commit fcfe2b7042
2 changed files with 56 additions and 14 deletions

19
lmdb.h
View File

@ -909,6 +909,25 @@ typedef void MDB_assert_func(MDB_env *env, const char *msg);
*/ */
int mdb_env_set_assert(MDB_env *env, MDB_assert_func *func); int mdb_env_set_assert(MDB_env *env, MDB_assert_func *func);
/** @brief Set the threshold at which data buffers are force-flushed to disk,
* even if the #MDB_NOSYNC, #MDB_NOMETASYNC or #MDB_MAPASYNC flags
* are set in the environment.
*
* Data is always written to disk when #mdb_txn_commit() is called,
* but the operating system may keep it buffered. LMDB always flushes
* the OS buffers upon commit as well, unless the environment was
* opened with #MDB_NOSYNC or in part #MDB_NOMETASYNC.
*
* The default is 0, which means that no threshold is checked
* and no additional flush is made.
*
* @param[in] env An environment handle returned by #mdb_env_create()
* @param[in] bytes The total size in bytes of accumulated changes
* at which a synchronous flush is made.
* @return A non-zero error value on failure and 0 on success.
*/
int mdb_env_set_syncbytes(MDB_env *env, size_t bytes);
/** @brief Create a transaction for use with the environment. /** @brief Create a transaction for use with the environment.
* *
* The transaction handle may be discarded using #mdb_txn_abort() or #mdb_txn_commit(). * The transaction handle may be discarded using #mdb_txn_abort() or #mdb_txn_commit().

51
mdb.c
View File

@ -1188,6 +1188,9 @@ struct MDB_env {
#endif #endif
void *me_userctx; /**< User-settable context */ void *me_userctx; /**< User-settable context */
MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */
uint64_t me_sync_pending; /**< Total dirty/committed bytes since the last mdb_env_sync() */
uint64_t me_sync_threshold; /**< Threshold of above to force synchronous flush */
size_t me_sync_size; /**< Tracking me_size for FGREW/fsync() */
}; };
/** Nested transaction */ /** Nested transaction */
@ -1235,7 +1238,7 @@ static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdb_env_pick_meta(const MDB_env *env); static int mdb_env_pick_meta(const MDB_env *env);
static int mdb_env_write_meta(MDB_txn *txn); static int mdb_env_write_meta(MDB_txn *txn, int force);
#if !(defined(_WIN32) || defined(MDB_USE_SYSV_SEM)) /* Drop unused excl arg */ #if !(defined(_WIN32) || defined(MDB_USE_SYSV_SEM)) /* Drop unused excl arg */
# define mdb_env_close0(env, excl) mdb_env_close1(env) # define mdb_env_close0(env, excl) mdb_env_close1(env)
#endif #endif
@ -2352,22 +2355,28 @@ fail:
http://www.openldap.org/lists/openldap-devel/201411/msg00000.html */ http://www.openldap.org/lists/openldap-devel/201411/msg00000.html */
static int static int
mdb_env_sync0(MDB_env *env, int flag) mdb_env_sync0(MDB_env *env, unsigned int *flags)
{ {
int rc = 0, force = flag & FORCE; int rc = 0, force;
if (env->me_sync_threshold && env->me_sync_pending >= env->me_sync_threshold) {
*flags |= FORCE;
if (env->me_sync_size != env->me_size)
*flags |= FGREW;
}
force = *flags & FORCE;
if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) { if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) {
if (env->me_flags & MDB_WRITEMAP) { if (env->me_flags & MDB_WRITEMAP) {
int flags = ((env->me_flags & MDB_MAPASYNC) && !force) int mode = ((env->me_flags & MDB_MAPASYNC) && !force)
? MS_ASYNC : MS_SYNC; ? MS_ASYNC : MS_SYNC;
if (MDB_MSYNC(env->me_map, env->me_mapsize, flags)) if (MDB_MSYNC(env->me_map, env->me_mapsize, mode))
rc = ErrCode(); rc = ErrCode();
#ifdef _WIN32 #ifdef _WIN32
else if (flags == MS_SYNC && MDB_FDATASYNC(env->me_fd)) else if (mode == MS_SYNC && MDB_FDATASYNC(env->me_fd))
rc = ErrCode(); rc = ErrCode();
#endif #endif
} else { } else {
#ifdef HAVE_FDATASYNC #ifdef HAVE_FDATASYNC
if (flag & FGREW) { if (*flags & FGREW) {
if (fsync(env->me_fd)) /* Avoid ext-fs bugs, do full sync */ if (fsync(env->me_fd)) /* Avoid ext-fs bugs, do full sync */
rc = ErrCode(); rc = ErrCode();
} else } else
@ -2375,6 +2384,11 @@ mdb_env_sync0(MDB_env *env, int flag)
if (MDB_FDATASYNC(env->me_fd)) if (MDB_FDATASYNC(env->me_fd))
rc = ErrCode(); rc = ErrCode();
} }
if (! rc) {
env->me_sync_pending = 0;
if (*flags & FGREW)
env->me_sync_size = env->me_size;
}
} }
return rc; return rc;
} }
@ -2382,7 +2396,8 @@ mdb_env_sync0(MDB_env *env, int flag)
int int
mdb_env_sync(MDB_env *env, int force) mdb_env_sync(MDB_env *env, int force)
{ {
return mdb_env_sync0(env, force != 0); unsigned int flags = force ? FORCE | FGREW : 0;
return mdb_env_sync0(env, &flags);
} }
/** Back up parent txn's cursors, then grab the originals for tracking */ /** Back up parent txn's cursors, then grab the originals for tracking */
@ -3174,6 +3189,7 @@ mdb_page_flush(MDB_txn *txn, int keep)
continue; continue;
} }
dp->mp_flags &= ~P_DIRTY; dp->mp_flags &= ~P_DIRTY;
env->me_sync_pending += IS_OVERFLOW(dp) ? psize * dp->mp_pages : psize;
} }
goto done; goto done;
} }
@ -3194,6 +3210,7 @@ mdb_page_flush(MDB_txn *txn, int keep)
pos = pgno * psize; pos = pgno * psize;
size = psize; size = psize;
if (IS_OVERFLOW(dp)) size *= dp->mp_pages; if (IS_OVERFLOW(dp)) size *= dp->mp_pages;
env->me_sync_pending += size;
} }
#ifdef _WIN32 #ifdef _WIN32
else break; else break;
@ -3487,8 +3504,8 @@ mdb_txn_commit(MDB_txn *txn)
} }
#endif #endif
if ((rc = mdb_page_flush(txn, 0)) || if ((rc = mdb_page_flush(txn, 0)) ||
(rc = mdb_env_sync0(env, i)) || (rc = mdb_env_sync0(env, &i)) ||
(rc = mdb_env_write_meta(txn))) (rc = mdb_env_write_meta(txn, i != 0)))
goto fail; goto fail;
/* Free P_LOOSE pages left behind in dirty_list */ /* Free P_LOOSE pages left behind in dirty_list */
@ -3512,6 +3529,12 @@ fail:
return rc; return rc;
} }
/** Store the auto-sync threshold in the environment.
 *
 * The value is saved in me_sync_threshold. If the environment has no
 * map yet (me_map is NULL) nothing else is done and 0 is returned;
 * otherwise mdb_env_sync(env, 0) is invoked right away and its result
 * is passed back to the caller.
 *
 * @param[in] env   The environment whose threshold is updated.
 * @param[in] bytes The new threshold value, in bytes.
 * @return 0 when no map is present, else the result of mdb_env_sync().
 */
int
mdb_env_set_syncbytes(MDB_env *env, size_t bytes)
{
	env->me_sync_threshold = bytes;

	/* No map present: just remember the setting. */
	if (!env->me_map)
		return 0;

	return mdb_env_sync(env, 0);
}
/** Read the environment parameters of a DB environment before /** Read the environment parameters of a DB environment before
* mapping it into memory. * mapping it into memory.
* @param[in] env the environment handle * @param[in] env the environment handle
@ -3646,7 +3669,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static int static int
mdb_env_write_meta(MDB_txn *txn) mdb_env_write_meta(MDB_txn *txn, int force)
{ {
MDB_env *env; MDB_env *env;
MDB_meta meta, metab, *mp; MDB_meta meta, metab, *mp;
@ -3682,9 +3705,9 @@ mdb_env_write_meta(MDB_txn *txn)
__sync_synchronize(); __sync_synchronize();
#endif #endif
mp->mm_txnid = txn->mt_txnid; mp->mm_txnid = txn->mt_txnid;
if (!(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) { if (force || !(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) {
unsigned meta_size = env->me_psize; unsigned meta_size = env->me_psize;
rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC; rc = (!force && (env->me_flags & MDB_MAPASYNC)) ? MS_ASYNC : MS_SYNC;
ptr = env->me_map; ptr = env->me_map;
if (toggle) { if (toggle) {
#ifndef _WIN32 /* POSIX msync() requires ptr = start of OS page */ #ifndef _WIN32 /* POSIX msync() requires ptr = start of OS page */
@ -3718,7 +3741,7 @@ mdb_env_write_meta(MDB_txn *txn)
off += PAGEHDRSZ; off += PAGEHDRSZ;
/* Write to the SYNC fd */ /* Write to the SYNC fd */
mfd = env->me_flags & (MDB_NOSYNC|MDB_NOMETASYNC) ? mfd = (!force || (env->me_flags & (MDB_NOSYNC|MDB_NOMETASYNC))) ?
env->me_fd : env->me_mfd; env->me_fd : env->me_mfd;
#ifdef _WIN32 #ifdef _WIN32
{ {