diff --git a/src/elements/core.c b/src/elements/core.c
index eee02e60..90ff00f4 100644
--- a/src/elements/core.c
+++ b/src/elements/core.c
@@ -1685,6 +1685,8 @@ static int __must_check_result mdbx_page_search(MDBX_cursor *mc, MDBX_val *key,
                                                 int flags);
 static int __must_check_result mdbx_page_merge(MDBX_cursor *csrc,
                                                MDBX_cursor *cdst);
+static int __must_check_result mdbx_page_flush(MDBX_txn *txn,
+                                               const unsigned keep);
 
 #define MDBX_SPLIT_REPLACE MDBX_APPENDDUP /* newkey is not new */
 static int __must_check_result mdbx_page_split(MDBX_cursor *mc,
@@ -2488,8 +2490,6 @@ mark_done:
   return rc;
 }
 
-static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep);
-
 /* Spill pages from the dirty list back to disk.
  * This is intended to prevent running into MDBX_TXN_FULL situations,
  * but note that they may still occur in a few cases:
@@ -2525,12 +2525,12 @@ static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep);
  *
  * Returns 0 on success, non-zero on failure. */
 static int mdbx_page_spill(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data) {
-  MDBX_txn *txn = mc->mc_txn;
-  MDBX_DPL dl = txn->tw.dirtylist;
-
   if (mc->mc_flags & C_SUB)
     return MDBX_SUCCESS;
 
+  MDBX_txn *txn = mc->mc_txn;
+  MDBX_DPL dl = txn->tw.dirtylist;
+
   /* Estimate how much space this op will take */
   pgno_t i = mc->mc_db->md_depth;
   /* Named DBs also dirty the main DB */
@@ -5559,123 +5559,114 @@ bailout_notracking:
   return rc;
 }
 
+static int mdbx_flush_iov(MDBX_txn *const txn, struct iovec *iov,
+                          unsigned iov_items, size_t iov_off,
+                          size_t iov_bytes) {
+  MDBX_env *const env = txn->mt_env;
+  int rc = mdbx_pwritev(env->me_fd, iov, iov_items, iov_off, iov_bytes);
+  if (unlikely(rc != MDBX_SUCCESS)) {
+    mdbx_error("Write error: %s", mdbx_strerror(rc));
+    txn->mt_flags |= MDBX_TXN_ERROR;
+  }
+
+  for (unsigned i = 0; i < iov_items; i++)
+    mdbx_dpage_free(env, (MDBX_page *)iov[i].iov_base,
+                    bytes2pgno(env, iov[i].iov_len));
+
+  return rc;
+}
+
 /* Flush (some) dirty pages to the map, after clearing their dirty flag.
  * [in] txn the transaction that's being committed
  * [in] keep number of initial pages in dirtylist to keep dirty.
  * Returns 0 on success, non-zero on failure. */
-static int mdbx_page_flush(MDBX_txn *txn, pgno_t keep) {
-  MDBX_env *env = txn->mt_env;
-  const MDBX_DPL dl = txn->tw.dirtylist;
-  unsigned i, j, pagecount = dl->length;
-  int rc;
-  size_t size = 0, pos = 0;
-  pgno_t pgno = 0;
-  MDBX_page *dp = NULL;
+static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) {
   struct iovec iov[MDBX_COMMIT_PAGES];
-  intptr_t wpos = 0, wsize = 0;
-  size_t next_pos = 1; /* impossible pos, so pos != next_pos */
-  int n = 0;
+  const MDBX_DPL dl = (keep || txn->tw.loose_count > 1)
+                          ? mdbx_dpl_sort(txn->tw.dirtylist)
+                          : txn->tw.dirtylist;
+  MDBX_env *const env = txn->mt_env;
+  pgno_t flush_begin = MAX_PAGENO;
+  pgno_t flush_end = MIN_PAGENO;
+  unsigned iov_items = 0;
+  size_t iov_bytes = 0;
+  size_t iov_off = 0;
+  unsigned r, w;
+  for (r = w = keep; ++r <= dl->length;) {
+    MDBX_page *dp = dl[r].ptr;
+    mdbx_tassert(txn,
+                 dp->mp_pgno >= MIN_PAGENO && dp->mp_pgno < txn->mt_next_pgno);
+    mdbx_tassert(txn, dp->mp_flags & P_DIRTY);
 
-  j = i = keep;
-
-  if (env->me_flags & MDBX_WRITEMAP) {
-    /* Clear dirty flags */
-    while (++i <= pagecount) {
-      dp = dl[i].ptr;
-      /* Don't flush this page yet */
-      if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
-        dp->mp_flags &= ~P_KEEP;
-        dl[++j] = dl[i];
-        continue;
-      }
-      dp->mp_flags &= ~P_DIRTY;
-      dp->mp_validator = 0 /* TODO */;
-      *env->me_unsynced_pages += IS_OVERFLOW(dp) ? dp->mp_pages : 1;
+    /* Don't flush this page yet */
+    if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
+      dp->mp_flags &= ~P_KEEP;
+      dl[++w] = dl[r];
+      continue;
     }
-    goto done;
-  }
 
-  /* Write the pages */
-  for (;;) {
-    if (++i <= pagecount) {
-      dp = dl[i].ptr;
-      /* Don't flush this page yet */
-      if (dp->mp_flags & (P_LOOSE | P_KEEP)) {
-        dp->mp_flags &= ~P_KEEP;
-        dl[i].pgno = 0;
-        continue;
-      }
-      pgno = dl[i].pgno;
-      mdbx_tassert(txn, pgno >= MIN_PAGENO);
-      /* clear dirty flag */
-      dp->mp_flags &= ~P_DIRTY;
-      dp->mp_validator = 0 /* TODO */;
-      pos = pgno2bytes(env, pgno);
-      const unsigned npages = IS_OVERFLOW(dp) ? dp->mp_pages : 1;
-      *env->me_unsynced_pages += npages;
-      size = pgno2bytes(env, npages);
-    }
-    /* Write up to MDBX_COMMIT_PAGES dirty pages at a time. */
-    if (pos != next_pos || n == MDBX_COMMIT_PAGES || wsize + size > MAX_WRITE) {
-      if (n) {
-        /* Write previous page(s) */
-        rc = mdbx_pwritev(env->me_fd, iov, n, wpos, wsize);
-        if (unlikely(rc != MDBX_SUCCESS)) {
-          mdbx_debug("Write error: %s", mdbx_strerror(rc));
-          return rc;
-        }
+    const unsigned npages = IS_OVERFLOW(dp) ? dp->mp_pages : 1;
+    flush_begin = (flush_begin < dp->mp_pgno) ? flush_begin : dp->mp_pgno;
+    flush_end =
+        (flush_end > dp->mp_pgno + npages) ? flush_end : dp->mp_pgno + npages;
+    *env->me_unsynced_pages += npages;
+    dp->mp_flags &= ~P_DIRTY;
+    dp->mp_validator = 0 /* TODO */;
+
+    if ((env->me_flags & MDBX_WRITEMAP) == 0) {
+      const size_t size = pgno2bytes(env, npages);
+      if (iov_off + iov_bytes != pgno2bytes(env, dp->mp_pgno) ||
+          iov_items == ARRAY_LENGTH(iov) || iov_bytes + size > MAX_WRITE) {
+        if (iov_items) {
+          int rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
+          if (unlikely(rc != MDBX_SUCCESS))
+            return rc;
 #if MDBX_CPU_CACHE_MMAP_NONCOHERENT
 #if defined(__linux__) || defined(__gnu_linux__)
-        if (mdbx_linux_kernel_version >= 0x02060b00)
-          /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
-           * arguments, making this function fairly expensive. Therefore, the
-           * whole cache is always flushed. */
+          if (mdbx_linux_kernel_version >= 0x02060b00)
+            /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
+             * arguments, making this function fairly expensive. Therefore, the
+             * whole cache is always flushed. */
#endif /* Linux */
-          mdbx_invalidate_mmap_noncoherent_cache(env->me_map + wpos, wsize);
+            mdbx_invalidate_mmap_noncoherent_cache(env->me_map + iov_off,
+                                                   iov_bytes);
 #endif /* MDBX_CPU_CACHE_MMAP_NONCOHERENT */
-
-        n = 0;
+          iov_items = 0;
+          iov_bytes = 0;
+        }
+        iov_off = pgno2bytes(env, dp->mp_pgno);
      }
-      if (i > pagecount)
-        break;
-      wpos = pos;
-      wsize = 0;
+      iov[iov_items].iov_base = dp;
+      iov[iov_items].iov_len = size;
+      iov_items += 1;
+      iov_bytes += size;
     }
-    mdbx_debug("committing page %" PRIaPGNO, pgno);
-    next_pos = pos + size;
-    iov[n].iov_len = size;
-    iov[n].iov_base = (char *)dp;
-    wsize += size;
-    n++;
+  }
+
+  if (iov_items) {
+    int rc = mdbx_flush_iov(txn, iov, iov_items, iov_off, iov_bytes);
+    if (unlikely(rc != MDBX_SUCCESS))
+      return rc;
   }
 
 #if MDBX_CPU_CACHE_MMAP_NONCOHERENT &&                                        \
     (defined(__linux__) || defined(__gnu_linux__))
-  if (mdbx_linux_kernel_version < 0x02060b00) {
+  if ((env->me_flags & MDBX_WRITEMAP) == 0 &&
+      mdbx_linux_kernel_version < 0x02060b00)
     /* Linux kernels older than version 2.6.11 ignore the addr and nbytes
-     * arguments, making this function fairly expensive. Therefore, the whole
-     * cache is always flushed. */
-    mdbx_invalidate_mmap_noncoherent_cache(env->me_map,
-                                           pgno2bytes(env, txn->mt_next_pgno));
-  }
+     * arguments, making this function fairly expensive. Therefore, the
+     * whole cache is always flushed. */
+    mdbx_invalidate_mmap_noncoherent_cache(
+        env->me_map + pgno2bytes(env, flush_begin),
+        pgno2bytes(env, flush_end - flush_begin));
 #endif /* MDBX_CPU_CACHE_MMAP_NONCOHERENT && Linux */
 
-  for (i = keep; ++i <= pagecount;) {
-    dp = dl[i].ptr;
-    /* This is a page we skipped above */
-    if (!dl[i].pgno) {
-      dl[++j] = dl[i];
-      dl[j].pgno = dp->mp_pgno;
-      continue;
-    }
-    mdbx_dpage_free(env, dp, IS_OVERFLOW(dp) ? dp->mp_pages : 1);
-  }
+  /* TODO: use flush_begin & flush_end for msync() & sync_file_range(). */
+  (void)flush_begin;
+  (void)flush_end;
 
-done:
-  i--;
-  txn->tw.dirtyroom += i - j;
-  dl->length = j;
+  txn->tw.dirtyroom += r - 1 - w;
+  dl->length = w;
   mdbx_tassert(txn, txn->mt_parent ||
                         txn->tw.dirtyroom + txn->tw.dirtylist->length ==
                             MDBX_DPL_TXNFULL);
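For readers skimming the patch, the heart of the change is the write-coalescing logic: mdbx_page_flush() now walks the (sorted) dirty list once, gathers adjacent dirty pages into a struct iovec array, and hands each batch to mdbx_flush_iov() whenever the next page is not contiguous, the array is full, or the accumulated bytes would exceed MAX_WRITE. The standalone sketch below illustrates only that batching pattern; it is not libmdbx code. The names flush_batch, PAGE_SIZE and MAX_BATCH are hypothetical stand-ins for mdbx_flush_iov(), the database page size and MDBX_COMMIT_PAGES, and plain POSIX pwritev() stands in for mdbx_pwritev().

```c
#include <fcntl.h>
#include <stdint.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

#define PAGE_SIZE 4096 /* hypothetical page size */
#define MAX_BATCH 8    /* stand-in for MDBX_COMMIT_PAGES */

/* Write one accumulated batch at the given file offset, analogous to what
 * mdbx_flush_iov() does with mdbx_pwritev(). */
static int flush_batch(int fd, struct iovec *iov, int items, off_t off) {
  if (items == 0)
    return 0;
  return (pwritev(fd, iov, items, off) < 0) ? -1 : 0;
}

int main(void) {
  int fd = open("demo.db", O_CREAT | O_RDWR | O_TRUNC, 0644);
  if (fd < 0)
    return 1;

  /* Pretend these are dirty pages, identified by ascending page number. */
  static char pages[4][PAGE_SIZE];
  const uint32_t pgno[4] = {2, 3, 7, 8}; /* two contiguous runs */
  for (int i = 0; i < 4; i++)
    memset(pages[i], 'A' + i, PAGE_SIZE);

  struct iovec iov[MAX_BATCH];
  int items = 0;
  off_t batch_off = 0;
  size_t batch_bytes = 0;

  for (int i = 0; i < 4; i++) {
    off_t pos = (off_t)pgno[i] * PAGE_SIZE;
    /* Start a new batch when the page is not adjacent to the previous one
     * or the iovec array is full -- the same trigger as in the patch. */
    if ((off_t)(batch_off + batch_bytes) != pos || items == MAX_BATCH) {
      if (flush_batch(fd, iov, items, batch_off) != 0)
        return 1;
      items = 0;
      batch_bytes = 0;
      batch_off = pos;
    }
    iov[items].iov_base = pages[i];
    iov[items].iov_len = PAGE_SIZE;
    items++;
    batch_bytes += PAGE_SIZE;
  }
  if (flush_batch(fd, iov, items, batch_off) != 0) /* tail batch */
    return 1;

  close(fd);
  return 0;
}
```

With the sample page numbers {2, 3, 7, 8}, the two contiguous runs end up as exactly two pwritev() calls, mirroring how the patched loop starts a new iovec batch at each gap in the page sequence.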