mdbx: LRU spilling feature (squashed).

Resolves https://github.com/erthink/libmdbx/issues/186

Change-Id: Ie3318964e4e2adbeb77738b8301c45075080850b
This commit is contained in:
Leonid Yuriev 2021-04-26 17:51:11 +03:00
parent 3e272d339a
commit 4fa5e95241

View File

@ -3581,8 +3581,6 @@ static int __must_check_result mdbx_page_search(MDBX_cursor *mc,
const MDBX_val *key, int flags);
static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc,
MDBX_cursor *cdst);
static int __must_check_result mdbx_page_flush(MDBX_txn *txn,
const unsigned keep);
#define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */
static int __must_check_result mdbx_page_split(MDBX_cursor *mc,
@ -4591,7 +4589,8 @@ static int mdbx_page_retire_ex(MDBX_cursor *mc, const pgno_t pgno,
mdbx_tassert(txn, !is_spilled);
mdbx_tassert(txn, !txn->tw.spill_pages ||
!mdbx_pnl_exist(txn->tw.spill_pages, pgno << 1));
mdbx_tassert(txn, debug_dpl_find(txn, pgno) == mp || txn->mt_parent);
mdbx_tassert(txn, debug_dpl_find(txn, pgno) == mp || txn->mt_parent ||
(txn->mt_flags & MDBX_WRITEMAP));
} else {
mdbx_tassert(txn, !debug_dpl_find(txn, pgno));
}
@ -4654,6 +4653,9 @@ status_done:
* Её МОЖНО вытолкнуть в нераспределенный хвост. */
kind = "spilled";
mdbx_spill_remove(txn, si, npages);
} else if ((txn->mt_flags & MDBX_WRITEMAP)) {
kind = "writemap";
mdbx_tassert(txn, mp && IS_MODIFIABLE(txn, mp));
} else {
/* Страница аллоцирована, запачкана и возможно пролита в одной
* из родительских транзакций.
@ -4809,13 +4811,8 @@ static __inline int mdbx_page_retire(MDBX_cursor *mc, MDBX_page *mp) {
return mdbx_page_retire_ex(mc, mp->mp_pgno, mp, PAGETYPE(mp));
}
/* Toggle P_KEEP in dirty, non-overflow, non-sub pages watched by txn.
*
* [in] mc A cursor handle for the current operation.
* [in] pflags Flags of the pages to update:
* - 0 to set P_KEEP,
* - P_KEEP to clear it. */
static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) {
/* Set P_KEEP in dirty, non-overflow, non-sub pages watched by txn. */
static void mdbx_cursor_keep(MDBX_cursor *mc) {
const unsigned mask = P_SUBP | P_LOOSE | P_KEEP | P_SPILLED;
if (mc->mc_flags & C_INITIALIZED) {
MDBX_cursor *m3 = mc;
@ -4823,8 +4820,8 @@ static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) {
MDBX_page *mp = NULL;
for (unsigned j = 0; j < m3->mc_snum; j++) {
mp = m3->mc_pg[j];
if (IS_MODIFIABLE(mc->mc_txn, mp) && (mp->mp_flags & mask) == pflags)
mp->mp_flags ^= P_KEEP;
if (IS_MODIFIABLE(mc->mc_txn, mp) && !(mp->mp_flags & mask))
mp->mp_flags |= P_KEEP;
}
if (!(mp && IS_LEAF(mp)))
break;
@ -4845,20 +4842,16 @@ static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) {
}
}
/* Mark pages seen by cursors: First m0, then tracked cursors
* [in] all No shortcuts. Needed except after a full mdbx_page_flush(). */
static void mdbx_txn_xkeep(MDBX_txn *txn, MDBX_cursor *m0,
const unsigned pflags, const bool all) {
static void mdbx_txn_keep(MDBX_txn *txn, MDBX_cursor *m0) {
if (m0)
mdbx_cursor_xkeep(m0, pflags);
mdbx_cursor_keep(m0);
for (unsigned i = FREE_DBI; i < txn->mt_numdbs; ++i)
if (txn->mt_dbistate[i] & DBI_DIRTY)
for (MDBX_cursor *mc = txn->tw.cursors[i]; mc; mc = mc->mc_next)
if (mc != m0)
mdbx_cursor_xkeep(mc, pflags);
mdbx_cursor_keep(mc);
if (all) {
/* Mark dirty root pages */
const unsigned mask = P_SUBP | P_LOOSE | P_KEEP | P_SPILLED;
for (unsigned i = 0; i < txn->mt_numdbs; i++) {
@ -4869,12 +4862,190 @@ static void mdbx_txn_xkeep(MDBX_txn *txn, MDBX_cursor *m0,
unsigned di = mdbx_dpl_exist(txn, pgno);
if (di) {
MDBX_page *dp = txn->tw.dirtylist->items[di].ptr;
if ((dp->mp_flags & mask) == pflags)
dp->mp_flags ^= P_KEEP;
if (!(dp->mp_flags & mask))
dp->mp_flags |= P_KEEP;
}
}
}
}
/* Returns the spilling priority (0..255) for a dirty page:
* 0 = should be spilled;
* ...
* > 255 = must not be spilled. */
static unsigned spill_prio(const MDBX_txn *txn, const unsigned i,
const unsigned lru_min, const unsigned reciprocal) {
MDBX_dpl *const dl = txn->tw.dirtylist;
const pgno_t pgno = dl->items[i].pgno;
MDBX_page *const dp = dl->items[i].ptr;
const unsigned lru = dl->items[i].lru;
const unsigned npages = dpl_npages(dl, i);
if (dp->mp_flags & (P_LOOSE | P_KEEP | P_SPILLED)) {
mdbx_debug("skip %s %u page %" PRIaPGNO,
(dp->mp_flags & P_LOOSE)
? "loose"
: (dp->mp_flags & P_LOOSE)
? "loose"
: (dp->mp_flags & P_SPILLED) ? "parent-spilled"
: "keep",
npages, pgno);
return 256;
}
/* Can't spill twice,
* make sure it's not already in a parent's spill list(s). */
MDBX_txn *parent = txn->mt_parent;
if (parent && (parent->mt_flags & MDBX_TXN_SPILLS)) {
do
if (parent->tw.spill_pages &&
mdbx_pnl_intersect(parent->tw.spill_pages, pgno << 1, npages << 1)) {
mdbx_debug("skip-2 parent-spilled %u page %" PRIaPGNO, npages, pgno);
dp->mp_flags |= P_SPILLED;
return 256;
}
while ((parent = parent->mt_parent) != nullptr);
}
unsigned prio = 1 + ((lru - lru_min) * reciprocal >> 8);
mdbx_tassert(txn, prio > 0 && prio < 256);
if (npages > 1) {
/* makes a large/overflow pages be likely to spill */
uint32_t x = npages | npages >> 16;
x |= x >> 8;
x |= x >> 4;
x |= x >> 2;
x |= x >> 1;
prio = (255 - prio) * log2n_powerof2(x + 1) + 157;
prio = (prio < 256) ? 255 - prio : 0;
mdbx_tassert(txn, prio < 256);
}
return prio;
}
struct mdbx_iov_ctx {
unsigned iov_items;
size_t iov_bytes;
size_t iov_off;
pgno_t flush_begin;
pgno_t flush_end;
struct iovec iov[MDBX_COMMIT_PAGES];
};
static __inline void mdbx_iov_init(MDBX_txn *const txn,
struct mdbx_iov_ctx *ctx) {
ctx->flush_begin = MAX_PAGENO;
ctx->flush_end = MIN_PAGENO;
ctx->iov_items = 0;
ctx->iov_bytes = 0;
ctx->iov_off = 0;
(void)txn;
}
static __inline void mdbx_iov_done(MDBX_txn *const txn,
struct mdbx_iov_ctx *ctx) {
mdbx_tassert(txn, ctx->iov_items == 0);
#if defined(__linux__) || defined(__gnu_linux__)
MDBX_env *const env = txn->mt_env;
if (!(txn->mt_flags & MDBX_WRITEMAP) &&
mdbx_linux_kernel_version < 0x02060b00)
/* Linux kernels older than version 2.6.11 ignore the addr and nbytes
* arguments, making this function fairly expensive. Therefore, the
* whole cache is always flushed. */
mdbx_flush_incoherent_mmap(
env->me_map + pgno2bytes(env, ctx->flush_begin),
pgno2bytes(env, ctx->flush_end - ctx->flush_begin), env->me_os_psize);
#endif /* Linux */
}
static int mdbx_iov_write(MDBX_txn *const txn, struct mdbx_iov_ctx *ctx) {
mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP));
mdbx_tassert(txn, ctx->iov_items > 0);
MDBX_env *const env = txn->mt_env;
int rc;
if (likely(ctx->iov_items == 1)) {
mdbx_assert(env, ctx->iov_bytes == (size_t)ctx->iov[0].iov_len);
rc = mdbx_pwrite(env->me_lazy_fd, ctx->iov[0].iov_base, ctx->iov[0].iov_len,
ctx->iov_off);
} else {
rc = mdbx_pwritev(env->me_lazy_fd, ctx->iov, ctx->iov_items, ctx->iov_off,
ctx->iov_bytes);
}
if (unlikely(rc != MDBX_SUCCESS))
mdbx_error("Write error: %s", mdbx_strerror(rc));
else {
VALGRIND_MAKE_MEM_DEFINED(txn->mt_env->me_map + ctx->iov_off,
ctx->iov_bytes);
ASAN_UNPOISON_MEMORY_REGION(txn->mt_env->me_map + ctx->iov_off,
ctx->iov_bytes);
}
for (unsigned i = 0; i < ctx->iov_items; i++)
mdbx_dpage_free(env, (MDBX_page *)ctx->iov[i].iov_base,
bytes2pgno(env, ctx->iov[i].iov_len));
ctx->iov_items = 0;
ctx->iov_bytes = 0;
return rc;
}
static int iov_page(MDBX_txn *txn, struct mdbx_iov_ctx *ctx, MDBX_page *dp,
unsigned npages) {
MDBX_env *const env = txn->mt_env;
mdbx_tassert(txn,
dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno);
mdbx_tassert(txn, IS_MODIFIABLE(txn, dp));
mdbx_tassert(txn,
!(dp->mp_flags & ~(P_BRANCH | P_LEAF | P_LEAF2 | P_OVERFLOW)));
ctx->flush_begin =
(ctx->flush_begin < dp->mp_pgno) ? ctx->flush_begin : dp->mp_pgno;
ctx->flush_end = (ctx->flush_end > dp->mp_pgno + npages)
? ctx->flush_end
: dp->mp_pgno + npages;
env->me_unsynced_pages->weak += npages;
if (IS_SHADOWED(txn, dp)) {
mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP));
dp->mp_txnid = txn->mt_txnid;
mdbx_tassert(txn, IS_SPILLED(txn, dp));
const size_t size = pgno2bytes(env, npages);
if (ctx->iov_off + ctx->iov_bytes != pgno2bytes(env, dp->mp_pgno) ||
ctx->iov_items == ARRAY_LENGTH(ctx->iov) ||
ctx->iov_bytes + size > MAX_WRITE) {
if (ctx->iov_items) {
int err = mdbx_iov_write(txn, ctx);
if (unlikely(err != MDBX_SUCCESS))
return err;
#if defined(__linux__) || defined(__gnu_linux__)
if (mdbx_linux_kernel_version >= 0x02060b00)
/* Linux kernels older than version 2.6.11 ignore the addr and nbytes
* arguments, making this function fairly expensive. Therefore, the
* whole cache is always flushed. */
#endif /* Linux */
mdbx_flush_incoherent_mmap(env->me_map + ctx->iov_off, ctx->iov_bytes,
env->me_os_psize);
}
ctx->iov_off = pgno2bytes(env, dp->mp_pgno);
}
ctx->iov[ctx->iov_items].iov_base = (void *)dp;
ctx->iov[ctx->iov_items].iov_len = size;
ctx->iov_items += 1;
ctx->iov_bytes += size;
} else {
mdbx_tassert(txn, txn->mt_flags & MDBX_WRITEMAP);
}
return MDBX_SUCCESS;
}
static int spill_page(MDBX_txn *txn, struct mdbx_iov_ctx *ctx, MDBX_page *dp,
unsigned npages) {
mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP));
pgno_t pgno = dp->mp_pgno;
int err = iov_page(txn, ctx, dp, npages);
if (likely(err == MDBX_SUCCESS))
err = mdbx_pnl_append_range(true, &txn->tw.spill_pages, pgno << 1, npages);
return err;
}
/* Spill pages from the dirty list back to disk.
@ -4933,13 +5104,60 @@ static int mdbx_txn_spill(MDBX_txn *txn, MDBX_cursor *m0, unsigned need) {
wanna_spill, txn->tw.dirtyroom, need);
mdbx_tassert(txn, txn->tw.dirtylist->length >= wanna_spill);
int rc;
struct mdbx_iov_ctx ctx;
mdbx_iov_init(txn, &ctx);
int rc = MDBX_SUCCESS;
if (txn->mt_flags & MDBX_WRITEMAP) {
MDBX_dpl *const dl = txn->tw.dirtylist;
#ifndef MDBX_FAKE_SPILL_WRITEMAP
#define MDBX_FAKE_SPILL_WRITEMAP 1
#endif
const unsigned span = dl->length - txn->tw.loose_count;
txn->tw.dirtyroom += span;
if (MDBX_FAKE_SPILL_WRITEMAP) {
txn->mt_env->me_unsynced_pages->weak += span;
dpl_clear(dl);
for (MDBX_page *loose = txn->tw.loose_pages; loose;
loose = loose->mp_next) {
rc = mdbx_dpl_append(txn, loose->mp_pgno, loose, 1);
mdbx_tassert(txn, rc == MDBX_SUCCESS);
}
mdbx_tassert(txn, mdbx_dirtylist_check(txn));
} else {
unsigned r, w;
for (w = 0, r = 1; r <= dl->length; ++r) {
MDBX_page *dp = dl->items[r].ptr;
if (dp->mp_flags & P_LOOSE)
dl->items[++w] = dl->items[r];
else {
rc = iov_page(txn, &ctx, dp, dpl_npages(dl, r));
mdbx_tassert(txn, rc == MDBX_SUCCESS);
}
}
mdbx_tassert(txn, span == r - 1 - w && w == txn->tw.loose_count);
dl->sorted = (dl->sorted == dl->length) ? w : 0;
dpl_setlen(dl, w);
mdbx_tassert(txn, mdbx_dirtylist_check(txn));
MDBX_env *const env = txn->mt_env;
rc = mdbx_msync(&env->me_dxb_mmap,
pgno_align2os_bytes(env, ctx.flush_begin),
pgno_align2os_bytes(env, ctx.flush_end - ctx.flush_begin),
MDBX_SYNC_NONE);
}
return rc;
}
mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP));
if (!txn->tw.spill_pages) {
txn->tw.spill_least_removed = INT_MAX;
txn->tw.spill_pages = mdbx_pnl_alloc(wanna_spill);
if (unlikely(!txn->tw.spill_pages)) {
rc = MDBX_ENOMEM;
goto bailout;
bailout:
txn->mt_flags |= MDBX_TXN_ERROR;
return rc;
}
} else {
/* purge deleted slots */
@ -4950,75 +5168,153 @@ static int mdbx_txn_spill(MDBX_txn *txn, MDBX_cursor *m0, unsigned need) {
;
}
/* Preserve pages which may soon be dirtied again */
mdbx_txn_xkeep(txn, m0, 0, true);
/* Сортируем чтобы запись на диск была полее последовательна */
MDBX_dpl *const dl = mdbx_dpl_sort(txn);
/* Save the page IDs of all the pages we're flushing */
/* flush from the tail forward, this saves a lot of shifting later on. */
const unsigned dl_len_before = dl->length;
unsigned spilled = 0;
unsigned keep = dl_len_before;
for (; keep && wanna_spill; keep--) {
const pgno_t pgno = dl->items[keep].pgno;
MDBX_page *dp = dl->items[keep].ptr;
const unsigned npages = dpl_npages(dl, keep);
if (dp->mp_flags & (P_LOOSE | P_KEEP | P_SPILLED)) {
mdbx_debug("skip %s %u page %" PRIaPGNO,
(dp->mp_flags & P_LOOSE)
? "loose"
: (dp->mp_flags & P_SPILLED) ? "parent-spilled" : "keep",
npages, dp->mp_pgno);
skip:
/* Preserve pages which may soon be dirtied again */
mdbx_txn_keep(txn, m0);
/* Подзадача: Вытолкнуть часть страниц на диск в соответствии с LRU,
* но при этом учесть важные поправки:
* - лучше выталкивать старые large/overflow страницы, так будет освобождено
* больше памяти, а также так как они (в текущем понимании) гораздо реже
* повторно изменяются;
* - при прочих равных лучше выталкивать смежные страницы, так будет
* меньше I/O операций;
* - желательно потратить на это меньше времени чем std::partial_sort_copy;
*
* Решение:
* - Квантуем весь диапазон lru-меток до 256 значений и задействуем один
* проход 8-битного radix-sort. В результате получаем 256 уровней
* "свежести", в том числе значение lru-метки, старее которой страницы
* должны быть выгружены;
* - Двигаемся последовательно в сторону увеличения номеров страниц
* и выталкиваем страницы с lru-меткой старее отсекающего значения,
* пока не вытолкнем достаточно;
* - Встречая страницы смежные с выталкиваемыми для уменьшения кол-ва
* I/O операций выталкиваем и их, если они попадают в первую половину
* между выталкиваемыми и с самыми свежими lru-метками;
* - дополнительно при сортировке умышленно старим large/overflow страницы,
* тем самым повышая их шансы на выталкивание. */
/* get min/max of LRU-labels */
unsigned lru_min = dl->items[1].lru, lru_max = lru_min;
for (unsigned i = 2; i <= dl->length; ++i) {
lru_min = (lru_min < dl->items[i].lru) ? lru_min : dl->items[i].lru;
lru_max = (lru_max > dl->items[i].lru) ? lru_max : dl->items[i].lru;
}
/* half of 8-bit radix-sort */
unsigned radix_counters[256], spillable = 0;
memset(&radix_counters, 0, sizeof(radix_counters));
unsigned const reciprocal = 255 * 256 / (lru_max - lru_min + 1);
for (unsigned i = 1; i <= dl->length; ++i) {
unsigned prio = spill_prio(txn, i, lru_min, reciprocal);
if (prio < 256) {
radix_counters[prio] += 1;
spillable += 1;
}
}
unsigned prio2spill = 0, prio2adjacent = 127, amount = radix_counters[0];
for (unsigned i = 1; i < 256; i++) {
if (amount < wanna_spill) {
prio2spill = i;
prio2adjacent = i + (255 - i) / 2;
amount += radix_counters[i];
} else if (amount + amount < spillable + wanna_spill
/* РАВНОЗНАЧНО: amount - wanna_spill < spillable - amount */) {
prio2adjacent = i;
amount += radix_counters[i];
} else
break;
}
unsigned prev_prio = 256, spilled = 0;
unsigned r, w, prio;
for (w = 0, r = 1; r <= dl->length && spilled < wanna_spill;
prev_prio = prio, ++r) {
prio = spill_prio(txn, r, lru_min, reciprocal);
MDBX_page *const dp = dl->items[r].ptr;
if (prio < prio2adjacent) {
const pgno_t pgno = dl->items[r].pgno;
const unsigned npages = dpl_npages(dl, r);
if (prio <= prio2spill) {
if (prev_prio < prio2adjacent && prev_prio > prio2spill &&
dpl_endpgno(dl, r - 1) == pgno) {
mdbx_debug("co-spill %u prev-adjacent page %" PRIaPGNO
" (lru-dist %d, prio %u)",
dpl_npages(dl, w), dl->items[r - 1].pgno,
txn->tw.dirtylru - dl->items[r - 1].lru, prev_prio);
--w;
rc = spill_page(txn, &ctx, dl->items[r - 1].ptr,
dpl_npages(dl, r - 1));
if (unlikely(rc != MDBX_SUCCESS))
break;
++spilled;
}
mdbx_debug("spill %u page %" PRIaPGNO " (lru-dist %d, prio %u)", npages,
dp->mp_pgno, txn->tw.dirtylru - dl->items[r].lru, prio);
rc = spill_page(txn, &ctx, dp, npages);
if (unlikely(rc != MDBX_SUCCESS))
break;
++spilled;
continue;
}
/* Can't spill twice,
* make sure it's not already in a parent's spill list(s). */
MDBX_txn *parent = txn->mt_parent;
if (parent && (parent->mt_flags & MDBX_TXN_SPILLS)) {
do
if (parent->tw.spill_pages &&
mdbx_pnl_intersect(parent->tw.spill_pages, pgno << 1,
npages << 1)) {
mdbx_debug("skip-2 parent-spilled %u page %" PRIaPGNO, npages, pgno);
dp->mp_flags |= P_SPILLED;
goto skip;
}
while ((parent = parent->mt_parent) != nullptr);
}
mdbx_debug("spill %u page %" PRIaPGNO, npages, dp->mp_pgno);
rc = mdbx_pnl_append_range(true, &txn->tw.spill_pages, pgno << 1, npages);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
wanna_spill--;
spilled += 1;
}
mdbx_pnl_sort(txn->tw.spill_pages);
/* Flush the spilled part of dirty list */
rc = mdbx_page_flush(txn, keep);
if (prev_prio <= prio2spill && dpl_endpgno(dl, r - 1) == pgno) {
mdbx_debug("co-spill %u next-adjacent page %" PRIaPGNO
" (lru-dist %d, prio %u)",
npages, dp->mp_pgno, txn->tw.dirtylru - dl->items[r].lru,
prio);
rc = spill_page(txn, &ctx, dp, npages);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
break;
prio = prev_prio /* to continue co-spilling next adjacent pages */;
++spilled;
continue;
}
}
if (unlikely(prio > 255 && (dp->mp_flags & P_KEEP)))
/* Reset any dirty pages we kept that page_flush didn't see */
mdbx_tassert(txn, dl_len_before - spilled == dl->length);
mdbx_txn_xkeep(txn, m0, P_KEEP, keep > 0);
dp->mp_flags -= P_KEEP;
dl->items[++w] = dl->items[r];
}
while (r <= dl->length) {
MDBX_page *const dp = dl->items[r].ptr;
if (unlikely(dp->mp_flags & P_KEEP))
/* Reset any dirty pages we kept that page_flush didn't see */
dp->mp_flags -= P_KEEP;
dl->items[++w] = dl->items[r++];
}
mdbx_tassert(txn, r - 1 - w == spilled);
if (unlikely(spilled == 0)) {
mdbx_tassert(txn, ctx.iov_items == 0 && rc == MDBX_SUCCESS);
return MDBX_SUCCESS;
}
dl->sorted = dpl_setlen(dl, w);
txn->tw.dirtyroom += spilled;
mdbx_tassert(txn, mdbx_dirtylist_check(txn));
if (ctx.iov_items)
rc = mdbx_iov_write(txn, &ctx);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
mdbx_pnl_sort(txn->tw.spill_pages);
txn->mt_flags |= MDBX_TXN_SPILLS;
mdbx_notice("spilled %u dirty-entries, now have %u dirty-room", spilled,
txn->tw.dirtyroom);
bailout:
txn->mt_flags |= rc ? MDBX_TXN_ERROR : MDBX_TXN_SPILLS;
return rc;
mdbx_iov_done(txn, &ctx);
return MDBX_SUCCESS;
}
static int mdbx_cursor_spill(MDBX_cursor *mc, const MDBX_val *key,
const MDBX_val *data) {
MDBX_txn *txn = mc->mc_txn;
if (txn->mt_flags & MDBX_WRITEMAP)
return MDBX_SUCCESS;
/* Estimate how much space this operation will take: */
/* 1) Max b-tree height, reasonable enough with including dups' sub-tree */
unsigned need = CURSOR_STACK + 3;
@ -8937,136 +9233,35 @@ bailout_notracking:
return rc;
}
static int mdbx_flush_iov(MDBX_txn *const txn, struct iovec *iov,
unsigned iov_items, size_t iov_off,
size_t iov_bytes) {
MDBX_env *const env = txn->mt_env;
mdbx_assert(env, iov_items > 0);
int rc;
if (likely(iov_items == 1)) {
mdbx_assert(env, iov_bytes == (size_t)iov->iov_len);
rc = mdbx_pwrite(env->me_lazy_fd, iov->iov_base, iov_bytes, iov_off);
} else {
rc = mdbx_pwritev(env->me_lazy_fd, iov, iov_items, iov_off, iov_bytes);
}
if (unlikely(rc != MDBX_SUCCESS))
mdbx_error("Write error: %s", mdbx_strerror(rc));
else {
VALGRIND_MAKE_MEM_DEFINED(txn->mt_env->me_map + iov_off, iov_bytes);
ASAN_UNPOISON_MEMORY_REGION(txn->mt_env->me_map + iov_off, iov_bytes);
}
for (unsigned i = 0; i < iov_items; i++)
mdbx_dpage_free(env, (MDBX_page *)iov[i].iov_base,
bytes2pgno(env, iov[i].iov_len));
return rc;
}
/* Flush (some) dirty pages to the map, after clearing their dirty flag.
* [in] txn the transaction that's being committed
* [in] keep number of initial pages in dirtylist to keep dirty.
* Returns 0 on success, non-zero on failure. */
__hot static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) {
struct iovec iov[MDBX_COMMIT_PAGES];
MDBX_dpl *const dl = mdbx_dpl_sort(txn);
MDBX_env *const env = txn->mt_env;
pgno_t flush_begin = MAX_PAGENO;
pgno_t flush_end = MIN_PAGENO;
static int mdbx_txn_write(MDBX_txn *txn, struct mdbx_iov_ctx *ctx) {
MDBX_dpl *const dl =
(txn->mt_flags & MDBX_WRITEMAP) ? txn->tw.dirtylist : mdbx_dpl_sort(txn);
int rc = MDBX_SUCCESS;
unsigned iov_items = 0;
size_t iov_bytes = 0;
size_t iov_off = 0;
unsigned r, w;
for (r = w = keep; ++r <= dl->length;) {
for (w = 0, r = 1; r <= dl->length; ++r) {
MDBX_page *dp = dl->items[r].ptr;
mdbx_tassert(txn,
dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno);
mdbx_tassert(txn, dp->mp_flags == P_LOOSE || IS_MODIFIABLE(txn, dp));
/* Don't flush this page yet */
if (dp->mp_flags & P_KEEP) {
dp->mp_flags -= P_KEEP;
if (dp->mp_flags & P_LOOSE) {
dl->items[++w] = dl->items[r];
continue;
}
if (dp->mp_flags & (P_LOOSE | P_SPILLED)) {
dl->items[++w] = dl->items[r];
continue;
}
const unsigned npages = dpl_npages(dl, r);
flush_begin = (flush_begin < dp->mp_pgno) ? flush_begin : dp->mp_pgno;
flush_end =
(flush_end > dp->mp_pgno + npages) ? flush_end : dp->mp_pgno + npages;
env->me_unsynced_pages->weak += npages;
if (IS_SHADOWED(txn, dp)) {
dp->mp_txnid = txn->mt_txnid;
mdbx_tassert(txn, IS_SPILLED(txn, dp));
const size_t size = pgno2bytes(env, npages);
if (iov_off + iov_bytes != pgno2bytes(env, dp->mp_pgno) ||
iov_items == ARRAY_LENGTH(iov) || iov_bytes + size > MAX_WRITE) {
if (iov_items) {
rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
#if defined(__linux__) || defined(__gnu_linux__)
if (mdbx_linux_kernel_version >= 0x02060b00)
/* Linux kernels older than version 2.6.11 ignore the addr and nbytes
* arguments, making this function fairly expensive. Therefore, the
* whole cache is always flushed. */
#endif /* Linux */
mdbx_flush_incoherent_mmap(env->me_map + iov_off, iov_bytes,
env->me_os_psize);
iov_items = 0;
iov_bytes = 0;
if (unlikely(rc != MDBX_SUCCESS)) {
do
dl->items[++w] = dl->items[r];
while (++r <= dl->length);
unsigned npages = dpl_npages(dl, r);
rc = iov_page(txn, ctx, dp, npages);
if (unlikely(rc != MDBX_SUCCESS))
break;
}
}
iov_off = pgno2bytes(env, dp->mp_pgno);
}
iov[iov_items].iov_base = (void *)dp;
iov[iov_items].iov_len = size;
iov_items += 1;
iov_bytes += size;
}
}
mdbx_tassert(txn, dl->sorted == dl->length && r == dl->length + 1);
txn->tw.dirtyroom += dl->length - w;
assert(txn->tw.dirtyroom <= txn->mt_env->me_options.dp_limit);
if (ctx->iov_items)
rc = mdbx_iov_write(txn, ctx);
while (r <= dl->length)
dl->items[++w] = dl->items[r++];
dl->sorted = dpl_setlen(dl, w);
mdbx_tassert(txn, txn->mt_parent ||
txn->tw.dirtyroom + txn->tw.dirtylist->length ==
txn->mt_env->me_options.dp_limit);
if (iov_items)
rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
if (unlikely(rc != MDBX_SUCCESS)) {
txn->mt_flags |= MDBX_TXN_ERROR;
txn->tw.dirtyroom += r - 1 - w;
mdbx_tassert(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length ==
(txn->mt_parent ? txn->mt_parent->tw.dirtyroom
: txn->mt_env->me_options.dp_limit));
return rc;
}
#if defined(__linux__) || defined(__gnu_linux__)
if ((env->me_flags & MDBX_WRITEMAP) == 0 &&
mdbx_linux_kernel_version < 0x02060b00)
/* Linux kernels older than version 2.6.11 ignore the addr and nbytes
* arguments, making this function fairly expensive. Therefore, the
* whole cache is always flushed. */
mdbx_flush_incoherent_mmap(env->me_map + pgno2bytes(env, flush_begin),
pgno2bytes(env, flush_end - flush_begin),
env->me_os_psize);
#endif /* Linux */
/* TODO: use flush_begin & flush_end for msync() & sync_file_range(). */
(void)flush_begin;
(void)flush_end;
return MDBX_SUCCESS;
}
/* Check txn and dbi arguments to a function */
@ -9682,8 +9877,14 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
goto fail;
}
rc = mdbx_page_flush(txn, 0);
struct mdbx_iov_ctx ctx;
mdbx_iov_init(txn, &ctx);
rc = mdbx_txn_write(txn, &ctx);
if (likely(rc == MDBX_SUCCESS))
mdbx_iov_done(txn, &ctx);
/* TODO: use ctx.flush_begin & ctx.flush_end for range-sync */
ts_3 = latency ? mdbx_osal_monotime() : 0;
if (likely(rc == MDBX_SUCCESS)) {
if (txn->mt_dbs[MAIN_DBI].md_flags & DBI_DIRTY)
txn->mt_dbs[MAIN_DBI].md_mod_txnid = txn->mt_txnid;