mdbx: rework mdbx_page_flush(), providing flush_begin & flush_end.

Change-Id: Id7366d427d204b444ab76c880ad1c0757a7de94e
Leonid Yuriev 2019-10-17 10:03:45 +03:00
parent 6a01955810
commit 659933d0c9
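
In short, this change replaces the previous two-branch write loop in mdbx_page_flush() with a single pass that gathers physically contiguous dirty pages into an iovec, flushes each run through the new mdbx_flush_iov() helper, and records the lowest/highest touched page as flush_begin/flush_end. Before reading the diff, here is a minimal, self-contained sketch of that gathering idea; it uses plain POSIX pwritev() and invented names (write_dirty_pages, flush_batch, BATCH_MAX), not libmdbx's actual types or API.

#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/uio.h> /* pwritev() */
#include <stdint.h>
#include <stddef.h>

#define BATCH_MAX 64 /* hypothetical cap, stands in for MDBX_COMMIT_PAGES */

/* Write one accumulated run of pages with a single positioned vectored write.
 * A real implementation would also have to handle short writes and EINTR. */
static int flush_batch(int fd, const struct iovec *iov, int items, off_t off) {
  if (items == 0)
    return 0;
  return (pwritev(fd, iov, items, off) < 0) ? -1 : 0;
}

/* Walk dirty pages (sorted by page number) and gather physically contiguous
 * ones into an iovec, flushing whenever the run breaks or the iovec fills. */
static int write_dirty_pages(int fd, size_t pagesize, const uint64_t *pgnos,
                             void *const *data, const size_t *bytes,
                             int count) {
  struct iovec iov[BATCH_MAX];
  int items = 0;
  off_t batch_off = 0;
  size_t batch_bytes = 0;

  for (int i = 0; i < count; ++i) {
    const off_t page_off = (off_t)(pgnos[i] * pagesize);
    /* Start a new batch when the contiguous run breaks or the iovec is full. */
    if (items == BATCH_MAX ||
        (items && batch_off + (off_t)batch_bytes != page_off)) {
      if (flush_batch(fd, iov, items, batch_off) != 0)
        return -1;
      items = 0;
    }
    if (items == 0) {
      batch_off = page_off; /* new run starts at this page's file offset */
      batch_bytes = 0;
    }
    iov[items].iov_base = data[i];
    iov[items].iov_len = bytes[i];
    batch_bytes += bytes[i];
    ++items;
  }
  return flush_batch(fd, iov, items, batch_off); /* flush the trailing run */
}
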


@@ -1685,6 +1685,8 @@ static int __must_check_result mdbx_page_search(MDBX_cursor *mc, MDBX_val *key,
                                                  int flags);
 static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc,
                                                MDBX_cursor *cdst);
+static int __must_check_result mdbx_page_flush(MDBX_txn *txn,
+                                               const unsigned keep);
 #define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */
 static int __must_check_result mdbx_page_split(MDBX_cursor *mc,
@@ -2488,8 +2490,6 @@ mark_done:
   return rc;
 }
-static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep);
 /* Spill pages from the dirty list back to disk.
  * This is intended to prevent running into MDBX_TXN_FULL situations,
  * but note that they may still occur in a few cases:
@@ -2525,12 +2525,12 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep);
  *
  * Returns 0 on success, non-zero on failure. */
 static int mdbx_page_spill(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data) {
-  MDBX_txn *txn = mc->mc_txn;
-  MDBX_DPL dl = txn->tw.dirtylist;
   if (mc->mc_flags & C_SUB)
     return MDBX_SUCCESS;
+  MDBX_txn *txn = mc->mc_txn;
+  MDBX_DPL dl = txn->tw.dirtylist;
   /* Estimate how much space this op will take */
   pgno_t i = mc->mc_db->md_depth;
   /* Named DBs also dirty the main DB */
@@ -5559,123 +5559,114 @@ bailout_notracking:
   return rc;
 }
+static int mdbx_flush_iov(MDBX_txn *const txn, struct iovec *iov,
+                          unsigned iov_items, size_t iov_off,
+                          size_t iov_bytes) {
+  MDBX_env *const env = txn->mt_env;
+  int rc = mdbx_pwritev(env->me_fd, iov, iov_items, iov_off, iov_bytes);
+  if (unlikely(rc != MDBX_SUCCESS)) {
+    mdbx_error("Write error: %s", mdbx_strerror(rc));
+    txn->mt_flags |= MDBX_TXN_ERROR;
+  }
+  for (unsigned i = 0; i < iov_items; i++)
+    mdbx_dpage_free(env, (MDBX_page *)iov[i].iov_base,
+                    bytes2pgno(env, iov[i].iov_len));
+  return rc;
+}
 /* Flush (some) dirty pages to the map, after clearing their dirty flag.
  * [in] txn   the transaction that's being committed
  * [in] keep  number of initial pages in dirtylist to keep dirty.
  * Returns 0 on success, non-zero on failure. */
-static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) {
-  MDBX_env *env = txn->mt_env;
-  const MDBX_DPL dl = txn->tw.dirtylist;
-  unsigned i, j, pagecount = dl->length;
-  int rc;
-  size_t size = 0, pos = 0;
-  pgno_t pgno = 0;
-  MDBX_page *dp = NULL;
+static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) {
   struct iovec iov[MDBX_COMMIT_PAGES];
-  intptr_t wpos = 0, wsize = 0;
-  size_t next_pos = 1; /* impossible pos, so pos != next_pos */
-  int n = 0;
+  const MDBX_DPL dl = (keep || txn->tw.loose_count > 1)
+                          ? mdbx_dpl_sort(txn->tw.dirtylist)
+                          : txn->tw.dirtylist;
+  MDBX_env *const env = txn->mt_env;
+  pgno_t flush_begin = MAX_PAGENO;
+  pgno_t flush_end = MIN_PAGENO;
+  unsigned iov_items = 0;
+  size_t iov_bytes = 0;
+  size_t iov_off = 0;
+  unsigned r, w;
+  for (r = w = keep; ++r <= dl->length;) {
+    MDBX_page *dp = dl[r].ptr;
+    mdbx_tassert(txn,
+                 dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno);
+    mdbx_tassert(txn, dp->mp_flags & P_DIRTY);
-  j = i = keep;
-  if (env->me_flags & MDBX_WRITEMAP) {
-    /* Clear dirty flags */
-    while (++i <= pagecount) {
-      dp = dl[i].ptr;
-      /* Don't flush this page yet */
-      if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
-        dp->mp_flags &= ~P_KEEP;
-        dl[++j] = dl[i];
-        continue;
-      }
-      dp->mp_flags &= ~P_DIRTY;
-      dp->mp_validator = 0 /* TODO */;
-      *env->me_unsynced_pages += IS_OVERFLOW(dp) ? dp->mp_pages : 1;
+    /* Don't flush this page yet */
+    if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
+      dp->mp_flags &= ~P_KEEP;
+      dl[++w] = dl[r];
+      continue;
     }
-    goto done;
-  }
-  /* Write the pages */
-  for (;;) {
-    if (++i <= pagecount) {
-      dp = dl[i].ptr;
-      /* Don't flush this page yet */
-      if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
-        dp->mp_flags &= ~P_KEEP;
-        dl[i].pgno = 0;
-        continue;
-      }
-      pgno = dl[i].pgno;
-      mdbx_tassert(txn, pgno >= MIN_PAGENO);
-      /* clear dirty flag */
-      dp->mp_flags &= ~P_DIRTY;
-      dp->mp_validator = 0 /* TODO */;
-      pos = pgno2bytes(env, pgno);
-      const unsigned npages = IS_OVERFLOW(dp) ? dp->mp_pages : 1;
-      *env->me_unsynced_pages += npages;
-      size = pgno2bytes(env, npages);
-    }
-    /* Write up to MDBX_COMMIT_PAGES dirty pages at a time. */
-    if (pos != next_pos || n == MDBX_COMMIT_PAGES || wsize + size > MAX_WRITE) {
-      if (n) {
-        /* Write previous page(s) */
-        rc = mdbx_pwritev(env->me_fd, iov, n, wpos, wsize);
-        if (unlikely(rc != MDBX_SUCCESS)) {
-          mdbx_debug("Write error: %s", mdbx_strerror(rc));
-          return rc;
-        }
+    const unsigned npages = IS_OVERFLOW(dp) ? dp->mp_pages : 1;
+    flush_begin = (flush_begin < dp->mp_pgno) ? flush_begin : dp->mp_pgno;
+    flush_end =
+        (flush_end > dp->mp_pgno + npages) ? flush_end : dp->mp_pgno + npages;
+    *env->me_unsynced_pages += npages;
+    dp->mp_flags &= ~P_DIRTY;
+    dp->mp_validator = 0 /* TODO */;
+    if ((env->me_flags & MDBX_WRITEMAP) == 0) {
+      const size_t size = pgno2bytes(env, npages);
+      if (iov_off + iov_bytes != pgno2bytes(env, dp->mp_pgno) ||
+          iov_items == ARRAY_LENGTH(iov) || iov_bytes + size > MAX_WRITE) {
+        if (iov_items) {
+          int rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
+          if (unlikely(rc != MDBX_SUCCESS))
+            return rc;
 #if MDBX_CPU_CACHE_MMAP_NONCOHERENT
 #if defined(__linux__) || defined(__gnu_linux__)
-        if (mdbx_linux_kernel_version >= 0x02060b00)
-        /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
-         * arguments, making this function fairly expensive. Therefore, the
-         * whole cache is always flushed. */
+          if (mdbx_linux_kernel_version >= 0x02060b00)
+          /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
+           * arguments, making this function fairly expensive. Therefore, the
+           * whole cache is always flushed. */
 #endif /* Linux */
-          mdbx_invalidate_mmap_noncoherent_cache(env->me_map + wpos, wsize);
+            mdbx_invalidate_mmap_noncoherent_cache(env->me_map + iov_off,
+                                                   iov_bytes);
 #endif /* MDBX_CPU_CACHE_MMAP_NONCOHERENT */
-        n = 0;
+          iov_items = 0;
+          iov_bytes = 0;
+        }
+        iov_off = pgno2bytes(env, dp->mp_pgno);
       }
-      if (i > pagecount)
-        break;
-      wpos = pos;
-      wsize = 0;
+      iov[iov_items].iov_base = dp;
+      iov[iov_items].iov_len = size;
+      iov_items += 1;
+      iov_bytes += size;
     }
-    mdbx_debug("committing page %" PRIaPGNO, pgno);
-    next_pos = pos + size;
-    iov[n].iov_len = size;
-    iov[n].iov_base = (char *)dp;
-    wsize += size;
-    n++;
   }
+  if (iov_items) {
+    int rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
+    if (unlikely(rc != MDBX_SUCCESS))
+      return rc;
+  }
 #if MDBX_CPU_CACHE_MMAP_NONCOHERENT && \
     (defined(__linux__) || defined(__gnu_linux__))
-  if (mdbx_linux_kernel_version < 0x02060b00) {
+  if ((env->me_flags & MDBX_WRITEMAP) == 0 &&
+      mdbx_linux_kernel_version < 0x02060b00)
     /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
-     * arguments, making this function fairly expensive. Therefore, the whole
-     * cache is always flushed. */
-    mdbx_invalidate_mmap_noncoherent_cache(env->me_map,
-                                           pgno2bytes(env, txn->mt_next_pgno));
-  }
+     * arguments, making this function fairly expensive. Therefore, the
+     * whole cache is always flushed. */
+    mdbx_invalidate_mmap_noncoherent_cache(
+        env->me_map + pgno2bytes(env, flush_begin),
+        pgno2bytes(env, flush_end - flush_begin));
 #endif /* MDBX_CPU_CACHE_MMAP_NONCOHERENT && Linux */
-  for (i = keep; ++i <= pagecount;) {
-    dp = dl[i].ptr;
-    /* This is a page we skipped above */
-    if (!dl[i].pgno) {
-      dl[++j] = dl[i];
-      dl[j].pgno = dp->mp_pgno;
-      continue;
-    }
-    mdbx_dpage_free(env, dp, IS_OVERFLOW(dp) ? dp->mp_pages : 1);
-  }
+  /* TODO: use flush_begin & flush_end for msync() & sync_file_range(). */
+  (void)flush_begin;
+  (void)flush_end;
-done:
-  i--;
-  txn->tw.dirtyroom += i - j;
-  dl->length = j;
+  txn->tw.dirtyroom += r - 1 - w;
+  dl->length = w;
   mdbx_tassert(txn, txn->mt_parent ||
                         txn->tw.dirtyroom + txn->tw.dirtylist->length ==
                             MDBX_DPL_TXNFULL);
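
The TODO in the new code notes that flush_begin/flush_end could later drive a range-limited sync instead of syncing the whole file or mapping; for now they only feed the non-coherent-cache invalidation path. A rough sketch of what that range-limited sync could look like, assuming a page-aligned mapping base and Linux's sync_file_range(); the helper name sync_flushed_range and its parameters are invented for illustration and are not libmdbx API.

#define _GNU_SOURCE
#include <fcntl.h>    /* sync_file_range() on Linux */
#include <sys/mman.h> /* msync() */
#include <stdint.h>

/* Hypothetical helper: persist only the page range touched by the last
 * flush pass, rather than the whole database file or mapping. */
static int sync_flushed_range(int fd, char *map, size_t pagesize,
                              uint64_t flush_begin, uint64_t flush_end,
                              int writemap) {
  if (flush_begin >= flush_end)
    return 0; /* nothing was flushed in this pass */
  const off_t offset = (off_t)(flush_begin * pagesize);
  const size_t length = (size_t)((flush_end - flush_begin) * pagesize);
  if (writemap)
    /* Pages were modified through the mapping: msync() just that span
     * (the start is page-aligned because mmap() bases are page-aligned). */
    return msync(map + offset, length, MS_SYNC);
  /* Pages were written through the fd: ask the kernel to write back and
   * wait for exactly that byte range (Linux-specific). Note that, unlike
   * fdatasync(), sync_file_range() gives no durability guarantee for
   * file metadata. */
  return sync_file_range(fd, offset, (off_t)length,
                         SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
                             SYNC_FILE_RANGE_WAIT_AFTER);
}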