diff --git a/src/core.c b/src/core.c index 145b22b1..4d82daa2 100644 --- a/src/core.c +++ b/src/core.c @@ -3581,8 +3581,6 @@ static int __must_check_result mdbx_page_search(MDBX_cursor *mc, const MDBX_val *key, int flags); static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst); -static int __must_check_result mdbx_page_flush(MDBX_txn *txn, - const unsigned keep); #define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */ static int __must_check_result mdbx_page_split(MDBX_cursor *mc, @@ -4591,7 +4589,8 @@ static int mdbx_page_retire_ex(MDBX_cursor *mc, const pgno_t pgno, mdbx_tassert(txn, !is_spilled); mdbx_tassert(txn, !txn->tw.spill_pages || !mdbx_pnl_exist(txn->tw.spill_pages, pgno << 1)); - mdbx_tassert(txn, debug_dpl_find(txn, pgno) == mp || txn->mt_parent); + mdbx_tassert(txn, debug_dpl_find(txn, pgno) == mp || txn->mt_parent || + (txn->mt_flags & MDBX_WRITEMAP)); } else { mdbx_tassert(txn, !debug_dpl_find(txn, pgno)); } @@ -4654,6 +4653,9 @@ status_done: * Её МОЖНО вытолкнуть в нераспределенный хвост. */ kind = "spilled"; mdbx_spill_remove(txn, si, npages); + } else if ((txn->mt_flags & MDBX_WRITEMAP)) { + kind = "writemap"; + mdbx_tassert(txn, mp && IS_MODIFIABLE(txn, mp)); } else { /* Страница аллоцирована, запачкана и возможно пролита в одной * из родительских транзакций. @@ -4809,13 +4811,8 @@ static __inline int mdbx_page_retire(MDBX_cursor *mc, MDBX_page *mp) { return mdbx_page_retire_ex(mc, mp->mp_pgno, mp, PAGETYPE(mp)); } -/* Toggle P_KEEP in dirty, non-overflow, non-sub pages watched by txn. - * - * [in] mc A cursor handle for the current operation. - * [in] pflags Flags of the pages to update: - * - 0 to set P_KEEP, - * - P_KEEP to clear it. */ -static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) { +/* Set P_KEEP in dirty, non-overflow, non-sub pages watched by txn. */ +static void mdbx_cursor_keep(MDBX_cursor *mc) { const unsigned mask = P_SUBP | P_LOOSE | P_KEEP | P_SPILLED; if (mc->mc_flags & C_INITIALIZED) { MDBX_cursor *m3 = mc; @@ -4823,8 +4820,8 @@ static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) { MDBX_page *mp = NULL; for (unsigned j = 0; j < m3->mc_snum; j++) { mp = m3->mc_pg[j]; - if (IS_MODIFIABLE(mc->mc_txn, mp) && (mp->mp_flags & mask) == pflags) - mp->mp_flags ^= P_KEEP; + if (IS_MODIFIABLE(mc->mc_txn, mp) && !(mp->mp_flags & mask)) + mp->mp_flags |= P_KEEP; } if (!(mp && IS_LEAF(mp))) break; @@ -4845,38 +4842,212 @@ static void mdbx_cursor_xkeep(MDBX_cursor *mc, unsigned pflags) { } } -/* Mark pages seen by cursors: First m0, then tracked cursors - * [in] all No shortcuts. Needed except after a full mdbx_page_flush(). */ -static void mdbx_txn_xkeep(MDBX_txn *txn, MDBX_cursor *m0, - const unsigned pflags, const bool all) { +static void mdbx_txn_keep(MDBX_txn *txn, MDBX_cursor *m0) { if (m0) - mdbx_cursor_xkeep(m0, pflags); + mdbx_cursor_keep(m0); for (unsigned i = FREE_DBI; i < txn->mt_numdbs; ++i) if (txn->mt_dbistate[i] & DBI_DIRTY) for (MDBX_cursor *mc = txn->tw.cursors[i]; mc; mc = mc->mc_next) if (mc != m0) - mdbx_cursor_xkeep(mc, pflags); + mdbx_cursor_keep(mc); - if (all) { - /* Mark dirty root pages */ - const unsigned mask = P_SUBP | P_LOOSE | P_KEEP | P_SPILLED; - for (unsigned i = 0; i < txn->mt_numdbs; i++) { - if (txn->mt_dbistate[i] & DBI_DIRTY) { - pgno_t pgno = txn->mt_dbs[i].md_root; - if (pgno == P_INVALID) - continue; - unsigned di = mdbx_dpl_exist(txn, pgno); - if (di) { - MDBX_page *dp = txn->tw.dirtylist->items[di].ptr; - if ((dp->mp_flags & mask) == pflags) - dp->mp_flags ^= P_KEEP; - } + /* Mark dirty root pages */ + const unsigned mask = P_SUBP | P_LOOSE | P_KEEP | P_SPILLED; + for (unsigned i = 0; i < txn->mt_numdbs; i++) { + if (txn->mt_dbistate[i] & DBI_DIRTY) { + pgno_t pgno = txn->mt_dbs[i].md_root; + if (pgno == P_INVALID) + continue; + unsigned di = mdbx_dpl_exist(txn, pgno); + if (di) { + MDBX_page *dp = txn->tw.dirtylist->items[di].ptr; + if (!(dp->mp_flags & mask)) + dp->mp_flags |= P_KEEP; } } } } +/* Returns the spilling priority (0..255) for a dirty page: + * 0 = should be spilled; + * ... + * > 255 = must not be spilled. */ +static unsigned spill_prio(const MDBX_txn *txn, const unsigned i, + const unsigned lru_min, const unsigned reciprocal) { + MDBX_dpl *const dl = txn->tw.dirtylist; + const pgno_t pgno = dl->items[i].pgno; + MDBX_page *const dp = dl->items[i].ptr; + const unsigned lru = dl->items[i].lru; + const unsigned npages = dpl_npages(dl, i); + if (dp->mp_flags & (P_LOOSE | P_KEEP | P_SPILLED)) { + mdbx_debug("skip %s %u page %" PRIaPGNO, + (dp->mp_flags & P_LOOSE) + ? "loose" + : (dp->mp_flags & P_LOOSE) + ? "loose" + : (dp->mp_flags & P_SPILLED) ? "parent-spilled" + : "keep", + npages, pgno); + return 256; + } + + /* Can't spill twice, + * make sure it's not already in a parent's spill list(s). */ + MDBX_txn *parent = txn->mt_parent; + if (parent && (parent->mt_flags & MDBX_TXN_SPILLS)) { + do + if (parent->tw.spill_pages && + mdbx_pnl_intersect(parent->tw.spill_pages, pgno << 1, npages << 1)) { + mdbx_debug("skip-2 parent-spilled %u page %" PRIaPGNO, npages, pgno); + dp->mp_flags |= P_SPILLED; + return 256; + } + while ((parent = parent->mt_parent) != nullptr); + } + + unsigned prio = 1 + ((lru - lru_min) * reciprocal >> 8); + mdbx_tassert(txn, prio > 0 && prio < 256); + if (npages > 1) { + /* makes a large/overflow pages be likely to spill */ + uint32_t x = npages | npages >> 16; + x |= x >> 8; + x |= x >> 4; + x |= x >> 2; + x |= x >> 1; + prio = (255 - prio) * log2n_powerof2(x + 1) + 157; + prio = (prio < 256) ? 255 - prio : 0; + mdbx_tassert(txn, prio < 256); + } + return prio; +} + +struct mdbx_iov_ctx { + unsigned iov_items; + size_t iov_bytes; + size_t iov_off; + pgno_t flush_begin; + pgno_t flush_end; + struct iovec iov[MDBX_COMMIT_PAGES]; +}; + +static __inline void mdbx_iov_init(MDBX_txn *const txn, + struct mdbx_iov_ctx *ctx) { + ctx->flush_begin = MAX_PAGENO; + ctx->flush_end = MIN_PAGENO; + ctx->iov_items = 0; + ctx->iov_bytes = 0; + ctx->iov_off = 0; + (void)txn; +} + +static __inline void mdbx_iov_done(MDBX_txn *const txn, + struct mdbx_iov_ctx *ctx) { + mdbx_tassert(txn, ctx->iov_items == 0); +#if defined(__linux__) || defined(__gnu_linux__) + MDBX_env *const env = txn->mt_env; + if (!(txn->mt_flags & MDBX_WRITEMAP) && + mdbx_linux_kernel_version < 0x02060b00) + /* Linux kernels older than version 2.6.11 ignore the addr and nbytes + * arguments, making this function fairly expensive. Therefore, the + * whole cache is always flushed. */ + mdbx_flush_incoherent_mmap( + env->me_map + pgno2bytes(env, ctx->flush_begin), + pgno2bytes(env, ctx->flush_end - ctx->flush_begin), env->me_os_psize); +#endif /* Linux */ +} + +static int mdbx_iov_write(MDBX_txn *const txn, struct mdbx_iov_ctx *ctx) { + mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP)); + mdbx_tassert(txn, ctx->iov_items > 0); + + MDBX_env *const env = txn->mt_env; + int rc; + if (likely(ctx->iov_items == 1)) { + mdbx_assert(env, ctx->iov_bytes == (size_t)ctx->iov[0].iov_len); + rc = mdbx_pwrite(env->me_lazy_fd, ctx->iov[0].iov_base, ctx->iov[0].iov_len, + ctx->iov_off); + } else { + rc = mdbx_pwritev(env->me_lazy_fd, ctx->iov, ctx->iov_items, ctx->iov_off, + ctx->iov_bytes); + } + + if (unlikely(rc != MDBX_SUCCESS)) + mdbx_error("Write error: %s", mdbx_strerror(rc)); + else { + VALGRIND_MAKE_MEM_DEFINED(txn->mt_env->me_map + ctx->iov_off, + ctx->iov_bytes); + ASAN_UNPOISON_MEMORY_REGION(txn->mt_env->me_map + ctx->iov_off, + ctx->iov_bytes); + } + + for (unsigned i = 0; i < ctx->iov_items; i++) + mdbx_dpage_free(env, (MDBX_page *)ctx->iov[i].iov_base, + bytes2pgno(env, ctx->iov[i].iov_len)); + ctx->iov_items = 0; + ctx->iov_bytes = 0; + return rc; +} + +static int iov_page(MDBX_txn *txn, struct mdbx_iov_ctx *ctx, MDBX_page *dp, + unsigned npages) { + MDBX_env *const env = txn->mt_env; + mdbx_tassert(txn, + dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno); + mdbx_tassert(txn, IS_MODIFIABLE(txn, dp)); + mdbx_tassert(txn, + !(dp->mp_flags & ~(P_BRANCH | P_LEAF | P_LEAF2 | P_OVERFLOW))); + + ctx->flush_begin = + (ctx->flush_begin < dp->mp_pgno) ? ctx->flush_begin : dp->mp_pgno; + ctx->flush_end = (ctx->flush_end > dp->mp_pgno + npages) + ? ctx->flush_end + : dp->mp_pgno + npages; + env->me_unsynced_pages->weak += npages; + + if (IS_SHADOWED(txn, dp)) { + mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP)); + dp->mp_txnid = txn->mt_txnid; + mdbx_tassert(txn, IS_SPILLED(txn, dp)); + const size_t size = pgno2bytes(env, npages); + if (ctx->iov_off + ctx->iov_bytes != pgno2bytes(env, dp->mp_pgno) || + ctx->iov_items == ARRAY_LENGTH(ctx->iov) || + ctx->iov_bytes + size > MAX_WRITE) { + if (ctx->iov_items) { + int err = mdbx_iov_write(txn, ctx); + if (unlikely(err != MDBX_SUCCESS)) + return err; +#if defined(__linux__) || defined(__gnu_linux__) + if (mdbx_linux_kernel_version >= 0x02060b00) + /* Linux kernels older than version 2.6.11 ignore the addr and nbytes + * arguments, making this function fairly expensive. Therefore, the + * whole cache is always flushed. */ +#endif /* Linux */ + mdbx_flush_incoherent_mmap(env->me_map + ctx->iov_off, ctx->iov_bytes, + env->me_os_psize); + } + ctx->iov_off = pgno2bytes(env, dp->mp_pgno); + } + ctx->iov[ctx->iov_items].iov_base = (void *)dp; + ctx->iov[ctx->iov_items].iov_len = size; + ctx->iov_items += 1; + ctx->iov_bytes += size; + } else { + mdbx_tassert(txn, txn->mt_flags & MDBX_WRITEMAP); + } + return MDBX_SUCCESS; +} + +static int spill_page(MDBX_txn *txn, struct mdbx_iov_ctx *ctx, MDBX_page *dp, + unsigned npages) { + mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP)); + pgno_t pgno = dp->mp_pgno; + int err = iov_page(txn, ctx, dp, npages); + if (likely(err == MDBX_SUCCESS)) + err = mdbx_pnl_append_range(true, &txn->tw.spill_pages, pgno << 1, npages); + return err; +} + /* Spill pages from the dirty list back to disk. * This is intended to prevent running into MDBX_TXN_FULL situations, * but note that they may still occur in a few cases: @@ -4933,13 +5104,60 @@ static int mdbx_txn_spill(MDBX_txn *txn, MDBX_cursor *m0, unsigned need) { wanna_spill, txn->tw.dirtyroom, need); mdbx_tassert(txn, txn->tw.dirtylist->length >= wanna_spill); - int rc; + struct mdbx_iov_ctx ctx; + mdbx_iov_init(txn, &ctx); + int rc = MDBX_SUCCESS; + if (txn->mt_flags & MDBX_WRITEMAP) { + MDBX_dpl *const dl = txn->tw.dirtylist; +#ifndef MDBX_FAKE_SPILL_WRITEMAP +#define MDBX_FAKE_SPILL_WRITEMAP 1 +#endif + const unsigned span = dl->length - txn->tw.loose_count; + txn->tw.dirtyroom += span; + if (MDBX_FAKE_SPILL_WRITEMAP) { + txn->mt_env->me_unsynced_pages->weak += span; + dpl_clear(dl); + for (MDBX_page *loose = txn->tw.loose_pages; loose; + loose = loose->mp_next) { + rc = mdbx_dpl_append(txn, loose->mp_pgno, loose, 1); + mdbx_tassert(txn, rc == MDBX_SUCCESS); + } + mdbx_tassert(txn, mdbx_dirtylist_check(txn)); + } else { + unsigned r, w; + for (w = 0, r = 1; r <= dl->length; ++r) { + MDBX_page *dp = dl->items[r].ptr; + if (dp->mp_flags & P_LOOSE) + dl->items[++w] = dl->items[r]; + else { + rc = iov_page(txn, &ctx, dp, dpl_npages(dl, r)); + mdbx_tassert(txn, rc == MDBX_SUCCESS); + } + } + + mdbx_tassert(txn, span == r - 1 - w && w == txn->tw.loose_count); + dl->sorted = (dl->sorted == dl->length) ? w : 0; + dpl_setlen(dl, w); + mdbx_tassert(txn, mdbx_dirtylist_check(txn)); + + MDBX_env *const env = txn->mt_env; + rc = mdbx_msync(&env->me_dxb_mmap, + pgno_align2os_bytes(env, ctx.flush_begin), + pgno_align2os_bytes(env, ctx.flush_end - ctx.flush_begin), + MDBX_SYNC_NONE); + } + return rc; + } + + mdbx_tassert(txn, !(txn->mt_flags & MDBX_WRITEMAP)); if (!txn->tw.spill_pages) { txn->tw.spill_least_removed = INT_MAX; txn->tw.spill_pages = mdbx_pnl_alloc(wanna_spill); if (unlikely(!txn->tw.spill_pages)) { rc = MDBX_ENOMEM; - goto bailout; + bailout: + txn->mt_flags |= MDBX_TXN_ERROR; + return rc; } } else { /* purge deleted slots */ @@ -4950,75 +5168,153 @@ static int mdbx_txn_spill(MDBX_txn *txn, MDBX_cursor *m0, unsigned need) { ; } - /* Preserve pages which may soon be dirtied again */ - mdbx_txn_xkeep(txn, m0, 0, true); - + /* Сортируем чтобы запись на диск была полее последовательна */ MDBX_dpl *const dl = mdbx_dpl_sort(txn); - /* Save the page IDs of all the pages we're flushing */ - /* flush from the tail forward, this saves a lot of shifting later on. */ - const unsigned dl_len_before = dl->length; - unsigned spilled = 0; - unsigned keep = dl_len_before; - for (; keep && wanna_spill; keep--) { - const pgno_t pgno = dl->items[keep].pgno; - MDBX_page *dp = dl->items[keep].ptr; - const unsigned npages = dpl_npages(dl, keep); - if (dp->mp_flags & (P_LOOSE | P_KEEP | P_SPILLED)) { - mdbx_debug("skip %s %u page %" PRIaPGNO, - (dp->mp_flags & P_LOOSE) - ? "loose" - : (dp->mp_flags & P_SPILLED) ? "parent-spilled" : "keep", - npages, dp->mp_pgno); - skip: - continue; - } - /* Can't spill twice, - * make sure it's not already in a parent's spill list(s). */ - MDBX_txn *parent = txn->mt_parent; - if (parent && (parent->mt_flags & MDBX_TXN_SPILLS)) { - do - if (parent->tw.spill_pages && - mdbx_pnl_intersect(parent->tw.spill_pages, pgno << 1, - npages << 1)) { - mdbx_debug("skip-2 parent-spilled %u page %" PRIaPGNO, npages, pgno); - dp->mp_flags |= P_SPILLED; - goto skip; - } - while ((parent = parent->mt_parent) != nullptr); - } - mdbx_debug("spill %u page %" PRIaPGNO, npages, dp->mp_pgno); - rc = mdbx_pnl_append_range(true, &txn->tw.spill_pages, pgno << 1, npages); - if (unlikely(rc != MDBX_SUCCESS)) - goto bailout; - wanna_spill--; - spilled += 1; - } - mdbx_pnl_sort(txn->tw.spill_pages); - /* Flush the spilled part of dirty list */ - rc = mdbx_page_flush(txn, keep); + /* Preserve pages which may soon be dirtied again */ + mdbx_txn_keep(txn, m0); + + /* Подзадача: Вытолкнуть часть страниц на диск в соответствии с LRU, + * но при этом учесть важные поправки: + * - лучше выталкивать старые large/overflow страницы, так будет освобождено + * больше памяти, а также так как они (в текущем понимании) гораздо реже + * повторно изменяются; + * - при прочих равных лучше выталкивать смежные страницы, так будет + * меньше I/O операций; + * - желательно потратить на это меньше времени чем std::partial_sort_copy; + * + * Решение: + * - Квантуем весь диапазон lru-меток до 256 значений и задействуем один + * проход 8-битного radix-sort. В результате получаем 256 уровней + * "свежести", в том числе значение lru-метки, старее которой страницы + * должны быть выгружены; + * - Двигаемся последовательно в сторону увеличения номеров страниц + * и выталкиваем страницы с lru-меткой старее отсекающего значения, + * пока не вытолкнем достаточно; + * - Встречая страницы смежные с выталкиваемыми для уменьшения кол-ва + * I/O операций выталкиваем и их, если они попадают в первую половину + * между выталкиваемыми и с самыми свежими lru-метками; + * - дополнительно при сортировке умышленно старим large/overflow страницы, + * тем самым повышая их шансы на выталкивание. */ + + /* get min/max of LRU-labels */ + unsigned lru_min = dl->items[1].lru, lru_max = lru_min; + for (unsigned i = 2; i <= dl->length; ++i) { + lru_min = (lru_min < dl->items[i].lru) ? lru_min : dl->items[i].lru; + lru_max = (lru_max > dl->items[i].lru) ? lru_max : dl->items[i].lru; + } + + /* half of 8-bit radix-sort */ + unsigned radix_counters[256], spillable = 0; + memset(&radix_counters, 0, sizeof(radix_counters)); + unsigned const reciprocal = 255 * 256 / (lru_max - lru_min + 1); + for (unsigned i = 1; i <= dl->length; ++i) { + unsigned prio = spill_prio(txn, i, lru_min, reciprocal); + if (prio < 256) { + radix_counters[prio] += 1; + spillable += 1; + } + } + + unsigned prio2spill = 0, prio2adjacent = 127, amount = radix_counters[0]; + for (unsigned i = 1; i < 256; i++) { + if (amount < wanna_spill) { + prio2spill = i; + prio2adjacent = i + (255 - i) / 2; + amount += radix_counters[i]; + } else if (amount + amount < spillable + wanna_spill + /* РАВНОЗНАЧНО: amount - wanna_spill < spillable - amount */) { + prio2adjacent = i; + amount += radix_counters[i]; + } else + break; + } + + unsigned prev_prio = 256, spilled = 0; + unsigned r, w, prio; + for (w = 0, r = 1; r <= dl->length && spilled < wanna_spill; + prev_prio = prio, ++r) { + prio = spill_prio(txn, r, lru_min, reciprocal); + MDBX_page *const dp = dl->items[r].ptr; + if (prio < prio2adjacent) { + const pgno_t pgno = dl->items[r].pgno; + const unsigned npages = dpl_npages(dl, r); + if (prio <= prio2spill) { + if (prev_prio < prio2adjacent && prev_prio > prio2spill && + dpl_endpgno(dl, r - 1) == pgno) { + mdbx_debug("co-spill %u prev-adjacent page %" PRIaPGNO + " (lru-dist %d, prio %u)", + dpl_npages(dl, w), dl->items[r - 1].pgno, + txn->tw.dirtylru - dl->items[r - 1].lru, prev_prio); + --w; + rc = spill_page(txn, &ctx, dl->items[r - 1].ptr, + dpl_npages(dl, r - 1)); + if (unlikely(rc != MDBX_SUCCESS)) + break; + ++spilled; + } + + mdbx_debug("spill %u page %" PRIaPGNO " (lru-dist %d, prio %u)", npages, + dp->mp_pgno, txn->tw.dirtylru - dl->items[r].lru, prio); + rc = spill_page(txn, &ctx, dp, npages); + if (unlikely(rc != MDBX_SUCCESS)) + break; + ++spilled; + continue; + } + + if (prev_prio <= prio2spill && dpl_endpgno(dl, r - 1) == pgno) { + mdbx_debug("co-spill %u next-adjacent page %" PRIaPGNO + " (lru-dist %d, prio %u)", + npages, dp->mp_pgno, txn->tw.dirtylru - dl->items[r].lru, + prio); + rc = spill_page(txn, &ctx, dp, npages); + if (unlikely(rc != MDBX_SUCCESS)) + break; + prio = prev_prio /* to continue co-spilling next adjacent pages */; + ++spilled; + continue; + } + } + if (unlikely(prio > 255 && (dp->mp_flags & P_KEEP))) + /* Reset any dirty pages we kept that page_flush didn't see */ + dp->mp_flags -= P_KEEP; + dl->items[++w] = dl->items[r]; + } + + while (r <= dl->length) { + MDBX_page *const dp = dl->items[r].ptr; + if (unlikely(dp->mp_flags & P_KEEP)) + /* Reset any dirty pages we kept that page_flush didn't see */ + dp->mp_flags -= P_KEEP; + dl->items[++w] = dl->items[r++]; + } + mdbx_tassert(txn, r - 1 - w == spilled); + if (unlikely(spilled == 0)) { + mdbx_tassert(txn, ctx.iov_items == 0 && rc == MDBX_SUCCESS); + return MDBX_SUCCESS; + } + dl->sorted = dpl_setlen(dl, w); + txn->tw.dirtyroom += spilled; + mdbx_tassert(txn, mdbx_dirtylist_check(txn)); + + if (ctx.iov_items) + rc = mdbx_iov_write(txn, &ctx); + if (unlikely(rc != MDBX_SUCCESS)) goto bailout; - /* Reset any dirty pages we kept that page_flush didn't see */ - mdbx_tassert(txn, dl_len_before - spilled == dl->length); - mdbx_txn_xkeep(txn, m0, P_KEEP, keep > 0); - mdbx_tassert(txn, mdbx_dirtylist_check(txn)); - + mdbx_pnl_sort(txn->tw.spill_pages); + txn->mt_flags |= MDBX_TXN_SPILLS; mdbx_notice("spilled %u dirty-entries, now have %u dirty-room", spilled, txn->tw.dirtyroom); - -bailout: - txn->mt_flags |= rc ? MDBX_TXN_ERROR : MDBX_TXN_SPILLS; - return rc; + mdbx_iov_done(txn, &ctx); + return MDBX_SUCCESS; } static int mdbx_cursor_spill(MDBX_cursor *mc, const MDBX_val *key, const MDBX_val *data) { MDBX_txn *txn = mc->mc_txn; - if (txn->mt_flags & MDBX_WRITEMAP) - return MDBX_SUCCESS; - /* Estimate how much space this operation will take: */ /* 1) Max b-tree height, reasonable enough with including dups' sub-tree */ unsigned need = CURSOR_STACK + 3; @@ -8937,136 +9233,35 @@ bailout_notracking: return rc; } -static int mdbx_flush_iov(MDBX_txn *const txn, struct iovec *iov, - unsigned iov_items, size_t iov_off, - size_t iov_bytes) { - MDBX_env *const env = txn->mt_env; - mdbx_assert(env, iov_items > 0); - int rc; - if (likely(iov_items == 1)) { - mdbx_assert(env, iov_bytes == (size_t)iov->iov_len); - rc = mdbx_pwrite(env->me_lazy_fd, iov->iov_base, iov_bytes, iov_off); - } else { - rc = mdbx_pwritev(env->me_lazy_fd, iov, iov_items, iov_off, iov_bytes); - } - - if (unlikely(rc != MDBX_SUCCESS)) - mdbx_error("Write error: %s", mdbx_strerror(rc)); - else { - VALGRIND_MAKE_MEM_DEFINED(txn->mt_env->me_map + iov_off, iov_bytes); - ASAN_UNPOISON_MEMORY_REGION(txn->mt_env->me_map + iov_off, iov_bytes); - } - - for (unsigned i = 0; i < iov_items; i++) - mdbx_dpage_free(env, (MDBX_page *)iov[i].iov_base, - bytes2pgno(env, iov[i].iov_len)); - return rc; -} - -/* Flush (some) dirty pages to the map, after clearing their dirty flag. - * [in] txn the transaction that's being committed - * [in] keep number of initial pages in dirtylist to keep dirty. - * Returns 0 on success, non-zero on failure. */ -__hot static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) { - struct iovec iov[MDBX_COMMIT_PAGES]; - MDBX_dpl *const dl = mdbx_dpl_sort(txn); - MDBX_env *const env = txn->mt_env; - pgno_t flush_begin = MAX_PAGENO; - pgno_t flush_end = MIN_PAGENO; +static int mdbx_txn_write(MDBX_txn *txn, struct mdbx_iov_ctx *ctx) { + MDBX_dpl *const dl = + (txn->mt_flags & MDBX_WRITEMAP) ? txn->tw.dirtylist : mdbx_dpl_sort(txn); int rc = MDBX_SUCCESS; - unsigned iov_items = 0; - size_t iov_bytes = 0; - size_t iov_off = 0; - unsigned r, w; - for (r = w = keep; ++r <= dl->length;) { + for (w = 0, r = 1; r <= dl->length; ++r) { MDBX_page *dp = dl->items[r].ptr; - mdbx_tassert(txn, - dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno); - mdbx_tassert(txn, dp->mp_flags == P_LOOSE || IS_MODIFIABLE(txn, dp)); - - /* Don't flush this page yet */ - if (dp->mp_flags & P_KEEP) { - dp->mp_flags -= P_KEEP; + if (dp->mp_flags & P_LOOSE) { dl->items[++w] = dl->items[r]; continue; } - if (dp->mp_flags & (P_LOOSE | P_SPILLED)) { - dl->items[++w] = dl->items[r]; - continue; - } - - const unsigned npages = dpl_npages(dl, r); - flush_begin = (flush_begin < dp->mp_pgno) ? flush_begin : dp->mp_pgno; - flush_end = - (flush_end > dp->mp_pgno + npages) ? flush_end : dp->mp_pgno + npages; - env->me_unsynced_pages->weak += npages; - - if (IS_SHADOWED(txn, dp)) { - dp->mp_txnid = txn->mt_txnid; - mdbx_tassert(txn, IS_SPILLED(txn, dp)); - const size_t size = pgno2bytes(env, npages); - if (iov_off + iov_bytes != pgno2bytes(env, dp->mp_pgno) || - iov_items == ARRAY_LENGTH(iov) || iov_bytes + size > MAX_WRITE) { - if (iov_items) { - rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes); -#if defined(__linux__) || defined(__gnu_linux__) - if (mdbx_linux_kernel_version >= 0x02060b00) - /* Linux kernels older than version 2.6.11 ignore the addr and nbytes - * arguments, making this function fairly expensive. Therefore, the - * whole cache is always flushed. */ -#endif /* Linux */ - mdbx_flush_incoherent_mmap(env->me_map + iov_off, iov_bytes, - env->me_os_psize); - iov_items = 0; - iov_bytes = 0; - if (unlikely(rc != MDBX_SUCCESS)) { - do - dl->items[++w] = dl->items[r]; - while (++r <= dl->length); - break; - } - } - iov_off = pgno2bytes(env, dp->mp_pgno); - } - iov[iov_items].iov_base = (void *)dp; - iov[iov_items].iov_len = size; - iov_items += 1; - iov_bytes += size; - } + unsigned npages = dpl_npages(dl, r); + rc = iov_page(txn, ctx, dp, npages); + if (unlikely(rc != MDBX_SUCCESS)) + break; } - mdbx_tassert(txn, dl->sorted == dl->length && r == dl->length + 1); - txn->tw.dirtyroom += dl->length - w; - assert(txn->tw.dirtyroom <= txn->mt_env->me_options.dp_limit); + if (ctx->iov_items) + rc = mdbx_iov_write(txn, ctx); + + while (r <= dl->length) + dl->items[++w] = dl->items[r++]; + dl->sorted = dpl_setlen(dl, w); - mdbx_tassert(txn, txn->mt_parent || - txn->tw.dirtyroom + txn->tw.dirtylist->length == - txn->mt_env->me_options.dp_limit); - - if (iov_items) - rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes); - - if (unlikely(rc != MDBX_SUCCESS)) { - txn->mt_flags |= MDBX_TXN_ERROR; - return rc; - } - -#if defined(__linux__) || defined(__gnu_linux__) - if ((env->me_flags & MDBX_WRITEMAP) == 0 && - mdbx_linux_kernel_version < 0x02060b00) - /* Linux kernels older than version 2.6.11 ignore the addr and nbytes - * arguments, making this function fairly expensive. Therefore, the - * whole cache is always flushed. */ - mdbx_flush_incoherent_mmap(env->me_map + pgno2bytes(env, flush_begin), - pgno2bytes(env, flush_end - flush_begin), - env->me_os_psize); -#endif /* Linux */ - - /* TODO: use flush_begin & flush_end for msync() & sync_file_range(). */ - (void)flush_begin; - (void)flush_end; - return MDBX_SUCCESS; + txn->tw.dirtyroom += r - 1 - w; + mdbx_tassert(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length == + (txn->mt_parent ? txn->mt_parent->tw.dirtyroom + : txn->mt_env->me_options.dp_limit)); + return rc; } /* Check txn and dbi arguments to a function */ @@ -9682,8 +9877,14 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) { goto fail; } - rc = mdbx_page_flush(txn, 0); + struct mdbx_iov_ctx ctx; + mdbx_iov_init(txn, &ctx); + rc = mdbx_txn_write(txn, &ctx); + if (likely(rc == MDBX_SUCCESS)) + mdbx_iov_done(txn, &ctx); + /* TODO: use ctx.flush_begin & ctx.flush_end for range-sync */ ts_3 = latency ? mdbx_osal_monotime() : 0; + if (likely(rc == MDBX_SUCCESS)) { if (txn->mt_dbs[MAIN_DBI].md_flags & DBI_DIRTY) txn->mt_dbs[MAIN_DBI].md_mod_txnid = txn->mt_txnid;