mirror of https://github.com/isar/libmdbx.git
mdbx: asynchronous I/O support for Windows and groundwork for io_ring
(squashed commits and fixes).
624	src/osal.c
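For orientation, the intended lifecycle of the new ring looks roughly like this. It is an editorial sketch based only on the signatures added in the diff below; the `fd` handle, the flag choice and the `page_*()` helpers are placeholders, not libmdbx call sites:

/* Editorial sketch, not part of the commit: fd and page_*() are placeholders. */
osal_ioring_t ior;
int err = osal_ioring_create(&ior,
#if defined(_WIN32) || defined(_WIN64)
                             /* 0, IOR_OVERLAPPED and/or IOR_UNBUFFERED */ 0,
#endif
                             fd);
if (err == MDBX_SUCCESS)
  err = osal_ioring_resize(&ior, /* expected number of items */ 64);

for (size_t n = 0; err == MDBX_SUCCESS && n < page_count(); ++n) {
  err = osal_ioring_add(&ior, page_offset(n), page_data(n), page_bytes(n));
  if (err == MDBX_RESULT_TRUE) {
    /* ring is full: submit what is queued, recycle it, re-queue this page */
    osal_ioring_write_result_t r = osal_ioring_write(&ior);
    osal_ioring_reset(&ior);
    err = (r.err == MDBX_SUCCESS)
              ? osal_ioring_add(&ior, page_offset(n), page_data(n),
                                page_bytes(n))
              : r.err;
  }
}
if (err == MDBX_SUCCESS)
  err = osal_ioring_write(&ior).err;
osal_ioring_destroy(&ior);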
@@ -1,4 +1,4 @@
/* https://en.wikipedia.org/wiki/Operating_system_abstraction_layer */

/*
 * Copyright 2015-2022 Leonid Yuriev <leo@yuriev.ru>
@@ -537,6 +537,596 @@ MDBX_INTERNAL_FUNC size_t osal_mb2w(wchar_t *dst, size_t dst_n, const char *src,

/*----------------------------------------------------------------------------*/

#if defined(_WIN32) || defined(_WIN64)
#define ior_alignment_mask (ior->pagesize - 1)
#define OSAL_IOV_MAX (4096 / sizeof(ior_sgv_element))

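/* A tiny pool of reusable manual-reset events: ior_get_event() pops a cached
 * handle or creates a fresh one, ior_put_event() returns a handle once the
 * corresponding overlapped write has completed. */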
static void ior_put_event(osal_ioring_t *ior, HANDLE event) {
  assert(event && event != INVALID_HANDLE_VALUE && event != ior);
  assert(ior->event_stack < ior->allocated);
  ior->event_pool[ior->event_stack] = event;
  ior->event_stack += 1;
}

static HANDLE ior_get_event(osal_ioring_t *ior) {
  assert(ior->event_stack <= ior->allocated);
  if (ior->event_stack > 0) {
    ior->event_stack -= 1;
    assert(ior->event_pool[ior->event_stack] != 0);
    return ior->event_pool[ior->event_stack];
  }
  return CreateEventW(nullptr, true, false, nullptr);
}

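/* Completion routine for WriteFileEx(): runs as an APC on the submitting
 * thread during an alertable wait. OVERLAPPED.hEvent is reused to carry the
 * ring pointer, err/bytes land in Internal/InternalHigh, and once every
 * started request has completed the async_done event is signaled. */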
static void WINAPI ior_wocr(DWORD err, DWORD bytes, OVERLAPPED *ov) {
  osal_ioring_t *ior = ov->hEvent;
  ov->Internal = err;
  ov->InternalHigh = bytes;
  if (++ior->async_completed >= ior->async_waiting)
    SetEvent(ior->async_done);
}

#elif MDBX_HAVE_PWRITEV
#if defined(_SC_IOV_MAX)
static size_t osal_iov_max;
#define OSAL_IOV_MAX osal_iov_max
#else
#define OSAL_IOV_MAX IOV_MAX
#endif
#else
#undef OSAL_IOV_MAX
#endif /* OSAL_IOV_MAX */

MDBX_INTERNAL_FUNC int osal_ioring_create(osal_ioring_t *ior,
#if defined(_WIN32) || defined(_WIN64)
                                          unsigned flags,
#endif /* Windows */
                                          mdbx_filehandle_t fd) {
  memset(ior, 0, sizeof(osal_ioring_t));
  ior->fd = fd;

#if defined(_WIN32) || defined(_WIN64)
  ior->flags = flags;
  const unsigned pagesize = (unsigned)osal_syspagesize();
  ior->pagesize = pagesize;
  ior->pagesize_ln2 = (uint8_t)log2n_powerof2(pagesize);
  ior->async_done = ior_get_event(ior);
  if (!ior->async_done)
    return GetLastError();
#endif /* Windows */

#if MDBX_HAVE_PWRITEV && defined(_SC_IOV_MAX)
  if (!osal_iov_max)
    osal_iov_max = sysconf(_SC_IOV_MAX);
#endif

  ior->boundary = (char *)(ior->pool + ior->allocated);
  return MDBX_SUCCESS;
}

static __inline size_t ior_offset(const ior_item_t *item) {
#if defined(_WIN32) || defined(_WIN64)
  return item->ov.Offset | (size_t)((sizeof(size_t) > sizeof(item->ov.Offset))
                                        ? (uint64_t)item->ov.OffsetHigh << 32
                                        : 0);
#else
  return item->offset;
#endif /* !Windows */
}

static __inline ior_item_t *ior_next(ior_item_t *item, size_t sgvcnt) {
#if defined(ior_sgv_element)
  assert(sgvcnt > 0);
  return (ior_item_t *)((char *)item + sizeof(ior_item_t) -
                        sizeof(ior_sgv_element) +
                        sizeof(ior_sgv_element) * sgvcnt);
#else
  assert(sgvcnt == 1);
  (void)sgvcnt;
  return item + 1;
#endif
}

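/* Appends a write to the ring, coalescing it with the previous item when the
 * target offset is contiguous. On Windows the kind of an item is encoded in
 * the parity of single.iov_len: plain WriteFile() items store bytes + 1 (odd),
 * while WriteFileGather() items keep an even length plus a zero-terminated
 * sgv[] page list. Returns MDBX_RESULT_TRUE when no free slots remain. */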
MDBX_INTERNAL_FUNC int osal_ioring_add(osal_ioring_t *ior, const size_t offset,
                                       void *data, const size_t bytes) {
  assert(bytes && data);
  assert(bytes % MIN_PAGESIZE == 0 && bytes <= MAX_WRITE);
  assert(offset % MIN_PAGESIZE == 0 && offset + (uint64_t)bytes <= MAX_MAPSIZE);

#if defined(_WIN32) || defined(_WIN64)
  const unsigned segments = (unsigned)(bytes >> ior->pagesize_ln2);
  const bool use_gather =
      (ior->flags & IOR_UNBUFFERED) && ior->slots_left >= segments;
#endif /* Windows */

  ior_item_t *item = ior->pool;
  if (likely(ior->last)) {
    item = ior->last;
    if (unlikely(ior_offset(item) + ior_last_bytes(ior, item) == offset) &&
        likely(ior_last_bytes(ior, item) + bytes <= MAX_WRITE)) {
#if defined(_WIN32) || defined(_WIN64)
      if (use_gather &&
          ((bytes | (uintptr_t)data | ior->last_bytes |
            (uintptr_t)(uint64_t)item->sgv[0].Buffer) &
           ior_alignment_mask) == 0 &&
          ior->last_sgvcnt + segments < OSAL_IOV_MAX) {
        assert((item->single.iov_len & 1) == 0);
        assert(item->sgv[ior->last_sgvcnt].Buffer == 0);
        ior->last_bytes += bytes;
        size_t i = 0;
        do {
          item->sgv[ior->last_sgvcnt + i].Buffer = PtrToPtr64(data);
          data = (char *)data + ior->pagesize;
        } while (++i < segments);
        ior->slots_left -= segments;
        item->sgv[ior->last_sgvcnt += segments].Buffer = 0;
        assert((item->single.iov_len & 1) == 0);
        return MDBX_SUCCESS;
      }
      const void *end =
          (char *)(item->single.iov_base) + item->single.iov_len - 1;
      if (unlikely(end == data)) {
        assert((item->single.iov_len & 1) != 0);
        item->single.iov_len += bytes;
        return MDBX_SUCCESS;
      }
#elif MDBX_HAVE_PWRITEV
      assert((int)item->sgvcnt > 0);
      const void *end = (char *)(item->sgv[item->sgvcnt - 1].iov_base) +
                        item->sgv[item->sgvcnt - 1].iov_len;
      if (unlikely(end == data)) {
        item->sgv[item->sgvcnt - 1].iov_len += bytes;
        ior->last_bytes += bytes;
        return MDBX_SUCCESS;
      }
      if (likely(item->sgvcnt < OSAL_IOV_MAX)) {
        if (unlikely(ior->slots_left < 1))
          return MDBX_RESULT_TRUE;
        item->sgv[item->sgvcnt].iov_base = data;
        item->sgv[item->sgvcnt].iov_len = bytes;
        ior->last_bytes += bytes;
        item->sgvcnt += 1;
        ior->slots_left -= 1;
        return MDBX_SUCCESS;
      }
#else
      const void *end = (char *)(item->single.iov_base) + item->single.iov_len;
      if (unlikely(end == data)) {
        item->single.iov_len += bytes;
        return MDBX_SUCCESS;
      }
#endif
    }
    item = ior_next(item, ior_last_sgvcnt(ior, item));
  }

  if (unlikely(ior->slots_left < 1))
    return MDBX_RESULT_TRUE;

  unsigned slots_used = 1;
#if defined(_WIN32) || defined(_WIN64)
  item->ov.Internal = item->ov.InternalHigh = 0;
  item->ov.Offset = (DWORD)offset;
  item->ov.OffsetHigh = HIGH_DWORD(offset);
  item->ov.hEvent = 0;
  if (!use_gather || ((bytes | (uintptr_t)(data)) & ior_alignment_mask) != 0 ||
      segments > OSAL_IOV_MAX) {
    /* WriteFile() */
    item->single.iov_base = data;
    item->single.iov_len = bytes + 1;
    assert((item->single.iov_len & 1) != 0);
  } else {
    /* WriteFileGather() */
    item->sgv[0].Buffer = PtrToPtr64(data);
    for (size_t i = 1; i < segments; ++i) {
      data = (char *)data + ior->pagesize;
      item->sgv[i].Buffer = PtrToPtr64(data);
    }
    item->sgv[segments].Buffer = 0;
    assert((item->single.iov_len & 1) == 0);
    slots_used = segments;
  }
  ior->last_bytes = bytes;
  ior_last_sgvcnt(ior, item) = slots_used;
#elif MDBX_HAVE_PWRITEV
  item->offset = offset;
  item->sgv[0].iov_base = data;
  item->sgv[0].iov_len = bytes;
  ior->last_bytes = bytes;
  ior_last_sgvcnt(ior, item) = slots_used;
#else
  item->offset = offset;
  item->single.iov_base = data;
  item->single.iov_len = bytes;
#endif /* !Windows */
  ior->slots_left -= slots_used;
  ior->last = item;
  return MDBX_SUCCESS;
}

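/* Replays the queued writes through a callback without doing any I/O; on
 * Windows the odd/even iov_len marker tells a single WriteFile() buffer apart
 * from a zero-terminated WriteFileGather() page list. */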
MDBX_INTERNAL_FUNC void osal_ioring_walk(
    osal_ioring_t *ior, iov_ctx_t *ctx,
    void (*callback)(iov_ctx_t *ctx, size_t offset, void *data, size_t bytes)) {
  for (ior_item_t *item = ior->pool; item <= ior->last;) {
#if defined(_WIN32) || defined(_WIN64)
    size_t offset = ior_offset(item);
    char *data = item->single.iov_base;
    size_t bytes = item->single.iov_len - 1;
    size_t i = 1;
    if (bytes & 1) {
      data = Ptr64ToPtr(item->sgv[0].Buffer);
      bytes = ior->pagesize;
      while (item->sgv[i].Buffer) {
        if (data + ior->pagesize != item->sgv[i].Buffer) {
          callback(ctx, offset, data, bytes);
          offset += bytes;
          data = Ptr64ToPtr(item->sgv[i].Buffer);
          bytes = 0;
        }
        bytes += ior->pagesize;
        ++i;
      }
    }
    assert(bytes < MAX_WRITE);
    callback(ctx, offset, data, bytes);
#elif MDBX_HAVE_PWRITEV
    assert(item->sgvcnt > 0);
    size_t offset = item->offset;
    size_t i = 0;
    do {
      callback(ctx, offset, item->sgv[i].iov_base, item->sgv[i].iov_len);
      offset += item->sgv[i].iov_len;
    } while (++i != item->sgvcnt);
#else
    const size_t i = 1;
    callback(ctx, item->offset, item->single.iov_base, item->single.iov_len);
#endif
    item = ior_next(item, i);
  }
}

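/* Submits everything queued in the ring. On Windows each item takes one of
 * three paths: WriteFileGather() for page-aligned unbuffered items,
 * WriteFileEx() with the ior_wocr completion routine when IOR_OVERLAPPED is
 * set, or plain synchronous WriteFile(). Completion is then awaited via the
 * collected event handles and GetOverlappedResult(). */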
MDBX_INTERNAL_FUNC osal_ioring_write_result_t
osal_ioring_write(osal_ioring_t *ior) {
  osal_ioring_write_result_t r = {MDBX_SUCCESS, 0};

#if defined(_WIN32) || defined(_WIN64)
  HANDLE *const end_wait_for =
      ior->event_pool + ior->allocated +
      /* one extra element was allocated for async_done */ 1;
  HANDLE *wait_for = end_wait_for;
  LONG async_started = 0;
  for (ior_item_t *item = ior->pool; item <= ior->last;) {
    item->ov.Internal = STATUS_PENDING;
    size_t i = 1, bytes = item->single.iov_len - 1;
    r.wops += 1;
    if (bytes & 1) {
      bytes = ior->pagesize;
      while (item->sgv[i].Buffer) {
        bytes += ior->pagesize;
        ++i;
      }
      assert(bytes < MAX_WRITE);
      item->ov.hEvent = ior_get_event(ior);
      if (unlikely(!item->ov.hEvent)) {
      bailout_geterr:
        r.err = GetLastError();
      bailout_rc:
        assert(r.err != MDBX_SUCCESS);
        CancelIo(ior->fd);
        return r;
      }
      if (WriteFileGather(ior->fd, item->sgv, (DWORD)bytes, nullptr,
                          &item->ov)) {
        assert(item->ov.Internal == 0 &&
               WaitForSingleObject(item->ov.hEvent, 0) == WAIT_OBJECT_0);
        ior_put_event(ior, item->ov.hEvent);
        item->ov.hEvent = 0;
      } else {
        r.err = (int)GetLastError();
        if (unlikely(r.err != ERROR_IO_PENDING)) {
          ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
                ", err %d",
                "WriteFileGather", ior->fd, item, item - ior->pool,
                ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
                item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32), r.err);
          goto bailout_rc;
        }
        assert(wait_for > ior->event_pool + ior->event_stack);
        *--wait_for = item->ov.hEvent;
      }
    } else if (ior->flags & IOR_OVERLAPPED) {
      assert(bytes < MAX_WRITE);
    retry:
      item->ov.hEvent = ior;
      if (WriteFileEx(ior->fd, item->single.iov_base, (DWORD)bytes, &item->ov,
                      ior_wocr)) {
        async_started += 1;
      } else {
        r.err = (int)GetLastError();
        switch (r.err) {
        default:
          ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
                ", err %d",
                "WriteFileEx", ior->fd, item, item - ior->pool,
                ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
                item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32), r.err);
          goto bailout_rc;
        case ERROR_NOT_FOUND:
        case ERROR_USER_MAPPED_FILE:
        case ERROR_LOCK_VIOLATION:
          WARNING(
              "%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
              ", err %d",
              "WriteFileEx", ior->fd, item, item - ior->pool,
              ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
              item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32), r.err);
          SleepEx(0, true);
          goto retry;
        case ERROR_INVALID_USER_BUFFER:
        case ERROR_NOT_ENOUGH_MEMORY:
          if (SleepEx(0, true) == WAIT_IO_COMPLETION)
            goto retry;
          goto bailout_rc;
        case ERROR_IO_PENDING:
          async_started += 1;
        }
      }
    } else {
      assert(bytes < MAX_WRITE);
      DWORD written = 0;
      if (!WriteFile(ior->fd, item->single.iov_base, (DWORD)bytes, &written,
                     &item->ov)) {
        r.err = (int)GetLastError();
        ERROR("%s: fd %p, item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
              ", err %d",
              "WriteFile", ior->fd, item, item - ior->pool,
              ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
              item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32), r.err);
        goto bailout_rc;
      } else if (unlikely(written != bytes)) {
        r.err = ERROR_WRITE_FAULT;
        goto bailout_rc;
      }
    }
    item = ior_next(item, i);
  }

  assert(ior->async_waiting > ior->async_completed &&
         ior->async_waiting == INT_MAX);
  ior->async_waiting = async_started;
  if (async_started > ior->async_completed && end_wait_for == wait_for) {
    assert(wait_for > ior->event_pool + ior->event_stack);
    *--wait_for = ior->async_done;
  }

  const size_t pending_count = end_wait_for - wait_for;
  if (pending_count) {
    /* Wait for up to MAXIMUM_WAIT_OBJECTS (64) of the most recent handles,
     * then selectively wait via GetOverlappedResult() if some earlier items
     * have not completed yet. Overall this means fewer system calls, i.e.
     * less overhead. However, it is not guaranteed that this saving is not
     * outweighed by an inefficient implementation of
     * WaitForMultipleObjectsEx(), but in that case it is a problem on
     * Microsoft's side. */
    DWORD madness;
    do
      madness = WaitForMultipleObjectsEx((pending_count < MAXIMUM_WAIT_OBJECTS)
                                             ? (DWORD)pending_count
                                             : MAXIMUM_WAIT_OBJECTS,
                                         wait_for, true,
                                         /* 24 hours */ 86400000ul, true);
    while (madness == WAIT_IO_COMPLETION);
    STATIC_ASSERT(WAIT_OBJECT_0 == 0);
    if (/* madness >= WAIT_OBJECT_0 && */
        madness < WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS)
      r.err = MDBX_SUCCESS;
    else if (madness >= WAIT_ABANDONED_0 &&
             madness < WAIT_ABANDONED_0 + MAXIMUM_WAIT_OBJECTS) {
      r.err = ERROR_ABANDONED_WAIT_0;
      goto bailout_rc;
    } else if (madness == WAIT_TIMEOUT) {
      r.err = WAIT_TIMEOUT;
      goto bailout_rc;
    } else {
      r.err = /* madness == WAIT_FAILED */ MDBX_PROBLEM;
      goto bailout_rc;
    }

    assert(ior->async_waiting == ior->async_completed);
    for (ior_item_t *item = ior->pool; item <= ior->last;) {
      size_t i = 1, bytes = item->single.iov_len - 1;
      if (bytes & 1) {
        bytes = ior->pagesize;
        while (item->sgv[i].Buffer) {
          bytes += ior->pagesize;
          ++i;
        }
        if (!HasOverlappedIoCompleted(&item->ov)) {
          DWORD written = 0;
          if (unlikely(
                  !GetOverlappedResult(ior->fd, &item->ov, &written, true))) {
            ERROR("%s: item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
                  ", err %d",
                  "GetOverlappedResult", item, item - ior->pool,
                  ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
                  item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
                  GetLastError());
            goto bailout_geterr;
          }
          assert(MDBX_SUCCESS == item->ov.Internal);
          assert(written == item->ov.InternalHigh);
        }
      } else {
        assert(HasOverlappedIoCompleted(&item->ov));
      }
      assert(item->ov.Internal != ERROR_IO_PENDING);
      if (unlikely(item->ov.Internal != MDBX_SUCCESS)) {
        DWORD written = 0;
        r.err = (int)item->ov.Internal;
        if ((r.err & 0x80000000) &&
            GetOverlappedResult(NULL, &item->ov, &written, true))
          r.err = (int)GetLastError();
        ERROR("%s: item %p (%zu), pgno %u, bytes %zu, offset %" PRId64
              ", err %d",
              "Result", item, item - ior->pool,
              ((MDBX_page *)item->single.iov_base)->mp_pgno, bytes,
              item->ov.Offset + ((uint64_t)item->ov.OffsetHigh << 32),
              GetLastError());
        goto bailout_rc;
      }
      if (unlikely(item->ov.InternalHigh != bytes)) {
        r.err = ERROR_WRITE_FAULT;
        goto bailout_rc;
      }
      item = ior_next(item, i);
    }
    assert(ior->async_waiting == ior->async_completed);
  } else {
    assert(r.err == MDBX_SUCCESS);
  }
  assert(ior->async_waiting == ior->async_completed);

#else
  STATIC_ASSERT_MSG(sizeof(off_t) >= sizeof(size_t),
                    "libmdbx requires 64-bit file I/O on 64-bit systems");
  for (ior_item_t *item = ior->pool; item <= ior->last;) {
#if MDBX_HAVE_PWRITEV
    assert(item->sgvcnt > 0);
    if (item->sgvcnt == 1)
      r.err = osal_pwrite(ior->fd, item->sgv[0].iov_base, item->sgv[0].iov_len,
                          item->offset);
    else
      r.err = osal_pwritev(ior->fd, item->sgv, item->sgvcnt, item->offset);

    // TODO: io_uring_prep_write(sqe, fd, ...);

    item = ior_next(item, item->sgvcnt);
#else
    r.err = osal_pwrite(ior->fd, item->single.iov_base, item->single.iov_len,
                        item->offset);
    item = ior_next(item, 1);
#endif
    r.wops += 1;
    if (unlikely(r.err != MDBX_SUCCESS))
      break;
  }

  // TODO: io_uring_submit(&ring)
  // TODO: err = io_uring_wait_cqe(&ring, &cqe);
  // TODO: io_uring_cqe_seen(&ring, cqe);

#endif /* !Windows */
  return r;
}
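The `io_uring_*` TODOs above mark where the promised io_ring backend would plug in. Purely as an illustration of that direction (an assumption of this note, not code from the commit), a liburing-based variant of the submit-and-wait loop, reusing the MDBX_HAVE_PWRITEV item layout with `sgv`/`sgvcnt`, might look like:

#if 0 /* illustrative sketch only, not part of this commit */
#include <liburing.h>

static int ioring_write_via_io_uring(osal_ioring_t *ior) {
  struct io_uring ring;
  int rc = io_uring_queue_init(ior->allocated, &ring, 0);
  if (rc < 0)
    return -rc;

  unsigned inflight = 0;
  for (ior_item_t *item = ior->pool; item <= ior->last;
       item = ior_next(item, item->sgvcnt)) {
    /* one SQE per item; the ring was sized for ior->allocated items */
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    io_uring_prep_writev(sqe, ior->fd, item->sgv, item->sgvcnt, item->offset);
    ++inflight;
  }
  io_uring_submit(&ring);

  int err = MDBX_SUCCESS;
  while (inflight--) {
    struct io_uring_cqe *cqe;
    rc = io_uring_wait_cqe(&ring, &cqe);
    if (rc < 0) {
      err = -rc;
      break;
    }
    if (cqe->res < 0 && err == MDBX_SUCCESS)
      err = -cqe->res;
    io_uring_cqe_seen(&ring, cqe);
  }
  io_uring_queue_exit(&ring);
  return err;
}
#endif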

MDBX_INTERNAL_FUNC void osal_ioring_reset(osal_ioring_t *ior) {
#if defined(_WIN32) || defined(_WIN64)
  if (ior->last) {
    for (ior_item_t *item = ior->pool; item <= ior->last;) {
      if (!HasOverlappedIoCompleted(&item->ov))
        CancelIoEx(ior->fd, &item->ov);
      if (item->ov.hEvent && item->ov.hEvent != ior)
        ior_put_event(ior, item->ov.hEvent);
      size_t i = 1;
      if ((item->single.iov_len & 1) == 0)
        while (item->sgv[i].Buffer)
          ++i;
      item = ior_next(item, i);
    }
  }
  ior->async_waiting = INT_MAX;
  ior->async_completed = 0;
  ResetEvent(ior->async_done);
#endif /* Windows */
  ior->slots_left = ior->allocated;
  ior->last = nullptr;
}

static void ior_cleanup(osal_ioring_t *ior, const size_t since) {
  osal_ioring_reset(ior);
#if defined(_WIN32) || defined(_WIN64)
  for (size_t i = since; i < ior->event_stack; ++i)
    CloseHandle(ior->event_pool[i]);
  ior->event_stack = 0;
#else
  (void)since;
#endif /* Windows */
}

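/* Grows or shrinks the ring. With overlapped I/O enabled (and the kernel
 * entry point available) the freshly allocated item pool is registered via
 * SetFileIoOverlappedRange() and then considered locked (IOR_STATE_LOCKED),
 * which is why the pool is page-aligned and rounded up to a coarse ceiling. */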
MDBX_INTERNAL_FUNC int osal_ioring_resize(osal_ioring_t *ior, size_t items) {
  assert(items > 0 && items < INT_MAX / sizeof(ior_item_t));
#if defined(_WIN32) || defined(_WIN64)
  if (ior->state & IOR_STATE_LOCKED)
    return MDBX_SUCCESS;
  const bool useSetFileIoOverlappedRange = (ior->flags & IOR_OVERLAPPED) &&
                                           mdbx_SetFileIoOverlappedRange &&
                                           items > 7;
  const size_t ceiling =
      useSetFileIoOverlappedRange
          ? ((items < 65536 / 2 / sizeof(ior_item_t)) ? 65536 : 65536 * 4)
          : 4096;
  const size_t bytes = ceil_powerof2(sizeof(ior_item_t) * items, ceiling);
  items = bytes / sizeof(ior_item_t);
#endif /* Windows */

  if (items != ior->allocated) {
    assert(items >= osal_ioring_used(ior));
    if (items < ior->allocated)
      ior_cleanup(ior, items);
#if defined(_WIN32) || defined(_WIN64)
    void *ptr = osal_realloc(
        ior->event_pool,
        (items + /* extra for waiting on async_done */ 1) * sizeof(HANDLE));
    if (unlikely(!ptr))
      return MDBX_ENOMEM;
    ior->event_pool = ptr;

    int err = osal_memalign_alloc(ceiling, bytes, &ptr);
    if (unlikely(err != MDBX_SUCCESS))
      return err;
    if (ior->pool) {
      memcpy(ptr, ior->pool, ior->allocated * sizeof(ior_item_t));
      osal_memalign_free(ior->pool);
    }
#else
    void *ptr = osal_realloc(ior->pool, sizeof(ior_item_t) * items);
    if (unlikely(!ptr))
      return MDBX_ENOMEM;
#endif
    ior->pool = ptr;

    if (items > ior->allocated)
      memset(ior->pool + ior->allocated, 0,
             sizeof(ior_item_t) * (items - ior->allocated));
    ior->allocated = (unsigned)items;
    ior->boundary = (char *)(ior->pool + ior->allocated);
#if defined(_WIN32) || defined(_WIN64)
    if (useSetFileIoOverlappedRange) {
      if (mdbx_SetFileIoOverlappedRange(ior->fd, ptr, (ULONG)bytes))
        ior->state += IOR_STATE_LOCKED;
      else
        return GetLastError();
    }
#endif /* Windows */
  }
  return MDBX_SUCCESS;
}

MDBX_INTERNAL_FUNC void osal_ioring_destroy(osal_ioring_t *ior) {
  if (ior->allocated)
    ior_cleanup(ior, 0);
#if defined(_WIN32) || defined(_WIN64)
  osal_memalign_free(ior->pool);
  osal_free(ior->event_pool);
  CloseHandle(ior->async_done);
#else
  osal_free(ior->pool);
#endif
  memset(ior, -1, sizeof(osal_ioring_t));
}

/*----------------------------------------------------------------------------*/

MDBX_INTERNAL_FUNC int osal_removefile(const pathchar_t *pathname) {
#if defined(_WIN32) || defined(_WIN64)
  return DeleteFileW(pathname) ? MDBX_SUCCESS : (int)GetLastError();
@@ -589,17 +1179,21 @@ MDBX_INTERNAL_FUNC int osal_openfile(const enum osal_openfile_purpose purpose,
  case MDBX_OPEN_DXB_LAZY:
    DesiredAccess |= GENERIC_READ | GENERIC_WRITE;
    break;
+  case MDBX_OPEN_DXB_OVERLAPPED:
+    FlagsAndAttributes |= FILE_FLAG_OVERLAPPED;
+    /* fall through */
+    __fallthrough;
  case MDBX_OPEN_DXB_DSYNC:
    CreationDisposition = OPEN_EXISTING;
-    DesiredAccess |= GENERIC_WRITE;
+    DesiredAccess |= GENERIC_WRITE | GENERIC_READ;
    FlagsAndAttributes |= FILE_FLAG_WRITE_THROUGH;
    break;
  case MDBX_OPEN_COPY:
    CreationDisposition = CREATE_NEW;
    ShareMode = 0;
    DesiredAccess |= GENERIC_WRITE;
-    FlagsAndAttributes |=
-        (env->me_psize < env->me_os_psize) ? 0 : FILE_FLAG_NO_BUFFERING;
+    if (env->me_psize >= env->me_os_psize)
+      FlagsAndAttributes |= FILE_FLAG_NO_BUFFERING;
    break;
  case MDBX_OPEN_DELETE:
    CreationDisposition = OPEN_EXISTING;
@@ -878,28 +1472,30 @@ MDBX_INTERNAL_FUNC int osal_write(mdbx_filehandle_t fd, const void *buf,
  }
}

-int osal_pwritev(mdbx_filehandle_t fd, struct iovec *iov, int iovcnt,
-                 uint64_t offset, size_t expected_written) {
-#if defined(_WIN32) || defined(_WIN64) || defined(__APPLE__) || \
-    (defined(__ANDROID_API__) && __ANDROID_API__ < 24)
+int osal_pwritev(mdbx_filehandle_t fd, struct iovec *iov, int sgvcnt,
+                 uint64_t offset) {
+  size_t expected = 0;
+  for (int i = 0; i < sgvcnt; ++i)
+    expected += iov[i].iov_len;
+#if !MDBX_HAVE_PWRITEV
  size_t written = 0;
-  for (int i = 0; i < iovcnt; ++i) {
+  for (int i = 0; i < sgvcnt; ++i) {
    int rc = osal_pwrite(fd, iov[i].iov_base, iov[i].iov_len, offset);
    if (unlikely(rc != MDBX_SUCCESS))
      return rc;
    written += iov[i].iov_len;
    offset += iov[i].iov_len;
  }
-  return (expected_written == written) ? MDBX_SUCCESS
-                                       : MDBX_EIO /* ERROR_WRITE_FAULT */;
+  return (expected == written) ? MDBX_SUCCESS
+                               : MDBX_EIO /* ERROR_WRITE_FAULT */;
#else
  int rc;
  intptr_t written;
  do {
    STATIC_ASSERT_MSG(sizeof(off_t) >= sizeof(size_t),
                      "libmdbx requires 64-bit file I/O on 64-bit systems");
-    written = pwritev(fd, iov, iovcnt, offset);
-    if (likely(expected_written == (size_t)written))
+    written = pwritev(fd, iov, sgvcnt, offset);
+    if (likely(expected == (size_t)written))
      return MDBX_SUCCESS;
    rc = errno;
  } while (rc == EINTR);
@@ -1066,7 +1662,7 @@ MDBX_INTERNAL_FUNC int osal_thread_join(osal_thread_t thread) {

/*----------------------------------------------------------------------------*/

-MDBX_INTERNAL_FUNC int osal_msync(osal_mmap_t *map, size_t offset,
+MDBX_INTERNAL_FUNC int osal_msync(const osal_mmap_t *map, size_t offset,
                                  size_t length,
                                  enum osal_syncmode_bits mode_bits) {
  uint8_t *ptr = (uint8_t *)map->address + offset;