mdbx: add to env_copy() support for pipe/socket.

Change-Id: Ib2fc0249b494b885f28265f877de9953f089b403
This commit is contained in:
Leonid Yuriev 2019-09-16 14:16:14 +03:00
parent b19e180fab
commit 0d4092f4ea
4 changed files with 265 additions and 98 deletions

View File

@ -11743,6 +11743,8 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, MDBX_val *key, MDBX_val *data,
return rc;
}
/**** COPYING *****************************************************************/
#ifndef MDBX_WBUF
#define MDBX_WBUF ((size_t)1024 * 1024)
#endif
@ -11772,7 +11774,6 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
uint8_t *ptr;
int toggle = 0;
int rc;
size_t offset = pgno2bytes(my->mc_env, NUM_METAS);
mdbx_condmutex_lock(&my->mc_condmutex);
while (!my->mc_error) {
@ -11784,12 +11785,11 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
ptr = my->mc_wbuf[toggle];
again:
if (wsize > 0 && !my->mc_error) {
rc = mdbx_pwrite(my->mc_fd, ptr, wsize, offset);
rc = mdbx_write(my->mc_fd, ptr, wsize);
if (rc != MDBX_SUCCESS) {
my->mc_error = rc;
break;
}
offset += wsize;
}
/* If there's an overflow page tail, write it too */
@ -11984,9 +11984,42 @@ done:
return rc;
}
static void compact_fixup_meta(MDBX_env *env, MDBX_page *meta) {
/* Calculate filesize taking in account shrink/growing thresholds */
if (meta->mp_meta.mm_geo.next > meta->mp_meta.mm_geo.now) {
const pgno_t aligned =
pgno_align2os_pgno(env, pgno_add(meta->mp_meta.mm_geo.next,
meta->mp_meta.mm_geo.grow -
meta->mp_meta.mm_geo.next %
meta->mp_meta.mm_geo.grow));
meta->mp_meta.mm_geo.now = aligned;
} else if (meta->mp_meta.mm_geo.next < meta->mp_meta.mm_geo.now) {
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.next;
const pgno_t aligner = meta->mp_meta.mm_geo.grow
? meta->mp_meta.mm_geo.grow
: meta->mp_meta.mm_geo.shrink;
const pgno_t aligned =
pgno_align2os_pgno(env, meta->mp_meta.mm_geo.next + aligner -
meta->mp_meta.mm_geo.next % aligner);
meta->mp_meta.mm_geo.now = aligned;
}
if (meta->mp_meta.mm_geo.now < meta->mp_meta.mm_geo.lower)
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.lower;
if (meta->mp_meta.mm_geo.now > meta->mp_meta.mm_geo.upper)
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.upper;
/* Update signature */
assert(meta->mp_meta.mm_geo.now >= meta->mp_meta.mm_geo.next);
meta->mp_meta.mm_datasync_sign = mdbx_meta_sign(&meta->mp_meta);
}
/* Copy environment with compaction. */
static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
mdbx_filehandle_t fd, uint8_t *buffer) {
mdbx_filehandle_t fd, uint8_t *buffer,
const bool dest_is_pipe) {
const size_t meta_bytes = pgno2bytes(env, NUM_METAS);
uint8_t *const data_buffer = buffer + meta_bytes;
MDBX_page *const meta = mdbx_init_metas(env, buffer);
/* copy canary sequenses if present */
if (read_txn->mt_canary.v) {
@ -12001,6 +12034,12 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
* fix any breakage like page leaks from ITS#8174. */
meta->mp_meta.mm_dbs[MAIN_DBI].md_flags =
read_txn->mt_dbs[MAIN_DBI].md_flags;
compact_fixup_meta(env, meta);
if (dest_is_pipe) {
int rc = mdbx_write(fd, buffer, meta_bytes);
if (rc != MDBX_SUCCESS)
return rc;
}
} else {
/* Count free pages + GC pages. Subtract from last_pg
* to find the new last_pg, which also becomes the new root. */
@ -12031,9 +12070,9 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
if (unlikely(rc != MDBX_SUCCESS))
return rc;
ctx.mc_wbuf[0] = buffer + pgno2bytes(env, NUM_METAS);
memset(ctx.mc_wbuf[0], 0, MDBX_WBUF * 2);
ctx.mc_wbuf[1] = ctx.mc_wbuf[0] + MDBX_WBUF;
memset(data_buffer, 0, MDBX_WBUF * 2);
ctx.mc_wbuf[0] = data_buffer;
ctx.mc_wbuf[1] = data_buffer + MDBX_WBUF;
ctx.mc_next_pgno = NUM_METAS;
ctx.mc_env = env;
ctx.mc_fd = fd;
@ -12042,6 +12081,11 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
mdbx_thread_t thread;
int thread_err = mdbx_thread_create(&thread, mdbx_env_copythr, &ctx);
if (likely(thread_err == MDBX_SUCCESS)) {
if (dest_is_pipe) {
compact_fixup_meta(env, meta);
rc = mdbx_write(fd, buffer, meta_bytes);
}
if (rc == MDBX_SUCCESS)
rc = mdbx_env_cwalk(&ctx, &root, 0);
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF);
thread_err = mdbx_thread_join(thread);
@ -12054,6 +12098,15 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
if (unlikely(ctx.mc_error != MDBX_SUCCESS))
return ctx.mc_error;
if (dest_is_pipe) {
if (root != new_root) {
mdbx_error("post-compactification root %" PRIaPGNO
" NE expected %" PRIaPGNO
" (source DB corrupted or has a page leak(s))",
root, new_root);
return MDBX_CORRUPTED; /* page leak or corrupt DB */
}
} else {
if (root > new_root) {
mdbx_error("post-compactification root %" PRIaPGNO
" GT expected %" PRIaPGNO " (source DB corrupted)",
@ -12068,45 +12121,35 @@ static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
meta->mp_meta.mm_dbs[MAIN_DBI].md_root = root;
meta->mp_meta.mm_geo.next = root + 1;
}
compact_fixup_meta(env, meta);
}
/* Calculate filesize taking in account shrink/growing thresholds */
if (meta->mp_meta.mm_geo.next > meta->mp_meta.mm_geo.now) {
const pgno_t aligned =
pgno_align2os_pgno(env, pgno_add(meta->mp_meta.mm_geo.next,
meta->mp_meta.mm_geo.grow -
meta->mp_meta.mm_geo.next %
meta->mp_meta.mm_geo.grow));
meta->mp_meta.mm_geo.now = aligned;
} else if (meta->mp_meta.mm_geo.next < meta->mp_meta.mm_geo.now) {
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.next;
const pgno_t aligner = meta->mp_meta.mm_geo.grow
? meta->mp_meta.mm_geo.grow
: meta->mp_meta.mm_geo.shrink;
const pgno_t aligned =
pgno_align2os_pgno(env, meta->mp_meta.mm_geo.next + aligner -
meta->mp_meta.mm_geo.next % aligner);
meta->mp_meta.mm_geo.now = aligned;
}
if (meta->mp_meta.mm_geo.now < meta->mp_meta.mm_geo.lower)
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.lower;
if (meta->mp_meta.mm_geo.now > meta->mp_meta.mm_geo.upper)
meta->mp_meta.mm_geo.now = meta->mp_meta.mm_geo.upper;
/* Update signature */
assert(meta->mp_meta.mm_geo.now >= meta->mp_meta.mm_geo.next);
meta->mp_meta.mm_datasync_sign = mdbx_meta_sign(&meta->mp_meta);
/* Extend file if required */
return (meta->mp_meta.mm_geo.now != meta->mp_meta.mm_geo.next)
? mdbx_ftruncate(fd, pgno2bytes(env, meta->mp_meta.mm_geo.now))
: MDBX_SUCCESS;
if (meta->mp_meta.mm_geo.now != meta->mp_meta.mm_geo.next) {
const size_t whole_size = pgno2bytes(env, meta->mp_meta.mm_geo.now);
if (!dest_is_pipe)
return mdbx_ftruncate(fd, whole_size);
const size_t used_size = pgno2bytes(env, meta->mp_meta.mm_geo.next);
memset(data_buffer, 0, MDBX_WBUF);
for (size_t offset = used_size; offset < whole_size;) {
const size_t chunk =
(MDBX_WBUF < whole_size - offset) ? MDBX_WBUF : whole_size - offset;
/* copy to avoit EFAULT in case swapped-out */
int rc = mdbx_write(fd, data_buffer, chunk);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
offset += chunk;
}
}
return MDBX_SUCCESS;
}
/* Copy environment as-is. */
static int __cold mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
mdbx_filehandle_t fd, uint8_t *buffer) {
mdbx_filehandle_t fd, uint8_t *buffer,
const bool dest_is_pipe) {
/* We must start the actual read txn after blocking writers */
int rc = mdbx_txn_end(read_txn, MDBX_END_RESET_TMP);
if (unlikely(rc != MDBX_SUCCESS))
@ -12135,35 +12178,67 @@ static int __cold mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
mdbx_txn_unlock(env);
/* Copy the data */
const uint64_t whole_size =
const size_t whole_size =
mdbx_roundup2(pgno2bytes(env, read_txn->mt_end_pgno), env->me_os_psize);
const size_t used_size = pgno2bytes(env, read_txn->mt_next_pgno);
mdbx_jitter4testing(false);
if (dest_is_pipe)
rc = mdbx_write(fd, buffer, meta_bytes);
uint8_t *const data_buffer = buffer + meta_bytes;
for (size_t offset = meta_bytes; rc == MDBX_SUCCESS && offset < used_size;) {
if (dest_is_pipe) {
#if defined(__linux__) || defined(__gnu_linux__)
off_t in_offset = offset;
const intptr_t written =
sendfile(fd, env->me_fd, &in_offset, used_size - offset);
if (unlikely(written <= 0)) {
rc = written ? errno : MDBX_ENODATA;
break;
}
offset = in_offset;
continue;
#endif
} else {
#if __GLIBC_PREREQ(2, 27) && defined(_GNU_SOURCE)
for (off_t in_offset = meta_bytes; in_offset < (off_t)used_size;) {
off_t out_offset = in_offset;
off_t in_offset = offset, out_offset = offset;
ssize_t bytes_copied = copy_file_range(
env->me_fd, &in_offset, fd, &out_offset, used_size - in_offset, 0);
env->me_fd, &in_offset, fd, &out_offset, used_size - offset, 0);
if (unlikely(bytes_copied <= 0)) {
rc = bytes_copied ? errno : MDBX_ENODATA;
break;
}
offset = in_offset;
continue;
#endif
}
#else
uint8_t *data_buffer = buffer + meta_bytes;
for (size_t offset = meta_bytes; offset < used_size;) {
/* fallback to portable */
const size_t chunk =
(MDBX_WBUF < used_size - offset) ? MDBX_WBUF : used_size - offset;
/* copy to avoit EFAULT in case swapped-out */
memcpy(data_buffer, env->me_map + offset, chunk);
rc = mdbx_pwrite(fd, data_buffer, chunk, offset);
if (unlikely(rc != MDBX_SUCCESS))
break;
rc = mdbx_write(fd, data_buffer, chunk);
offset += chunk;
}
#endif
if (likely(rc == MDBX_SUCCESS) && whole_size != used_size)
/* Extend file if required */
if (likely(rc == MDBX_SUCCESS) && whole_size != used_size) {
if (!dest_is_pipe)
rc = mdbx_ftruncate(fd, whole_size);
else {
memset(data_buffer, 0, MDBX_WBUF);
for (size_t offset = used_size;
rc == MDBX_SUCCESS && offset < whole_size;) {
const size_t chunk =
(MDBX_WBUF < whole_size - offset) ? MDBX_WBUF : whole_size - offset;
/* copy to avoit EFAULT in case swapped-out */
rc = mdbx_write(fd, data_buffer, chunk);
offset += chunk;
}
}
}
return rc;
}
@ -12176,16 +12251,22 @@ int __cold mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
if (unlikely(env->me_signature != MDBX_ME_SIGNATURE))
return MDBX_EBADSIGN;
const int dest_is_pipe = mdbx_is_pipe(fd);
if (MDBX_IS_ERROR(dest_is_pipe))
return dest_is_pipe;
if (!dest_is_pipe) {
int rc = mdbx_fseek(fd, 0);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}
const size_t buffer_size =
pgno2bytes(env, NUM_METAS) +
((flags & MDBX_CP_COMPACT) ? MDBX_WBUF * 2 : MDBX_WBUF);
uint8_t *buffer = NULL;
rc = mdbx_memalign_alloc(env->me_os_psize, buffer_size, (void **)&buffer);
int rc = mdbx_memalign_alloc(env->me_os_psize, buffer_size, (void **)&buffer);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
@ -12198,20 +12279,24 @@ int __cold mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
return rc;
}
if (!dest_is_pipe) {
/* Firstly write a stub to meta-pages.
* Now we sure to incomplete copy will not be used. */
memset(buffer, -1, pgno2bytes(env, NUM_METAS));
rc = mdbx_pwrite(fd, buffer, pgno2bytes(env, NUM_METAS), 0);
rc = mdbx_write(fd, buffer, pgno2bytes(env, NUM_METAS));
}
if (likely(rc == MDBX_SUCCESS)) {
memset(buffer, 0, pgno2bytes(env, NUM_METAS));
rc = (flags & MDBX_CP_COMPACT)
? mdbx_env_compact(env, read_txn, fd, buffer)
: mdbx_env_copy_asis(env, read_txn, fd, buffer);
? mdbx_env_compact(env, read_txn, fd, buffer, dest_is_pipe)
: mdbx_env_copy_asis(env, read_txn, fd, buffer, dest_is_pipe);
}
mdbx_txn_abort(read_txn);
if (!dest_is_pipe) {
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_filesync(fd, MDBX_SYNC_DATA | MDBX_SYNC_SIZE);
rc = mdbx_filesync(fd, MDBX_SYNC_DATA | MDBX_SYNC_SIZE | MDBX_SYNC_IODQ);
/* Write actual meta */
if (likely(rc == MDBX_SUCCESS))
@ -12219,6 +12304,7 @@ int __cold mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_filesync(fd, MDBX_SYNC_DATA | MDBX_SYNC_IODQ);
}
mdbx_memalign_free(buffer);
return rc;
@ -12277,6 +12363,8 @@ int __cold mdbx_env_copy(MDBX_env *env, const char *dest_path, unsigned flags) {
return rc;
}
/******************************************************************************/
int __cold mdbx_env_set_flags(MDBX_env *env, unsigned flags, int onoff) {
if (unlikely(!env))
return MDBX_EINVAL;

View File

@ -636,8 +636,8 @@ MDBX_INTERNAL_FUNC int mdbx_pwrite(mdbx_filehandle_t fd, const void *buf,
ov.OffsetHigh = HIGH_DWORD(offset);
DWORD written;
if (unlikely(!WriteFile(fd, buf,
(bytes <= MAX_WRITE) ? (DWORD)bytes : MAX_WRITE,
if (unlikely(!WriteFile(
fd, buf, likely(bytes <= MAX_WRITE) ? (DWORD)bytes : MAX_WRITE,
&written, &ov)))
return GetLastError();
if (likely(bytes == written))
@ -646,7 +646,7 @@ MDBX_INTERNAL_FUNC int mdbx_pwrite(mdbx_filehandle_t fd, const void *buf,
STATIC_ASSERT_MSG(sizeof(off_t) >= sizeof(size_t),
"libmdbx requires 64-bit file I/O on 64-bit systems");
const intptr_t written =
pwrite(fd, buf, (bytes <= MAX_WRITE) ? bytes : MAX_WRITE, offset);
pwrite(fd, buf, likely(bytes <= MAX_WRITE) ? bytes : MAX_WRITE, offset);
if (likely(bytes == (size_t)written))
return MDBX_SUCCESS;
if (written < 0) {
@ -662,6 +662,36 @@ MDBX_INTERNAL_FUNC int mdbx_pwrite(mdbx_filehandle_t fd, const void *buf,
}
}
MDBX_INTERNAL_FUNC int mdbx_write(mdbx_filehandle_t fd, const void *buf,
size_t bytes) {
while (true) {
#if defined(_WIN32) || defined(_WIN64)
DWORD written;
if (unlikely(!WriteFile(
fd, buf, likely(bytes <= MAX_WRITE) ? (DWORD)bytes : MAX_WRITE,
&written, nullptr)))
return GetLastError();
if (likely(bytes == written))
return MDBX_SUCCESS;
#else
STATIC_ASSERT_MSG(sizeof(off_t) >= sizeof(size_t),
"libmdbx requires 64-bit file I/O on 64-bit systems");
const intptr_t written =
write(fd, buf, likely(bytes <= MAX_WRITE) ? bytes : MAX_WRITE);
if (likely(bytes == (size_t)written))
return MDBX_SUCCESS;
if (written < 0) {
const int rc = errno;
if (rc != EINTR)
return rc;
continue;
}
#endif
bytes -= written;
buf = (char *)buf + written;
}
}
int mdbx_pwritev(mdbx_filehandle_t fd, struct iovec *iov, int iovcnt,
uint64_t offset, size_t expected_written) {
#if defined(_WIN32) || defined(_WIN64) || defined(__APPLE__)
@ -750,6 +780,37 @@ int mdbx_filesize(mdbx_filehandle_t fd, uint64_t *length) {
return MDBX_SUCCESS;
}
MDBX_INTERNAL_FUNC int mdbx_is_pipe(mdbx_filehandle_t fd) {
#if defined(_WIN32) || defined(_WIN64)
switch (GetFileType(fd)) {
case FILE_TYPE_DISK:
return MDBX_RESULT_FALSE;
case FILE_TYPE_CHAR:
case FILE_TYPE_PIPE:
return MDBX_RESULT_TRUE;
default:
return GetLastError();
}
#else
struct stat info;
if (fstat(fd, &info))
return errno;
switch (info.st_mode & S_IFMT) {
case S_IFBLK:
case S_IFREG:
return MDBX_RESULT_FALSE;
case S_IFCHR:
case S_IFIFO:
case S_IFSOCK:
return MDBX_RESULT_TRUE;
case S_IFDIR:
case S_IFLNK:
default:
return MDBX_INCOMPATIBLE;
}
#endif
}
MDBX_INTERNAL_FUNC int mdbx_ftruncate(mdbx_filehandle_t fd, uint64_t length) {
#if defined(_WIN32) || defined(_WIN64)
if (mdbx_SetFileInformationByHandle) {

View File

@ -85,6 +85,10 @@
#endif
#endif /* !xBSD */
#if defined(__linux__) || defined(__gnu_linux__)
#include <sys/sendfile.h>
#endif /* Linux */
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 0
#endif
@ -494,7 +498,11 @@ MDBX_INTERNAL_FUNC int mdbx_vasprintf(char **strp, const char *fmt, va_list ap);
/* OS abstraction layer stuff */
/* max bytes to write in one call */
#if defined(_WIN32) || defined(_WIN64)
#define MAX_WRITE UINT32_C(0x01000000)
#else
#define MAX_WRITE UINT32_C(0x3fff0000)
#endif
#if defined(__linux__) || defined(__gnu_linux__)
MDBX_INTERNAL_VAR uint32_t mdbx_linux_kernel_version;
@ -553,6 +561,8 @@ MDBX_INTERNAL_FUNC int mdbx_pread(mdbx_filehandle_t fd, void *buf, size_t count,
uint64_t offset);
MDBX_INTERNAL_FUNC int mdbx_pwrite(mdbx_filehandle_t fd, const void *buf,
size_t count, uint64_t offset);
MDBX_INTERNAL_FUNC int mdbx_write(mdbx_filehandle_t fd, const void *buf,
size_t count);
MDBX_INTERNAL_FUNC int
mdbx_thread_create(mdbx_thread_t *thread,
@ -576,6 +586,7 @@ MDBX_INTERNAL_FUNC int mdbx_openfile(const char *pathname, int flags,
bool exclusive);
MDBX_INTERNAL_FUNC int mdbx_closefile(mdbx_filehandle_t fd);
MDBX_INTERNAL_FUNC int mdbx_removefile(const char *pathname);
MDBX_INTERNAL_FUNC int mdbx_is_pipe(mdbx_filehandle_t fd);
typedef struct mdbx_mmap_param {
union {

View File

@ -49,12 +49,15 @@ int main(int argc, char *argv[]) {
const char *progname = argv[0], *act;
unsigned flags = MDBX_RDONLY;
unsigned cpflags = 0;
bool quiet = false;
for (; argc > 1 && argv[1][0] == '-'; argc--, argv++) {
if (argv[1][1] == 'n' && argv[1][2] == '\0')
flags |= MDBX_NOSUBDIR;
else if (argv[1][1] == 'c' && argv[1][2] == '\0')
cpflags |= MDBX_CP_COMPACT;
else if (argv[1][1] == 'q' && argv[1][2] == '\0')
quiet = true;
else if (argv[1][1] == 'V' && argv[1][2] == '\0') {
printf("mdbx_copy version %d.%d.%d.%d\n"
" - source: %s %s, commit %s, tree %s\n"
@ -74,7 +77,8 @@ int main(int argc, char *argv[]) {
}
if (argc < 2 || argc > 3) {
fprintf(stderr, "usage: %s [-V] [-c] [-n] srcpath [dstpath]\n", progname);
fprintf(stderr, "usage: %s [-V] [-q] [-c] [-n] srcpath [dstpath]\n",
progname);
exit(EXIT_FAILURE);
}
@ -91,10 +95,13 @@ int main(int argc, char *argv[]) {
signal(SIGTERM, signal_handler);
#endif /* !WINDOWS */
printf("mdbx_copy %s (%s, T-%s)\nRunning for copy %s to %s...\n",
if (!quiet) {
fprintf((argc == 2) ? stderr : stdout,
"mdbx_copy %s (%s, T-%s)\nRunning for copy %s to %s...\n",
mdbx_version.git.describe, mdbx_version.git.datetime,
mdbx_version.git.tree, argv[1], (argc == 2) ? "stdout" : argv[2]);
fflush(NULL);
}
act = "opening environment";
rc = mdbx_env_create(&env);