mdbx: shrinking memory-mapping for Windows (initial).

Change-Id: I3c3c2df0747c788ea36d7764c1e8139098ca62be
This commit is contained in:
Leo Yuriev 2018-01-07 14:37:38 +03:00
parent f10f5b376f
commit 30bd7d3078
5 changed files with 301 additions and 67 deletions

View File

@ -740,6 +740,12 @@ struct MDBX_env {
size_t grow; /* step to grow datafile */
size_t shrink; /* threshold to shrink datafile */
} me_dbgeo; /* */
#if defined(_WIN32) || defined(_WIN64)
SRWLOCK me_remap_guard;
#else
mdbx_fastmutex_t me_remap_guard;
#endif
};
/* Nested transaction */

View File

@ -152,6 +152,7 @@ void mdbx_txn_unlock(MDBX_env *env) {
#define LCK_UPPER LCK_UP_OFFSET, LCK_UP_LEN
int mdbx_rdt_lock(MDBX_env *env) {
AcquireSRWLockShared(&env->me_remap_guard);
if (env->me_lfd == INVALID_HANDLE_VALUE)
return MDBX_SUCCESS; /* readonly database in readonly filesystem */
@ -167,6 +168,118 @@ void mdbx_rdt_unlock(MDBX_env *env) {
if (!funlock(env->me_lfd, LCK_UPPER))
mdbx_panic("%s failed: errcode %u", mdbx_func_, GetLastError());
}
ReleaseSRWLockShared(&env->me_remap_guard);
}
/* Suspends the thread identified by ThreadId and appends its handle
 * to *array.
 *
 * The array starts as a fixed-capacity structure provided by the caller
 * (the initial one must never be passed to realloc/free). Once it is full,
 * the storage is moved to the heap with doubled capacity and *array is
 * updated to point to the new storage.
 *
 * Returns MDBX_SUCCESS, MDBX_ENOMEM when the array cannot be grown, or the
 * GetLastError() code of the failed OpenThread()/SuspendThread() call. */
static int suspend_and_append(mdbx_handle_array_t **array,
                              const DWORD ThreadId) {
  const unsigned limit = (*array)->limit;
  if ((*array)->count == limit) {
    /* While the capacity does not exceed the built-in one, the array is
     * still the caller-provided initial structure. */
    const int is_initial = (limit <= ARRAY_LENGTH((*array)->handles));
    void *ptr =
        realloc(is_initial ? /* don't free initial array on the stack */ NULL
                           : *array,
                sizeof(mdbx_handle_array_t) +
                    sizeof(HANDLE) *
                        (limit * 2 - ARRAY_LENGTH((*array)->handles)));
    if (!ptr)
      return MDBX_ENOMEM;
    if (is_initial) {
      /* realloc(NULL, ...) acts like malloc() and returns uninitialized
       * storage, so the counters and already collected handles must be
       * carried over explicitly. */
      *(mdbx_handle_array_t *)ptr = **array;
    }
    *array = (mdbx_handle_array_t *)ptr;
    (*array)->limit = limit * 2;
  }

  HANDLE hThread = OpenThread(THREAD_SUSPEND_RESUME, FALSE, ThreadId);
  if (hThread == NULL)
    return (int)GetLastError();

  if (SuspendThread(hThread) == (DWORD)-1) {
    /* Fetch the error code first: CloseHandle() may overwrite it. */
    const int err = (int)GetLastError();
    CloseHandle(hThread);
    return err;
  }

  (*array)->handles[(*array)->count++] = hThread;
  return MDBX_SUCCESS;
}
/* Suspends the other threads of the current process before the
 * memory-mapping is changed, collecting their handles into *array.
 *
 * With a lock-table (env->me_lck) only the threads registered there as
 * readers of this process are suspended, plus the owner of the write
 * transaction if it is not the calling thread. Without a lock-table
 * (i.e. read-only mode) every thread of the process except the calling
 * one is suspended.
 *
 * *array may be replaced by a heap-allocated array while it grows.
 *
 * Returns MDBX_SUCCESS or an error code; on failure the threads that were
 * already suspended are resumed before returning. */
int mdbx_suspend_threads_before_remap(MDBX_env *env,
                                      mdbx_handle_array_t **array) {
  const mdbx_tid_t CurrentTid = GetCurrentThreadId();
  int rc;
  if (env->me_lck) {
    /* Scan LCK for threads of the current process */
    const MDBX_reader *const begin = env->me_lck->mti_readers;
    const MDBX_reader *const end = begin + env->me_lck->mti_numreaders;
    const mdbx_tid_t WriteTxnOwner = env->me_txn0 ? env->me_txn0->mt_owner : 0;
    for (const MDBX_reader *reader = begin; reader < end; ++reader) {
      if (reader->mr_pid != env->me_pid || reader->mr_tid == CurrentTid ||
          reader->mr_tid == WriteTxnOwner)
        continue;
      if (env->me_flags & MDBX_NOTLS) {
        /* Skip duplicates in no-tls mode: a thread may occupy several
         * reader slots, but must be suspended only once. Only the preceding
         * slots of this process count, since only those were handled. */
        const MDBX_reader *scan = begin;
        while (scan < reader && !(scan->mr_pid == env->me_pid &&
                                  scan->mr_tid == reader->mr_tid))
          ++scan;
        if (scan < reader)
          continue;
      }
      rc = suspend_and_append(array, reader->mr_tid);
      if (rc != MDBX_SUCCESS) {
      bailout_lck:
        (void)mdbx_resume_threads_after_remap(*array);
        return rc;
      }
    }
    if (WriteTxnOwner && WriteTxnOwner != CurrentTid) {
      rc = suspend_and_append(array, WriteTxnOwner);
      if (rc != MDBX_SUCCESS)
        goto bailout_lck;
    }
  } else {
    /* Without LCK (i.e. read-only mode).
     * Walk through a snapshot of all running threads */
    const HANDLE hThreadSnap = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
    if (hThreadSnap == INVALID_HANDLE_VALUE)
      return (int)GetLastError();
    THREADENTRY32 entry;
    entry.dwSize = sizeof(THREADENTRY32);
    if (!Thread32First(hThreadSnap, &entry)) {
      rc = (int)GetLastError();
    bailout_toolhelp:
      CloseHandle(hThreadSnap);
      (void)mdbx_resume_threads_after_remap(*array);
      return rc;
    }
    do {
      if (entry.th32OwnerProcessID != env->me_pid ||
          entry.th32ThreadID == CurrentTid)
        continue;
      rc = suspend_and_append(array, entry.th32ThreadID);
      if (rc != MDBX_SUCCESS)
        goto bailout_toolhelp;
    } while (Thread32Next(hThreadSnap, &entry));
    rc = (int)GetLastError();
    if (rc != ERROR_NO_MORE_FILES)
      goto bailout_toolhelp;
    /* The snapshot is no longer needed; release it on success as well. */
    CloseHandle(hThreadSnap);
  }
  return MDBX_SUCCESS;
}
/* Resumes every thread whose handle is stored in the array and closes
 * the handles.
 *
 * The array is left empty afterwards, so a repeated call (e.g. by a caller
 * cleaning up after mdbx_suspend_threads_before_remap() has already rolled
 * back on failure) is a harmless no-op instead of touching closed handles.
 * The array storage itself is not released here.
 *
 * Returns MDBX_SUCCESS, or the GetLastError() code of the last
 * ResumeThread() call that failed. */
int mdbx_resume_threads_after_remap(mdbx_handle_array_t *array) {
  int rc = MDBX_SUCCESS;
  for (unsigned i = 0; i < array->count; ++i) {
    const HANDLE hThread = array->handles[i];
    if (ResumeThread(hThread) == (DWORD)-1)
      rc = (int)GetLastError();
    CloseHandle(hThread);
  }
  /* All handles are closed now and must not be used again. */
  array->count = 0;
  return rc;
}
/*----------------------------------------------------------------------------*/

View File

@ -1632,27 +1632,55 @@ static int mdbx_mapresize(MDBX_env *env, const pgno_t size_pgno,
mdbx_assert(env, limit_bytes >= size_bytes);
mdbx_assert(env, bytes2pgno(env, size_bytes) == size_pgno);
mdbx_assert(env, bytes2pgno(env, limit_bytes) == limit_pgno);
const int rc =
mdbx_mresize(env->me_flags, &env->me_dxb_mmap, size_bytes, limit_bytes);
#if defined(_WIN32) || defined(_WIN64)
/* Acquire the guard in exclusive mode in order to:
* - avoid collisions between read and write txns around env->me_dbgeo;
* - prevent attachment of new reading threads (see mdbx_rdt_lock); */
AcquireSRWLockExclusive(&env->me_remap_guard);
mdbx_handle_array_t *suspended = NULL;
mdbx_handle_array_t array_onstack;
int rc = MDBX_SUCCESS;
if (limit_bytes == env->me_dxb_mmap.length &&
size_bytes == env->me_dxb_mmap.current &&
env->me_dxb_mmap.current == env->me_dxb_mmap.filesize)
goto bailout;
if ((env->me_flags & MDBX_RDONLY) || limit_bytes != env->me_dxb_mmap.length ||
size_bytes < env->me_dxb_mmap.current) {
/* Windows allows only extending a read-write section, but not the
* corresponding mapped view. Therefore in all other cases we must suspend
* the local threads for a safe remap. */
array_onstack.limit = ARRAY_LENGTH(array_onstack.handles);
array_onstack.count = 0;
suspended = &array_onstack;
rc = mdbx_suspend_threads_before_remap(env, &suspended);
if (rc != MDBX_SUCCESS) {
mdbx_error("failed suspend-for-remap: errcode %d", rc);
goto bailout;
}
}
#else
/* Acquire guard to avoid collision between read and write txns
* around env->me_dbgeo */
int rc = mdbx_fastmutex_acquire(&env->me_remap_guard);
if (rc != MDBX_SUCCESS)
return rc;
if (limit_bytes == env->me_dxb_mmap.length &&
bytes2pgno(env, size_bytes) == env->me_dbgeo.now)
goto bailout;
#endif /* Windows */
rc = mdbx_mresize(env->me_flags, &env->me_dxb_mmap, size_bytes, limit_bytes);
bailout:
if (rc == MDBX_SUCCESS) {
env->me_dbgeo.now = size_bytes;
env->me_dbgeo.upper = limit_bytes;
} else if (rc != MDBX_RESULT_TRUE) {
mdbx_error("failed resize datafile/mapping: "
"present %" PRIuPTR " -> %" PRIuPTR ", "
"limit %" PRIuPTR " -> %" PRIuPTR ", errcode %d",
env->me_dbgeo.now, size_bytes, env->me_dbgeo.upper, limit_bytes,
rc);
return rc;
} else {
mdbx_notice("unable resize datafile/mapping: "
"present %" PRIuPTR " -> %" PRIuPTR ", "
"limit %" PRIuPTR " -> %" PRIuPTR ", errcode %d",
env->me_dbgeo.now, size_bytes, env->me_dbgeo.upper, limit_bytes,
rc);
if (env->me_txn) {
mdbx_tassert(env->me_txn, size_pgno >= env->me_txn->mt_next_pgno);
env->me_txn->mt_end_pgno = size_pgno;
}
#ifdef USE_VALGRIND
if (prev_mapsize != env->me_mapsize || prev_mapaddr != env->me_map) {
VALGRIND_DISCARD(env->me_valgrind_handle);
@ -1662,12 +1690,36 @@ static int mdbx_mapresize(MDBX_env *env, const pgno_t size_pgno,
VALGRIND_CREATE_BLOCK(env->me_map, env->me_mapsize, "mdbx");
}
#endif
if (env->me_txn) {
mdbx_tassert(env->me_txn, size_pgno >= env->me_txn->mt_next_pgno);
env->me_txn->mt_end_pgno = size_pgno;
} else if (rc != MDBX_RESULT_TRUE) {
mdbx_error("failed resize datafile/mapping: "
"present %" PRIuPTR " -> %" PRIuPTR ", "
"limit %" PRIuPTR " -> %" PRIuPTR ", errcode %d",
env->me_dbgeo.now, size_bytes, env->me_dbgeo.upper, limit_bytes,
rc);
} else {
mdbx_notice("unable resize datafile/mapping: "
"present %" PRIuPTR " -> %" PRIuPTR ", "
"limit %" PRIuPTR " -> %" PRIuPTR ", errcode %d",
env->me_dbgeo.now, size_bytes, env->me_dbgeo.upper, limit_bytes,
rc);
}
return MDBX_SUCCESS;
#if defined(_WIN32) || defined(_WIN64)
int err = MDBX_SUCCESS;
ReleaseSRWLockExclusive(&env->me_remap_guard);
if (suspended) {
err = mdbx_resume_threads_after_remap(suspended);
if (suspended != &array_onstack)
free(suspended);
}
#else
int err = mdbx_fastmutex_release(&env->me_remap_guard);
#endif /* Windows */
if (err != MDBX_SUCCESS) {
mdbx_fatal("failed resume-after-remap: errcode %d", err);
return MDBX_PANIC;
}
return rc;
}
/* Allocate page numbers and memory for writing. Maintain me_last_reclaimed,
@ -4109,9 +4161,6 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
}
}
#if defined(_WIN32) || defined(_WIN64)
/* Windows is unable shrinking a mapped file */
#else
/* LY: check conditions to shrink datafile */
const pgno_t backlog_gap =
pending->mm_dbs[FREE_DBI].md_depth + mdbx_backlog_extragap(env);
@ -4133,7 +4182,6 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
mdbx_meta_set_txnid(env, pending, pending->mm_txnid_a + 1);
}
}
#endif /* not a Windows */
/* Steady or Weak */
if (env->me_sync_pending == 0) {
@ -4281,9 +4329,6 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
}
}
#if defined(_WIN32) || defined(_WIN64)
/* Windows is unable shrinking a mapped file */
#else
/* LY: shrink datafile if needed */
if (unlikely(shrink)) {
mdbx_info("shrink to %" PRIaPGNO " pages (-%" PRIaPGNO ")",
@ -4292,7 +4337,6 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
if (MDBX_IS_ERROR(rc))
goto fail;
}
#endif /* not a Windows */
return MDBX_SUCCESS;
@ -4384,6 +4428,16 @@ int __cold mdbx_env_create(MDBX_env **penv) {
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
#if defined(_WIN32) || defined(_WIN64)
InitializeSRWLock(&env->me_remap_guard);
#else
rc = mdbx_fastmutex_init(&env->me_remap_guard);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_fastmutex_destroy(&env->me_dbi_lock);
goto bailout;
}
#endif /* Windows */
VALGRIND_CREATE_MEMPOOL(env, 0, 0);
env->me_signature = MDBX_ME_SIGNATURE;
*penv = env;
@ -4501,13 +4555,6 @@ LIBMDBX_API int mdbx_env_set_geometry(MDBX_env *env, intptr_t size_lower,
}
if ((size_t)size_now < usedbytes)
size_now = usedbytes;
#if defined(_WIN32) || defined(_WIN64)
if ((size_t)size_now < env->me_dbgeo.now ||
(size_t)size_upper < env->me_dbgeo.upper) {
/* Windows is unable shrinking a mapped file */
return ERROR_USER_MAPPED_FILE;
}
#endif /* Windows */
} else {
/* env NOT yet mapped */
if (unlikely(inside_txn))

View File

@ -768,21 +768,15 @@ int mdbx_msync(mdbx_mmap_t *map, size_t offset, size_t length, int async) {
#endif
}
int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit) {
assert(must <= limit);
int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t size, size_t limit) {
assert(size <= limit);
#if defined(_WIN32) || defined(_WIN64)
NTSTATUS rc;
map->length = 0;
map->current = 0;
map->section = NULL;
map->address = nullptr;
uint64_t filesize;
rc = mdbx_filesize(map->fd, &filesize);
if (rc != MDBX_SUCCESS)
return rc;
if (GetFileType(map->fd) != FILE_TYPE_DISK)
return ERROR_FILE_OFFLINE;
@ -861,13 +855,19 @@ int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit) {
}
}
if (filesize > must) {
rc = mdbx_ftruncate(map->fd, must);
(void)rc /* ignore error, because Windows unable shrink mapped file */;
rc = mdbx_filesize(map->fd, &map->filesize);
if (rc != MDBX_SUCCESS)
return rc;
if ((flags & MDBX_RDONLY) == 0 && map->filesize != size) {
rc = mdbx_ftruncate(map->fd, size);
if (rc == MDBX_SUCCESS)
map->filesize = size;
/* ignore error, because Windows unable shrink file
* that already mapped (by another process) */;
}
LARGE_INTEGER SectionSize;
SectionSize.QuadPart = must;
SectionSize.QuadPart = size;
rc = NtCreateSection(
&map->section,
/* DesiredAccess */ (flags & MDBX_WRITEMAP)
@ -878,7 +878,6 @@ int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit) {
/* SectionPageProtection */ (flags & MDBX_RDONLY) ? PAGE_READONLY
: PAGE_READWRITE,
/* AllocationAttributes */ SEC_RESERVE, map->fd);
if (!NT_SUCCESS(rc))
return ntstatus2errcode(rc);
@ -892,7 +891,6 @@ int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit) {
/* AllocationType */ (flags & MDBX_RDONLY) ? 0 : MEM_RESERVE,
/* Win32Protect */ (flags & MDBX_WRITEMAP) ? PAGE_READWRITE
: PAGE_READONLY);
if (!NT_SUCCESS(rc)) {
NtClose(map->section);
map->section = 0;
@ -905,7 +903,7 @@ int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit) {
map->length = ViewSize;
return MDBX_SUCCESS;
#else
(void)must;
(void)size;
map->address = mmap(
NULL, limit, (flags & MDBX_WRITEMAP) ? PROT_READ | PROT_WRITE : PROT_READ,
MAP_SHARED, map->fd, 0);
@ -938,31 +936,88 @@ int mdbx_munmap(mdbx_mmap_t *map) {
return MDBX_SUCCESS;
}
int mdbx_mresize(int flags, mdbx_mmap_t *map, size_t atleast, size_t limit) {
assert(atleast <= limit);
int mdbx_mresize(int flags, mdbx_mmap_t *map, size_t size, size_t limit) {
assert(size <= limit);
assert(size != map->current || limit != map->length || size < map->filesize);
#if defined(_WIN32) || defined(_WIN64)
if (limit != map->length) {
int rc = mdbx_munmap(map);
if (rc == MDBX_SUCCESS)
rc = mdbx_mmap(flags, map, atleast, limit);
return rc;
}
if (atleast > map->current) {
/* growth */
LARGE_INTEGER new_size;
new_size.QuadPart = atleast;
NTSTATUS rc = NtExtendSection(map->section, &new_size);
map->current = atleast;
if (!NT_SUCCESS(rc))
if (!(flags & MDBX_RDONLY) && limit == map->length && size > map->current) {
/* growth rw-section */
LARGE_INTEGER growth;
growth.QuadPart = size;
NTSTATUS rc = NtExtendSection(map->section, &growth);
if (NT_SUCCESS(rc))
map->filesize = map->current = size;
return ntstatus2errcode(rc);
}
if (atleast < map->current) {
/* Windows unable shrinking a mapped file */
return MDBX_RESULT_TRUE;
/* Windows is unable to:
* - shrink a mapped file;
* - change the size of a mapped view;
* - extend a read-only mapping;
* Therefore we must unmap and remap the entire section. */
NTSTATUS rc = NtUnmapViewOfSection(GetCurrentProcess(), map->address);
if (!NT_SUCCESS(rc))
return ntstatus2errcode(rc);
rc = NtClose(map->section);
map->current = map->length = 0;
map->section = NULL;
if (!NT_SUCCESS(rc))
return ntstatus2errcode(rc);
rc = mdbx_filesize(map->fd, &map->filesize);
if (rc != MDBX_SUCCESS)
return rc;
if ((flags & MDBX_RDONLY) == 0 && map->filesize != size) {
rc = mdbx_ftruncate(map->fd, size);
if (rc == MDBX_SUCCESS)
map->filesize = size;
/* ignore error, because Windows unable shrink file
* that already mapped (by another process) */;
}
return MDBX_SUCCESS;
LARGE_INTEGER SectionSize;
SectionSize.QuadPart = size;
rc = NtCreateSection(
&map->section,
/* DesiredAccess */ (flags & MDBX_WRITEMAP)
? SECTION_QUERY | SECTION_MAP_READ | SECTION_EXTEND_SIZE |
SECTION_MAP_WRITE
: SECTION_QUERY | SECTION_MAP_READ | SECTION_EXTEND_SIZE,
/* ObjectAttributes */ NULL,
/* MaximumSize (InitialSize) */ &SectionSize,
/* SectionPageProtection */ (flags & MDBX_RDONLY) ? PAGE_READONLY
: PAGE_READWRITE,
/* AllocationAttributes */ SEC_RESERVE, map->fd);
if (!NT_SUCCESS(rc))
return ntstatus2errcode(rc);
retry:;
SIZE_T ViewSize = (flags & MDBX_RDONLY) ? size : limit;
rc = NtMapViewOfSection(
map->section, GetCurrentProcess(), &map->address,
/* ZeroBits */ 0,
/* CommitSize */ 0,
/* SectionOffset */ NULL, &ViewSize,
/* InheritDisposition */ ViewUnmap,
/* AllocationType */ (flags & MDBX_RDONLY) ? 0 : MEM_RESERVE,
/* Win32Protect */ (flags & MDBX_WRITEMAP) ? PAGE_READWRITE
: PAGE_READONLY);
if (!NT_SUCCESS(rc)) {
if (map->address) {
map->address = NULL;
goto retry;
}
NtClose(map->section);
map->section = 0;
return ntstatus2errcode(rc);
}
assert(map->address != MAP_FAILED);
map->current = (size_t)SectionSize.QuadPart;
map->length = ViewSize;
#else
(void)flags;
if (limit != map->length) {
void *ptr = mremap(map->address, map->length, limit, MREMAP_MAYMOVE);
if (ptr == MAP_FAILED)
@ -970,8 +1025,10 @@ int mdbx_mresize(int flags, mdbx_mmap_t *map, size_t atleast, size_t limit) {
map->address = ptr;
map->length = limit;
}
return mdbx_ftruncate(map->fd, atleast);
if ((flags & MDBX_RDONLY) == 0)
return mdbx_ftruncate(map->fd, size);
#endif
return MDBX_SUCCESS;
}
/*----------------------------------------------------------------------------*/

View File

@ -65,6 +65,7 @@
/* Systems includes */
#if defined(_WIN32) || defined(_WIN64)
#include <tlhelp32.h>
#include <windows.h>
#include <winnt.h>
#define HAVE_SYS_STAT_H
@ -455,6 +456,7 @@ typedef struct mdbx_mmap_param {
size_t length; /* mapping length, but NOT a size of file or DB */
#if defined(_WIN32) || defined(_WIN64)
size_t current; /* mapped region size, e.g. file and DB */
uint64_t filesize;
#endif
#ifdef MDBX_OSAL_SECTION
MDBX_OSAL_SECTION section;
@ -464,6 +466,15 @@ typedef struct mdbx_mmap_param {
int mdbx_mmap(int flags, mdbx_mmap_t *map, size_t must, size_t limit);
int mdbx_munmap(mdbx_mmap_t *map);
int mdbx_mresize(int flags, mdbx_mmap_t *map, size_t current, size_t wanna);
#if defined(_WIN32) || defined(_WIN64)
/* Growable list of handles of the threads suspended for a remap. */
typedef struct {
  unsigned limit;     /* capacity: how many handles the array can hold */
  unsigned count;     /* how many handles are currently stored */
  HANDLE handles[31]; /* built-in storage; larger arrays are allocated with
                       * extra room beyond the end of the structure */
} mdbx_handle_array_t;
/* Suspends the other threads of the current process ahead of a remap and
 * stores their handles in *array, which may be replaced by a heap-allocated
 * array when it has to grow. Returns MDBX_SUCCESS or an error code. */
int mdbx_suspend_threads_before_remap(MDBX_env *env,
mdbx_handle_array_t **array);
/* Resumes the threads whose handles are stored in the array and closes
 * those handles. Returns MDBX_SUCCESS or the code of a failed resume. */
int mdbx_resume_threads_after_remap(mdbx_handle_array_t *array);
#endif /* Windows */
int mdbx_msync(mdbx_mmap_t *map, size_t offset, size_t length, int async);
static __inline mdbx_pid_t mdbx_getpid(void) {