mdbx: add MDBX_opt_writethrough_threshold and related refinements.

Леонид Юрьев (Leonid Yuriev) 2022-12-03 14:55:38 +03:00
parent 822952ef01
commit 23d236f70e
6 changed files with 266 additions and 113 deletions

mdbx.h

@@ -2220,6 +2220,34 @@ enum MDBX_option_t {
    * to 50% (half empty) which corresponds to the range from 8192 and to 32768
    * in units respectively. */
   MDBX_opt_merge_threshold_16dot16_percent,
+
+  /** \brief Controls the choice between write-through disk writes and
+   * usual ones followed by a flush via the `fdatasync()` syscall.
+   * \details Depending on the operating system, storage subsystem
+   * characteristics and the use case, higher performance can be achieved by
+   * either using write-through or a series of usual/lazy writes followed by
+   * a flush-to-disk.
+   *
+   * Basically, for N chunks the latency/cost of write-through is:
+   *   latency = N * (emit + round-trip-to-storage + storage-execution);
+   * and for a series of lazy writes with a final flush it is:
+   *   latency = N * (emit + storage-execution) + flush + round-trip-to-storage.
+   *
+   * So for large N and/or a noticeable round-trip-to-storage the write+flush
+   * approach wins, but for small N and/or near-zero NVMe-like latency
+   * write-through is better.
+   *
+   * To address this, libmdbx provides `MDBX_opt_writethrough_threshold`:
+   *  - when the N described above is less than or equal to the specified
+   *    threshold, the write-through approach is used;
+   *  - otherwise, when N is greater than the specified threshold,
+   *    the write-and-flush approach is used.
+   *
+   * \note MDBX_opt_writethrough_threshold affects only \ref MDBX_SYNC_DURABLE
+   * mode without \ref MDBX_WRITEMAP, and is not supported on Windows.
+   * On Windows write-through is always used, but \ref MDBX_NOMETASYNC can be
+   * used to switch to write-and-flush. */
+  MDBX_opt_writethrough_threshold,
 };
 #ifndef __cplusplus
 /** \ingroup c_settings */
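A worked illustration of the latency model documented above, with made-up figures that are not part of the commit (emit = 1 us, round-trip-to-storage = 50 us, storage-execution = 10 us, flush = 60 us):

    N = 2:  write-through = 2 * (1 + 50 + 10)  = 122 us;  lazy + flush = 2 * (1 + 10) + 60 + 50  = 132 us  -> write-through wins
    N = 16: write-through = 16 * (1 + 50 + 10) = 976 us;  lazy + flush = 16 * (1 + 10) + 60 + 50 = 286 us  -> write-and-flush wins

With numbers of this order the break-even point sits at a handful of chunks, which is consistent with the small POSIX default (MDBX_WRITETHROUGH_THRESHOLD_DEFAULT) introduced later in this commit.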


@@ -4446,6 +4446,7 @@ static __inline int page_retire(MDBX_cursor *mc, MDBX_page *mp) {
 typedef struct iov_ctx {
   MDBX_env *env;
   osal_ioring_t *ior;
+  mdbx_filehandle_t fd;
   int err;
 #ifndef MDBX_NEED_WRITTEN_RANGE
 #define MDBX_NEED_WRITTEN_RANGE 1
@@ -4458,10 +4459,12 @@ typedef struct iov_ctx {
 } iov_ctx_t;
 __must_check_result static int iov_init(MDBX_txn *const txn, iov_ctx_t *ctx,
-                                        size_t items, size_t npages) {
+                                        size_t items, size_t npages,
+                                        mdbx_filehandle_t fd) {
   ctx->env = txn->mt_env;
   ctx->ior = &txn->mt_env->me_ioring;
-  ctx->err = osal_ioring_reserve(ctx->ior, items,
+  ctx->fd = fd;
+  ctx->err = osal_ioring_prepare(ctx->ior, items,
                                  pgno_align2os_bytes(txn->mt_env, npages));
   if (likely(ctx->err == MDBX_SUCCESS)) {
 #if MDBX_NEED_WRITTEN_RANGE
@@ -4534,12 +4537,10 @@ static void iov_complete(iov_ctx_t *ctx) {
 __must_check_result static int iov_write(iov_ctx_t *ctx) {
   eASSERT(ctx->env, !iov_empty(ctx));
-  osal_ioring_write_result_t r = osal_ioring_write(ctx->ior);
+  osal_ioring_write_result_t r = osal_ioring_write(ctx->ior, ctx->fd);
 #if MDBX_ENABLE_PGOP_STAT
   ctx->env->me_lck->mti_pgop_stat.wops.weak += r.wops;
 #endif /* MDBX_ENABLE_PGOP_STAT */
-  if (!ctx->env->me_lck->mti_eoos_timestamp.weak)
-    ctx->env->me_lck->mti_eoos_timestamp.weak = osal_monotime();
   ctx->err = r.err;
   if (unlikely(ctx->err != MDBX_SUCCESS))
     ERROR("Write error: %s", mdbx_strerror(ctx->err));
@@ -4596,7 +4597,6 @@ __must_check_result static int iov_page(MDBX_txn *txn, iov_ctx_t *ctx,
                        ? ctx->flush_end
                        : dp->mp_pgno + (pgno_t)npages;
 #endif /* MDBX_NEED_WRITTEN_RANGE */
-  env->me_lck->mti_unsynced_pages.weak += npages;
   return MDBX_SUCCESS;
 }
@@ -4816,6 +4816,8 @@ __cold static int txn_spill_slowpath(MDBX_txn *const txn, MDBX_cursor *const m0,
                   pgno_align2os_bytes(env, txn->mt_next_pgno), MDBX_SYNC_KICK);
     if (unlikely(rc != MDBX_SUCCESS))
       goto bailout;
+    env->me_lck->mti_unsynced_pages.weak +=
+        txn->tw.dirtylist->pages_including_loose - txn->tw.loose_count;
     dpl_clear(txn->tw.dirtylist);
     txn->tw.dirtyroom = env->me_options.dp_limit - txn->tw.loose_count;
     for (MDBX_page *lp = txn->tw.loose_pages; lp != nullptr; lp = mp_next(lp)) {
@@ -4950,7 +4952,12 @@ __cold static int txn_spill_slowpath(MDBX_txn *const txn, MDBX_cursor *const m0,
   tASSERT(txn, prio2spill < prio2adjacent && prio2adjacent <= 256);
   iov_ctx_t ctx;
-  rc = iov_init(txn, &ctx, amount_entries, amount_npages);
+  rc =
+      iov_init(txn, &ctx, amount_entries, amount_npages,
+#if defined(_WIN32) || defined(_WIN64)
+               txn->mt_env->me_overlapped_fd ? txn->mt_env->me_overlapped_fd :
+#endif
+                                             txn->mt_env->me_lazy_fd);
   if (unlikely(rc != MDBX_SUCCESS))
     goto bailout;
@@ -5028,6 +5035,7 @@ __cold static int txn_spill_slowpath(MDBX_txn *const txn, MDBX_cursor *const m0,
   if (unlikely(rc != MDBX_SUCCESS))
     goto bailout;
+  txn->mt_env->me_lck->mti_unsynced_pages.weak += spilled_npages;
   if (!MDBX_AVOID_MSYNC || !(txn->mt_flags & MDBX_WRITEMAP)) {
     pnl_sort(txn->tw.spilled.list, (size_t)txn->mt_next_pgno << 1);
     txn->mt_flags |= MDBX_TXN_SPILLS;
@@ -10543,7 +10551,7 @@ static int txn_write(MDBX_txn *txn, iov_ctx_t *ctx) {
   tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC);
   MDBX_dpl *const dl = dpl_sort(txn);
   int rc = MDBX_SUCCESS;
-  size_t r, w;
+  size_t r, w, total_npages = 0;
   for (w = 0, r = 1; r <= dl->length; ++r) {
     MDBX_page *dp = dl->items[r].ptr;
     if (dp->mp_flags & P_LOOSE) {
@@ -10551,9 +10559,10 @@ static int txn_write(MDBX_txn *txn, iov_ctx_t *ctx) {
       continue;
     }
     unsigned npages = dpl_npages(dl, r);
+    total_npages += npages;
     rc = iov_page(txn, ctx, dp, npages);
     if (unlikely(rc != MDBX_SUCCESS))
-      break;
+      return rc;
   }
   if (!iov_empty(ctx)) {
@@ -10561,6 +10570,13 @@ static int txn_write(MDBX_txn *txn, iov_ctx_t *ctx) {
     rc = iov_write(ctx);
   }
+  if (likely(rc == MDBX_SUCCESS) && ctx->fd == txn->mt_env->me_lazy_fd) {
+    txn->mt_env->me_lck->mti_unsynced_pages.weak += total_npages;
+    if (!txn->mt_env->me_lck->mti_eoos_timestamp.weak)
+      txn->mt_env->me_lck->mti_eoos_timestamp.weak = osal_monotime();
+  }
+
+  txn->tw.dirtylist->pages_including_loose -= total_npages;
   while (r <= dl->length)
     dl->items[++w] = dl->items[r++];
@@ -10569,6 +10585,8 @@ static int txn_write(MDBX_txn *txn, iov_ctx_t *ctx) {
   tASSERT(txn, txn->tw.dirtyroom + txn->tw.dirtylist->length ==
                    (txn->mt_parent ? txn->mt_parent->tw.dirtyroom
                                    : txn->mt_env->me_options.dp_limit));
+  tASSERT(txn, txn->tw.dirtylist->length == txn->tw.loose_count);
+  tASSERT(txn, txn->tw.dirtylist->pages_including_loose == txn->tw.loose_count);
   return rc;
 }
@@ -11235,6 +11253,7 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
     if (unlikely(rc != MDBX_SUCCESS))
       goto fail;
+    tASSERT(txn, txn->tw.loose_count == 0);
     txn->mt_dbs[FREE_DBI].md_mod_txnid = (txn->mt_dbistate[FREE_DBI] & DBI_DIRTY)
                                              ? txn->mt_txnid
                                              : txn->mt_dbs[FREE_DBI].md_mod_txnid;
@@ -11252,40 +11271,74 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
       goto fail;
   }
+  bool need_flush_for_nometasync = false;
   const meta_ptr_t head = meta_recent(env, &txn->tw.troika);
+  const uint32_t meta_sync_txnid =
+      atomic_load32(&env->me_lck->mti_meta_sync_txnid, mo_Relaxed);
   /* sync prev meta */
-  if (head.is_steady && atomic_load32(&env->me_lck->mti_meta_sync_txnid,
-                                      mo_Relaxed) != (uint32_t)head.txnid) {
-    /* FIXME: Here is a flaw inherited from LMDB.
+  if (head.is_steady && meta_sync_txnid != (uint32_t)head.txnid) {
+    /* Fixing a flaw inherited from LMDB:
      *
-     * There is no problem if none of the processes working with the DB use WRITEMAP.
+     * Everything is fine if none of the processes working with the DB use WRITEMAP.
      * Then the meta-page (updated but not flushed to disk) will be
      * persisted by the fdatasync() done when writing this transaction's data.
      *
-     * There is no problem if all processes working with the DB use WRITEMAP
+     * Everything is fine if all processes working with the DB use WRITEMAP
      * without MDBX_AVOID_MSYNC.
      * Then the meta-page (updated but not flushed to disk) will be
      * persisted by the msync() done when writing this transaction's data.
      *
-     * However, if WRITEMAP is combined with writing through a file
-     * descriptor, the meta-page has to be updated explicitly. But doing so
-     * completely loses the benefit of NOMETASYNC.
-     *
-     * The defect is that currently there is no way to distinguish the last
-     * case from the two previous ones, so meta_sync() always has to be
-     * invoked. */
-    rc = meta_sync(env, head);
-    if (unlikely(rc != MDBX_SUCCESS)) {
-      ERROR("txn-%s: error %d", "presync-meta", rc);
-      goto fail;
-    }
+     * But if the processes working with the DB use both methods, i.e. both
+     * sync() in MDBX_WRITEMAP mode and writes through a file descriptor,
+     * then it becomes impossible to make the previous transaction's
+     * meta-page and the current transaction's data durable on disk with a
+     * single sync operation performed after writing the current
+     * transaction's data. Accordingly, the meta-page has to be updated
+     * explicitly, which completely destroys the benefit of NOMETASYNC. */
+    const uint32_t txnid_dist =
+        ((txn->mt_flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC)
+            ? MDBX_NOMETASYNC_LAZY_FD
+            : MDBX_NOMETASYNC_LAZY_WRITEMAP;
+    /* The point of the "magic" is to avoid a separate fdatasync()
+     * or msync() call for guaranteed on-disk durability of the meta-page
+     * that was "lazily" submitted for writing by the previous transaction
+     * but not flushed to disk because MDBX_NOMETASYNC mode is active. */
+    if (
+#if defined(_WIN32) || defined(_WIN64)
+        !env->me_overlapped_fd &&
+#endif
+        meta_sync_txnid == (uint32_t)head.txnid - txnid_dist)
+      need_flush_for_nometasync = true;
+    else {
+      rc = meta_sync(env, head);
+      if (unlikely(rc != MDBX_SUCCESS)) {
+        ERROR("txn-%s: error %d", "presync-meta", rc);
+        goto fail;
+      }
+    }
   }
   if (txn->tw.dirtylist) {
     tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC);
+    tASSERT(txn, txn->tw.loose_count == 0);
+    mdbx_filehandle_t fd =
+#if defined(_WIN32) || defined(_WIN64)
+        env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+    (void)need_flush_for_nometasync;
+#else
+#define MDBX_WRITETHROUGH_THRESHOLD_DEFAULT 2
+        (need_flush_for_nometasync ||
+         env->me_dsync_fd == INVALID_HANDLE_VALUE ||
+         txn->tw.dirtylist->length > env->me_options.writethrough_threshold ||
+         atomic_load64(&env->me_lck->mti_unsynced_pages, mo_Relaxed))
+            ? env->me_lazy_fd
+            : env->me_dsync_fd;
+#endif /* Windows */
     iov_ctx_t write_ctx;
     rc = iov_init(txn, &write_ctx, txn->tw.dirtylist->length,
-                  txn->tw.dirtylist->pages_including_loose -
-                      txn->tw.loose_count);
+                  txn->tw.dirtylist->pages_including_loose, fd);
     if (unlikely(rc != MDBX_SUCCESS)) {
       ERROR("txn-%s: error %d", "iov-init", rc);
       goto fail;
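For orientation, the descriptor choice added above can be restated as a standalone sketch; this is a paraphrase of the POSIX branch of the hunk, not code from the tree, and `use_lazy` is just an illustrative local name:

/* Fall back to the lazy fd (plain writes plus a later fdatasync) whenever:
 *  - a flush is needed anyway to cover a lazily-written NOMETASYNC meta-page,
 *  - no O_DSYNC descriptor is available,
 *  - the dirty list is longer than the configured write-through threshold, or
 *  - unsynced pages from earlier transactions are still pending.
 * Otherwise write straight through the O_DSYNC descriptor. */
const bool use_lazy =
    need_flush_for_nometasync || env->me_dsync_fd == INVALID_HANDLE_VALUE ||
    txn->tw.dirtylist->length > env->me_options.writethrough_threshold ||
    atomic_load64(&env->me_lck->mti_unsynced_pages, mo_Relaxed) != 0;
const mdbx_filehandle_t fd = use_lazy ? env->me_lazy_fd : env->me_dsync_fd;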
@@ -11298,6 +11351,9 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
     }
   } else {
     tASSERT(txn, (txn->mt_flags & MDBX_WRITEMAP) != 0 && !MDBX_AVOID_MSYNC);
+    env->me_lck->mti_unsynced_pages.weak += txn->tw.writemap_dirty_npages;
+    if (!env->me_lck->mti_eoos_timestamp.weak)
+      env->me_lck->mti_eoos_timestamp.weak = osal_monotime();
   }
   /* TODO: use ctx.flush_begin & ctx.flush_end for range-sync */
@@ -12020,6 +12076,8 @@ static int sync_locked(MDBX_env *env, unsigned flags, MDBX_meta *const pending,
       atomic_store64(&env->me_lck->mti_unsynced_pages, 0, mo_Relaxed);
     } else {
       assert(rc == MDBX_RESULT_TRUE /* carry non-steady */);
+      eASSERT(env, env->me_lck->mti_unsynced_pages.weak > 0);
+      eASSERT(env, env->me_lck->mti_eoos_timestamp.weak != 0);
       unaligned_poke_u64(4, pending->mm_sign, MDBX_DATASIGN_WEAK);
     }
@@ -12188,9 +12246,15 @@ static int sync_locked(MDBX_env *env, unsigned flags, MDBX_meta *const pending,
     if (unlikely(rc != MDBX_RESULT_TRUE))
       goto fail;
   }
+  const uint32_t sync_txnid_dist =
+      ((flags & MDBX_NOMETASYNC) == 0) ? 0
+      : ((flags & MDBX_WRITEMAP) == 0 || MDBX_AVOID_MSYNC)
+            ? MDBX_NOMETASYNC_LAZY_FD
+            : MDBX_NOMETASYNC_LAZY_WRITEMAP;
   env->me_lck->mti_meta_sync_txnid.weak =
       pending->mm_txnid_a[__BYTE_ORDER__ != __ORDER_LITTLE_ENDIAN__].weak -
-      ((flags & MDBX_NOMETASYNC) ? UINT32_MAX / 3 : 0);
+      sync_txnid_dist;
   *troika = meta_tap(env);
   for (MDBX_txn *txn = env->me_txn0; txn; txn = txn->mt_child)
@@ -12349,11 +12413,8 @@ __cold int mdbx_env_create(MDBX_env **penv) {
   env->me_maxreaders = DEFAULT_READERS;
   env->me_maxdbs = env->me_numdbs = CORE_DBS;
-  env->me_lazy_fd = env->me_dsync_fd = env->me_fd4meta = env->me_fd4data =
-#if defined(_WIN32) || defined(_WIN64)
-      env->me_overlapped_fd =
-#endif /* Windows */
-          env->me_lfd = INVALID_HANDLE_VALUE;
+  env->me_lazy_fd = env->me_dsync_fd = env->me_fd4meta = env->me_lfd =
+      INVALID_HANDLE_VALUE;
   env->me_pid = osal_getpid();
   env->me_stuck_meta = -1;
@@ -12371,6 +12432,14 @@ __cold int mdbx_env_create(MDBX_env **penv) {
   env->me_options.dp_loose_limit = 64;
   env->me_options.merge_threshold_16dot16_percent = 65536 / 4 /* 25% */;
+#if !(defined(_WIN32) || defined(_WIN64))
+  env->me_options.writethrough_threshold =
+#if defined(__linux__) || defined(__gnu_linux__)
+      mdbx_RunningOnWSL1 ? MAX_PAGENO :
+#endif /* Linux */
+                         MDBX_WRITETHROUGH_THRESHOLD_DEFAULT;
+#endif /* Windows */
   env->me_os_psize = (unsigned)os_psize;
   setup_pagesize(env, (env->me_os_psize < MAX_PAGESIZE) ? env->me_os_psize
                                                         : MAX_PAGESIZE);
@@ -14184,12 +14253,12 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
   const uint64_t safe_parking_lot_offset = UINT64_C(0x7fffFFFF80000000);
   osal_fseek(env->me_lazy_fd, safe_parking_lot_offset);
-  env->me_fd4data = env->me_fd4meta = env->me_lazy_fd;
+  env->me_fd4meta = env->me_lazy_fd;
 #if defined(_WIN32) || defined(_WIN64)
-  uint8_t ior_flags = 0;
-  if ((flags & (MDBX_RDONLY | MDBX_SAFE_NOSYNC)) == MDBX_SYNC_DURABLE) {
-    ior_flags = IOR_OVERLAPPED;
-    if ((flags & MDBX_WRITEMAP) && MDBX_AVOID_MSYNC) {
+  eASSERT(env, env->me_overlapped_fd == 0);
+  bool ior_direct = false;
+  if (!(flags & (MDBX_RDONLY | MDBX_SAFE_NOSYNC | MDBX_NOMETASYNC))) {
+    if (MDBX_AVOID_MSYNC && (flags & MDBX_WRITEMAP)) {
       /* The MDBX_SAFE_NOSYNC | MDBX_WRITEMAP mode was requested with the
        * MDBX_AVOID_MSYNC option active.
        *
@@ -14203,23 +14272,30 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
        * 2) Besides that, on Windows writing into a locked region of a file
        * is possible only through the same descriptor. Therefore the initial
        * acquisition of locks via osal_lck_seize(), acquiring/releasing locks
       * during write transactions, and writing data must all be performed
       * through a single descriptor.
        *
        * Thus, the volatile DB header has to be read in order to learn the
        * page size, in order to open the file descriptor in the mode required
        * for writing data, so that exactly this descriptor is used for the
        * initial acquisition of locks. */
       MDBX_meta header;
-      if (read_header(env, &header, MDBX_SUCCESS, true) == MDBX_SUCCESS &&
-          header.mm_psize >= env->me_os_psize)
-        ior_flags |= IOR_DIRECT;
+      uint64_t dxb_filesize;
+      int err = read_header(env, &header, MDBX_SUCCESS, true);
+      if ((err == MDBX_SUCCESS && header.mm_psize >= env->me_os_psize) ||
+          (err == MDBX_ENODATA && mode && env->me_psize >= env->me_os_psize &&
+           osal_filesize(env->me_lazy_fd, &dxb_filesize) == MDBX_SUCCESS &&
+           dxb_filesize == 0))
+        /* There could be a collision if two processes try to create the DB
+         * simultaneously with different page sizes, one smaller than the
+         * system page and the other NOT smaller. This is a permissible but
+         * very strange situation, so we treat it as erroneous and make no
+         * attempt to resolve it. */
+        ior_direct = true;
     }
-    rc =
-        osal_openfile((ior_flags & IOR_DIRECT) ? MDBX_OPEN_DXB_OVERLAPPED_DIRECT
-                                               : MDBX_OPEN_DXB_OVERLAPPED,
-                      env, env_pathname.dxb, &env->me_overlapped_fd, 0);
+    rc = osal_openfile(ior_direct ? MDBX_OPEN_DXB_OVERLAPPED_DIRECT
                                   : MDBX_OPEN_DXB_OVERLAPPED,
+                       env, env_pathname.dxb, &env->me_overlapped_fd, 0);
     if (rc != MDBX_SUCCESS)
       goto bailout;
     env->me_data_lock_event = CreateEventW(nullptr, true, false, nullptr);
@@ -14227,7 +14303,6 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
       rc = (int)GetLastError();
       goto bailout;
     }
-    env->me_fd4data = env->me_overlapped_fd;
     osal_fseek(env->me_overlapped_fd, safe_parking_lot_offset);
   }
 #else
@@ -14260,17 +14335,12 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
                 MDBX_DEPRECATED_COALESCE | MDBX_NORDAHEAD;
   eASSERT(env, env->me_dsync_fd == INVALID_HANDLE_VALUE);
-  if ((flags & (MDBX_RDONLY | MDBX_SAFE_NOSYNC)) == 0 &&
-      (env->me_fd4data == env->me_lazy_fd || !(flags & MDBX_NOMETASYNC))) {
+  if (!(flags & (MDBX_RDONLY | MDBX_SAFE_NOSYNC | MDBX_DEPRECATED_MAPASYNC))) {
     rc = osal_openfile(MDBX_OPEN_DXB_DSYNC, env, env_pathname.dxb,
                        &env->me_dsync_fd, 0);
     if (env->me_dsync_fd != INVALID_HANDLE_VALUE) {
       if ((flags & MDBX_NOMETASYNC) == 0)
         env->me_fd4meta = env->me_dsync_fd;
-#if defined(_WIN32) || defined(_WIN64)
-      if (env->me_fd4data == env->me_lazy_fd)
-        env->me_fd4data = env->me_dsync_fd;
-#endif /* Windows must die */
       osal_fseek(env->me_dsync_fd, safe_parking_lot_offset);
     }
   }
@@ -14386,11 +14456,12 @@ __cold int mdbx_env_openW(MDBX_env *env, const wchar_t *pathname,
       rc = MDBX_ENOMEM;
   }
   if (rc == MDBX_SUCCESS)
-    rc = osal_ioring_create(&env->me_ioring,
+    rc = osal_ioring_create(&env->me_ioring
 #if defined(_WIN32) || defined(_WIN64)
-                            ior_flags,
+                            ,
+                            ior_direct, env->me_overlapped_fd
 #endif /* Windows */
-                            env->me_fd4data);
+    );
 }
 #if MDBX_DEBUG
@@ -14462,10 +14533,13 @@ __cold static int env_close(MDBX_env *env) {
   }
 #if defined(_WIN32) || defined(_WIN64)
-  if (env->me_overlapped_fd != INVALID_HANDLE_VALUE) {
-    CloseHandle(env->me_data_lock_event);
+  if (env->me_overlapped_fd) {
     CloseHandle(env->me_overlapped_fd);
-    env->me_overlapped_fd = INVALID_HANDLE_VALUE;
+    env->me_overlapped_fd = 0;
+  }
+  if (env->me_data_lock_event != INVALID_HANDLE_VALUE) {
+    CloseHandle(env->me_data_lock_event);
+    env->me_data_lock_event = INVALID_HANDLE_VALUE;
   }
 #endif /* Windows */
@@ -24054,6 +24128,24 @@ __cold int mdbx_env_set_option(MDBX_env *env, const MDBX_option_t option,
     recalculate_merge_threshold(env);
     break;
+  case MDBX_opt_writethrough_threshold:
+    if (value != (unsigned)value)
+      err = MDBX_EINVAL;
+    else
+#if defined(_WIN32) || defined(_WIN64)
+        /* allow "setting" the default value, as well as a value matching the
+         * behaviour implied by the current MDBX_NOMETASYNC setting */
+        if ((unsigned)-1 != (unsigned)value &&
+            value != ((env->me_flags & MDBX_NOMETASYNC) ? 0 : INT_MAX))
+      err = MDBX_EINVAL;
+#else
+      env->me_options.writethrough_threshold =
+          ((unsigned)-1 == (unsigned)value)
+              ? MDBX_WRITETHROUGH_THRESHOLD_DEFAULT
+              : (unsigned)value;
+#endif
+    break;
+
   default:
     return MDBX_EINVAL;
   }
@@ -24127,6 +24219,14 @@ __cold int mdbx_env_get_option(const MDBX_env *env, const MDBX_option_t option,
     *pvalue = env->me_options.merge_threshold_16dot16_percent;
     break;
+  case MDBX_opt_writethrough_threshold:
+#if defined(_WIN32) || defined(_WIN64)
+    *pvalue = (env->me_flags & MDBX_NOMETASYNC) ? 0 : INT_MAX;
+#else
+    *pvalue = env->me_options.writethrough_threshold;
+#endif
+    break;
+
   default:
     return MDBX_EINVAL;
   }
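A minimal usage sketch of the new option through the public API whose handlers are shown above (not part of the commit; the value 8 is arbitrary, and per the set_option handler above, passing (uint64_t)-1 restores the built-in default):

#include "mdbx.h"

MDBX_env *env = NULL;
int rc = mdbx_env_create(&env);
if (rc == MDBX_SUCCESS)
  /* prefer write-through for commits writing at most 8 chunks */
  rc = mdbx_env_set_option(env, MDBX_opt_writethrough_threshold, 8);
uint64_t threshold = 0;
if (rc == MDBX_SUCCESS)
  rc = mdbx_env_get_option(env, MDBX_opt_writethrough_threshold, &threshold);

On Windows the option is effectively read-only: mdbx_env_get_option() reports 0 or INT_MAX depending on MDBX_NOMETASYNC, and mdbx_env_set_option() accepts only the default or that matching value.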


@@ -761,6 +761,10 @@ typedef struct MDBX_lockinfo {
   /* Low 32-bit of txnid with which meta-pages was synced,
    * i.e. for sync-polling in the MDBX_NOMETASYNC mode. */
+#define MDBX_NOMETASYNC_LAZY_UNK (UINT32_MAX / 3)
+#define MDBX_NOMETASYNC_LAZY_FD (MDBX_NOMETASYNC_LAZY_UNK + UINT32_MAX / 8)
+#define MDBX_NOMETASYNC_LAZY_WRITEMAP                                         \
+  (MDBX_NOMETASYNC_LAZY_UNK - UINT32_MAX / 8)
   MDBX_atomic_uint32_t mti_meta_sync_txnid;
   /* Period for timed auto-sync feature, i.e. at the every steady checkpoint
@@ -1213,10 +1217,10 @@ struct MDBX_env {
   osal_mmap_t me_dxb_mmap; /* The main data file */
 #define me_map me_dxb_mmap.base
 #define me_lazy_fd me_dxb_mmap.fd
-#define me_fd4data me_ioring.fd
   mdbx_filehandle_t me_dsync_fd, me_fd4meta;
 #if defined(_WIN32) || defined(_WIN64)
-  HANDLE me_overlapped_fd, me_data_lock_event;
+#define me_overlapped_fd me_ioring.overlapped_fd
+  HANDLE me_data_lock_event;
 #endif /* Windows */
   osal_mmap_t me_lck_mmap; /* The lock file */
 #define me_lfd me_lck_mmap.fd
@@ -1259,6 +1263,9 @@ struct MDBX_env {
     uint8_t spill_min_denominator;
     uint8_t spill_parent4child_denominator;
     unsigned merge_threshold_16dot16_percent;
+#if !(defined(_WIN32) || defined(_WIN64))
+    unsigned writethrough_threshold;
+#endif /* Windows */
     union {
       unsigned all;
       /* tracks options with non-auto values but tuned by user */
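An informal reading of the MDBX_NOMETASYNC_LAZY_* constants introduced above (an interpretation consistent with the sync_locked() and mdbx_txn_commit_ex() hunks earlier in this commit, not text from the commit itself): sync_locked() now records

    mti_meta_sync_txnid = (uint32_t)txnid - dist

where dist is 0 when the meta-page was made durable, MDBX_NOMETASYNC_LAZY_FD when it was lazily written through a file descriptor, and MDBX_NOMETASYNC_LAZY_WRITEMAP when it was lazily touched through the mmap. On the next commit, observing meta_sync_txnid == (uint32_t)head.txnid - dist for the matching mode means only a flush of that lazy write is missing, so the commit can set need_flush_for_nometasync and let the data flush cover the meta-page instead of calling meta_sync() separately.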


@@ -152,8 +152,10 @@ static __inline int flock(HANDLE fd, unsigned flags, size_t offset,
 static __inline int flock_data(const MDBX_env *env, unsigned flags,
                                size_t offset, size_t bytes) {
-  return flock_with_event(env->me_fd4data, env->me_data_lock_event, flags,
-                          offset, bytes);
+  const HANDLE fd4data =
+      env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+  return flock_with_event(fd4data, env->me_data_lock_event, flags, offset,
+                          bytes);
 }
 static int funlock(mdbx_filehandle_t fd, size_t offset, size_t bytes) {
@@ -195,17 +197,19 @@ int mdbx_txn_lock(MDBX_env *env, bool dontwait) {
   if (env->me_flags & MDBX_EXCLUSIVE)
     return MDBX_SUCCESS;
-  int rc = flock_with_event(env->me_fd4data, env->me_data_lock_event,
+  const HANDLE fd4data =
+      env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+  int rc = flock_with_event(fd4data, env->me_data_lock_event,
                             dontwait ? (LCK_EXCLUSIVE | LCK_DONTWAIT)
                                      : (LCK_EXCLUSIVE | LCK_WAITFOR),
                             DXB_BODY);
   if (rc == ERROR_LOCK_VIOLATION && dontwait) {
     SleepEx(0, true);
-    rc = flock_with_event(env->me_fd4data, env->me_data_lock_event,
+    rc = flock_with_event(fd4data, env->me_data_lock_event,
                           LCK_EXCLUSIVE | LCK_DONTWAIT, DXB_BODY);
     if (rc == ERROR_LOCK_VIOLATION) {
       SleepEx(0, true);
-      rc = flock_with_event(env->me_fd4data, env->me_data_lock_event,
+      rc = flock_with_event(fd4data, env->me_data_lock_event,
                             LCK_EXCLUSIVE | LCK_DONTWAIT, DXB_BODY);
     }
   }
@@ -218,7 +222,9 @@ int mdbx_txn_lock(MDBX_env *env, bool dontwait) {
 void mdbx_txn_unlock(MDBX_env *env) {
   if ((env->me_flags & MDBX_EXCLUSIVE) == 0) {
-    int err = funlock(env->me_fd4data, DXB_BODY);
+    const HANDLE fd4data =
+        env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+    int err = funlock(fd4data, DXB_BODY);
     if (err != MDBX_SUCCESS)
       mdbx_panic("%s failed: err %u", __func__, err);
   }
@@ -451,18 +457,20 @@ static void lck_unlock(MDBX_env *env) {
     SetLastError(ERROR_SUCCESS);
   }
-  if (env->me_fd4data != INVALID_HANDLE_VALUE) {
+  const HANDLE fd4data =
+      env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+  if (fd4data != INVALID_HANDLE_VALUE) {
     /* explicitly unlock to avoid latency for other processes (windows kernel
      * releases such locks via deferred queues) */
     do
-      err = funlock(env->me_fd4data, DXB_BODY);
+      err = funlock(fd4data, DXB_BODY);
     while (err == MDBX_SUCCESS);
     assert(err == ERROR_NOT_LOCKED ||
            (mdbx_RunningUnderWine() && err == ERROR_LOCK_VIOLATION));
     SetLastError(ERROR_SUCCESS);
     do
-      err = funlock(env->me_fd4data, DXB_WHOLE);
+      err = funlock(fd4data, DXB_WHOLE);
     while (err == MDBX_SUCCESS);
     assert(err == ERROR_NOT_LOCKED ||
            (mdbx_RunningUnderWine() && err == ERROR_LOCK_VIOLATION));
@@ -522,7 +530,9 @@ static int internal_seize_lck(HANDLE lfd) {
 }
 MDBX_INTERNAL_FUNC int osal_lck_seize(MDBX_env *env) {
-  assert(env->me_fd4data != INVALID_HANDLE_VALUE);
+  const HANDLE fd4data =
+      env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
+  assert(fd4data != INVALID_HANDLE_VALUE);
   if (env->me_flags & MDBX_EXCLUSIVE)
     return MDBX_RESULT_TRUE /* nope since files were must be opened
                                non-shareable */
@@ -554,7 +564,7 @@ MDBX_INTERNAL_FUNC int osal_lck_seize(MDBX_env *env) {
       return err;
     }
     jitter4testing(false);
-    err = funlock(env->me_fd4data, DXB_WHOLE);
+    err = funlock(fd4data, DXB_WHOLE);
     if (err != MDBX_SUCCESS)
       mdbx_panic("%s(%s) failed: err %u", __func__,
                  "unlock-against-without-lck", err);
@@ -564,8 +574,10 @@ MDBX_INTERNAL_FUNC int osal_lck_seize(MDBX_env *env) {
 }
 MDBX_INTERNAL_FUNC int osal_lck_downgrade(MDBX_env *env) {
+  const HANDLE fd4data =
+      env->me_overlapped_fd ? env->me_overlapped_fd : env->me_lazy_fd;
   /* Transite from exclusive-write state (E-E) to used (S-?) */
-  assert(env->me_fd4data != INVALID_HANDLE_VALUE);
+  assert(fd4data != INVALID_HANDLE_VALUE);
   assert(env->me_lfd != INVALID_HANDLE_VALUE);
   if (env->me_flags & MDBX_EXCLUSIVE)


@@ -606,16 +606,18 @@ static size_t osal_iov_max;
 #undef OSAL_IOV_MAX
 #endif /* OSAL_IOV_MAX */
-MDBX_INTERNAL_FUNC int osal_ioring_create(osal_ioring_t *ior,
+MDBX_INTERNAL_FUNC int osal_ioring_create(osal_ioring_t *ior
 #if defined(_WIN32) || defined(_WIN64)
-                                          uint8_t flags,
+                                          ,
+                                          bool enable_direct,
+                                          mdbx_filehandle_t overlapped_fd
 #endif /* Windows */
-                                          mdbx_filehandle_t fd) {
+) {
   memset(ior, 0, sizeof(osal_ioring_t));
-  ior->fd = fd;
 #if defined(_WIN32) || defined(_WIN64)
-  ior->flags = flags;
+  ior->overlapped_fd = overlapped_fd;
+  ior->direct = enable_direct && overlapped_fd;
   const unsigned pagesize = (unsigned)osal_syspagesize();
   ior->pagesize = pagesize;
   ior->pagesize_ln2 = (uint8_t)log2n_powerof2(pagesize);
@@ -664,7 +666,7 @@ MDBX_INTERNAL_FUNC int osal_ioring_add(osal_ioring_t *ior, const size_t offset,
 #if defined(_WIN32) || defined(_WIN64)
   const unsigned segments = (unsigned)(bytes >> ior->pagesize_ln2);
   const bool use_gather =
-      (ior->flags & IOR_DIRECT) && ior->slots_left >= segments;
+      ior->direct && ior->overlapped_fd && ior->slots_left >= segments;
 #endif /* Windows */
   ior_item_t *item = ior->pool;
@@ -678,6 +680,7 @@ MDBX_INTERNAL_FUNC int osal_ioring_add(osal_ioring_t *ior, const size_t offset,
                 (uintptr_t)(uint64_t)item->sgv[0].Buffer) &
                ior_alignment_mask) == 0 &&
           ior->last_sgvcnt + segments < OSAL_IOV_MAX) {
+    assert(ior->overlapped_fd);
     assert((item->single.iov_len & ior_WriteFile_flag) == 0);
     assert(item->sgv[ior->last_sgvcnt].Buffer == 0);
     ior->last_bytes += bytes;
@@ -745,6 +748,7 @@ MDBX_INTERNAL_FUNC int osal_ioring_add(osal_ioring_t *ior, const size_t offset,
       assert((item->single.iov_len & ior_WriteFile_flag) != 0);
     } else {
       /* WriteFileGather() */
+      assert(ior->overlapped_fd);
       item->sgv[0].Buffer = PtrToPtr64(data);
       for (size_t i = 1; i < segments; ++i) {
         data = ptr_disp(data, ior->pagesize);
@@ -814,7 +818,7 @@ MDBX_INTERNAL_FUNC void osal_ioring_walk(
 }
 MDBX_INTERNAL_FUNC osal_ioring_write_result_t
-osal_ioring_write(osal_ioring_t *ior) {
+osal_ioring_write(osal_ioring_t *ior, mdbx_filehandle_t fd) {
   osal_ioring_write_result_t r = {MDBX_SUCCESS, 0};
 #if defined(_WIN32) || defined(_WIN64)
@@ -828,6 +832,7 @@ osal_ioring_write(osal_ioring_t *ior) {
     size_t i = 1, bytes = item->single.iov_len - ior_WriteFile_flag;
     r.wops += 1;
     if (bytes & ior_WriteFile_flag) {
+      assert(ior->overlapped_fd && fd == ior->overlapped_fd);
       bytes = ior->pagesize;
       while (item->sgv[i].Buffer) {
         bytes += ior->pagesize;
@@ -840,11 +845,10 @@ osal_ioring_write(osal_ioring_t *ior) {
           r.err = GetLastError();
         bailout_rc:
           assert(r.err != MDBX_SUCCESS);
-          CancelIo(ior->fd);
+          CancelIo(fd);
           return r;
         }
-      if (WriteFileGather(ior->fd, item->sgv, (DWORD)bytes, nullptr,
-                          &item->ov)) {
+      if (WriteFileGather(fd, item->sgv, (DWORD)bytes, nullptr, &item->ov)) {
         assert(item->ov.Internal == 0 &&
                WaitForSingleObject(item->ov.hEvent, 0) == WAIT_OBJECT_0);
         ior_put_event(ior, item->ov.hEvent);
@@ -854,7 +858,7 @@ osal_ioring_write(osal_ioring_t *ior) {
       if (unlikely(r.err != ERROR_IO_PENDING)) {
         ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
               ", err %d",
-              "WriteFileGather", ior->fd, __Wpedantic_format_voidptr(item),
+              "WriteFileGather", fd, __Wpedantic_format_voidptr(item),
               item - ior->pool, ((MDBX_page *)item->single.iov_base)->mp_pgno,
               bytes, item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
               r.err);
@@ -863,11 +867,11 @@ osal_ioring_write(osal_ioring_t *ior) {
         assert(wait_for > ior->event_pool + ior->event_stack);
         *--wait_for = item->ov.hEvent;
       }
-    } else if (ior->flags & IOR_OVERLAPPED) {
+    } else if (fd == ior->overlapped_fd) {
       assert(bytes < MAX_WRITE);
     retry:
       item->ov.hEvent = ior;
-      if (WriteFileEx(ior->fd, item->single.iov_base, (DWORD)bytes, &item->ov,
+      if (WriteFileEx(fd, item->single.iov_base, (DWORD)bytes, &item->ov,
                       ior_wocr)) {
         async_started += 1;
       } else {
@@ -876,7 +880,7 @@ osal_ioring_write(osal_ioring_t *ior) {
         default:
           ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
                 ", err %d",
-                "WriteFileEx", ior->fd, __Wpedantic_format_voidptr(item),
+                "WriteFileEx", fd, __Wpedantic_format_voidptr(item),
                 item - ior->pool, ((MDBX_page *)item->single.iov_base)->mp_pgno,
                 bytes, item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
                 r.err);
@@ -887,7 +891,7 @@ osal_ioring_write(osal_ioring_t *ior) {
           WARNING(
               "%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
               ", err %d",
-              "WriteFileEx", ior->fd, __Wpedantic_format_voidptr(item),
+              "WriteFileEx", fd, __Wpedantic_format_voidptr(item),
               item - ior->pool, ((MDBX_page *)item->single.iov_base)->mp_pgno,
               bytes, item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
               r.err);
@@ -905,12 +909,12 @@ osal_ioring_write(osal_ioring_t *ior) {
     } else {
       assert(bytes < MAX_WRITE);
       DWORD written = 0;
-      if (!WriteFile(ior->fd, item->single.iov_base, (DWORD)bytes, &written,
+      if (!WriteFile(fd, item->single.iov_base, (DWORD)bytes, &written,
                      &item->ov)) {
         r.err = (int)GetLastError();
         ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
               ", err %d",
-              "WriteFile", ior->fd, __Wpedantic_format_voidptr(item),
+              "WriteFile", fd, __Wpedantic_format_voidptr(item),
               item - ior->pool, ((MDBX_page *)item->single.iov_base)->mp_pgno,
               bytes, item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
               r.err);
@@ -974,8 +978,7 @@ osal_ioring_write(osal_ioring_t *ior) {
     }
     if (!HasOverlappedIoCompleted(&item->ov)) {
       DWORD written = 0;
-      if (unlikely(
-              !GetOverlappedResult(ior->fd, &item->ov, &written, true))) {
+      if (unlikely(!GetOverlappedResult(fd, &item->ov, &written, true))) {
         ERROR("%s: item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
               ", err %d",
               "GetOverlappedResult", __Wpedantic_format_voidptr(item),
@@ -1025,16 +1028,16 @@ osal_ioring_write(osal_ioring_t *ior) {
 #if MDBX_HAVE_PWRITEV
     assert(item->sgvcnt > 0);
     if (item->sgvcnt == 1)
-      r.err = osal_pwrite(ior->fd, item->sgv[0].iov_base, item->sgv[0].iov_len,
+      r.err = osal_pwrite(fd, item->sgv[0].iov_base, item->sgv[0].iov_len,
                           item->offset);
     else
-      r.err = osal_pwritev(ior->fd, item->sgv, item->sgvcnt, item->offset);
+      r.err = osal_pwritev(fd, item->sgv, item->sgvcnt, item->offset);
     // TODO: io_uring_prep_write(sqe, fd, ...);
     item = ior_next(item, item->sgvcnt);
 #else
-    r.err = osal_pwrite(ior->fd, item->single.iov_base, item->single.iov_len,
+    r.err = osal_pwrite(fd, item->single.iov_base, item->single.iov_len,
                         item->offset);
     item = ior_next(item, 1);
 #endif
@@ -1055,8 +1058,10 @@ MDBX_INTERNAL_FUNC void osal_ioring_reset(osal_ioring_t *ior) {
 #if defined(_WIN32) || defined(_WIN64)
   if (ior->last) {
     for (ior_item_t *item = ior->pool; item <= ior->last;) {
-      if (!HasOverlappedIoCompleted(&item->ov))
-        CancelIoEx(ior->fd, &item->ov);
+      if (!HasOverlappedIoCompleted(&item->ov)) {
+        assert(ior->overlapped_fd);
+        CancelIoEx(ior->overlapped_fd, &item->ov);
+      }
       if (item->ov.hEvent && item->ov.hEvent != ior)
         ior_put_event(ior, item->ov.hEvent);
       size_t i = 1;
@@ -1090,13 +1095,12 @@ MDBX_INTERNAL_FUNC int osal_ioring_resize(osal_ioring_t *ior, size_t items) {
 #if defined(_WIN32) || defined(_WIN64)
   if (ior->state & IOR_STATE_LOCKED)
     return MDBX_SUCCESS;
-  const bool useSetFileIoOverlappedRange = (ior->flags & IOR_OVERLAPPED) &&
-                                           mdbx_SetFileIoOverlappedRange &&
-                                           items > 7;
+  const bool useSetFileIoOverlappedRange =
+      ior->overlapped_fd && mdbx_SetFileIoOverlappedRange && items > 42;
   const size_t ceiling =
       useSetFileIoOverlappedRange
           ? ((items < 65536 / 2 / sizeof(ior_item_t)) ? 65536 : 65536 * 4)
-          : 4096;
+          : 1024;
   const size_t bytes = ceil_powerof2(sizeof(ior_item_t) * items, ceiling);
   items = bytes / sizeof(ior_item_t);
 #endif /* Windows */
@@ -1134,7 +1138,7 @@ MDBX_INTERNAL_FUNC int osal_ioring_resize(osal_ioring_t *ior, size_t items) {
   ior->boundary = ptr_disp(ior->pool, ior->allocated);
 #if defined(_WIN32) || defined(_WIN64)
   if (useSetFileIoOverlappedRange) {
-    if (mdbx_SetFileIoOverlappedRange(ior->fd, ptr, (ULONG)bytes))
+    if (mdbx_SetFileIoOverlappedRange(ior->overlapped_fd, ptr, (ULONG)bytes))
       ior->state += IOR_STATE_LOCKED;
     else
       return GetLastError();


@@ -312,13 +312,12 @@ typedef struct osal_ioring {
   unsigned slots_left;
   unsigned allocated;
 #if defined(_WIN32) || defined(_WIN64)
-#define IOR_DIRECT 1
-#define IOR_OVERLAPPED 2
 #define IOR_STATE_LOCKED 1
+  HANDLE overlapped_fd;
   unsigned pagesize;
   unsigned last_sgvcnt;
   size_t last_bytes;
-  uint8_t flags, state, pagesize_ln2;
+  uint8_t direct, state, pagesize_ln2;
   unsigned event_stack;
   HANDLE *event_pool;
   volatile LONG async_waiting;
@@ -335,7 +334,6 @@ typedef struct osal_ioring {
 #define ior_last_sgvcnt(ior, item) (1)
 #define ior_last_bytes(ior, item) (item)->single.iov_len
 #endif /* !Windows */
-  mdbx_filehandle_t fd;
   ior_item_t *last;
   ior_item_t *pool;
   char *boundary;
@@ -344,11 +342,13 @@ typedef struct osal_ioring {
 #ifndef __cplusplus
 /* Actually this is not ioring for now, but on the way. */
-MDBX_INTERNAL_FUNC int osal_ioring_create(osal_ioring_t *,
+MDBX_INTERNAL_FUNC int osal_ioring_create(osal_ioring_t *
 #if defined(_WIN32) || defined(_WIN64)
-                                          uint8_t flags,
+                                          ,
+                                          bool enable_direct,
+                                          mdbx_filehandle_t overlapped_fd
 #endif /* Windows */
-                                          mdbx_filehandle_t fd);
+);
 MDBX_INTERNAL_FUNC int osal_ioring_resize(osal_ioring_t *, size_t items);
 MDBX_INTERNAL_FUNC void osal_ioring_destroy(osal_ioring_t *);
 MDBX_INTERNAL_FUNC void osal_ioring_reset(osal_ioring_t *);
@@ -359,7 +359,7 @@ typedef struct osal_ioring_write_result {
   unsigned wops;
 } osal_ioring_write_result_t;
 MDBX_INTERNAL_FUNC osal_ioring_write_result_t
-osal_ioring_write(osal_ioring_t *ior);
+osal_ioring_write(osal_ioring_t *ior, mdbx_filehandle_t fd);
 typedef struct iov_ctx iov_ctx_t;
 MDBX_INTERNAL_FUNC void osal_ioring_walk(
@@ -377,11 +377,13 @@ osal_ioring_used(const osal_ioring_t *ior) {
 }
 MDBX_MAYBE_UNUSED static inline int
-osal_ioring_reserve(osal_ioring_t *ior, size_t items, size_t bytes) {
+osal_ioring_prepare(osal_ioring_t *ior, size_t items, size_t bytes) {
   items = (items > 32) ? items : 32;
 #if defined(_WIN32) || defined(_WIN64)
-  const size_t npages = bytes >> ior->pagesize_ln2;
-  items = (items > npages) ? items : npages;
+  if (ior->direct) {
+    const size_t npages = bytes >> ior->pagesize_ln2;
+    items = (items > npages) ? items : npages;
+  }
 #else
   (void)bytes;
 #endif