mdbx: refine mdbx_env_copy() internals.

Change-Id: I9e8f0dc87398564524a5ec98eda2cb9bde100909
This commit is contained in:
Leonid Yuriev 2018-11-04 18:57:15 +03:00 committed by Leo Yuriev
parent 629637d95e
commit 83f1effff1

View File

@ -11449,165 +11449,196 @@ done:
}
/* Copy environment with compaction. */
static int __cold mdbx_env_compact(MDBX_env *env, mdbx_filehandle_t fd) {
MDBX_txn *txn = NULL;
mdbx_thread_t thr;
mdbx_copy ctx;
memset(&ctx, 0, sizeof(ctx));
int rc = mdbx_condmutex_init(&ctx.mc_condmutex);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
const size_t buffer_size = pgno2bytes(env, NUM_METAS) + MDBX_WBUF * 2;
uint8_t *buffer = NULL;
rc = mdbx_memalign_alloc(env->me_os_psize, buffer_size, (void **)&buffer);
if (unlikely(rc != MDBX_SUCCESS))
goto done;
ctx.mc_wbuf[0] = buffer + pgno2bytes(env, NUM_METAS);
memset(ctx.mc_wbuf[0], 0, MDBX_WBUF * 2);
ctx.mc_wbuf[1] = ctx.mc_wbuf[0] + MDBX_WBUF;
ctx.mc_next_pgno = NUM_METAS;
ctx.mc_env = env;
ctx.mc_fd = fd;
rc = mdbx_thread_create(&thr, mdbx_env_copythr, &ctx);
if (unlikely(rc != MDBX_SUCCESS))
goto done;
rc = mdbx_txn_begin(env, NULL, MDBX_RDONLY, &txn);
if (unlikely(rc != MDBX_SUCCESS))
goto finish;
static int __cold mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
mdbx_filehandle_t fd, uint8_t *buffer) {
MDBX_page *const meta = mdbx_init_metas(env, buffer);
/* copy canary sequenses if present */
if (read_txn->mt_canary.v) {
meta->mp_meta.mm_canary = read_txn->mt_canary;
meta->mp_meta.mm_canary.v = mdbx_meta_txnid_stable(env, &meta->mp_meta);
}
/* Set metapage 1 with current main DB */
pgno_t new_root, root = txn->mt_dbs[MAIN_DBI].md_root;
if ((new_root = root) != P_INVALID) {
pgno_t new_root, root = read_txn->mt_dbs[MAIN_DBI].md_root;
if ((new_root = root) == P_INVALID) {
/* When the DB is empty, handle it specially to
* fix any breakage like page leaks from ITS#8174. */
meta->mp_meta.mm_dbs[MAIN_DBI].md_flags =
read_txn->mt_dbs[MAIN_DBI].md_flags;
} else {
/* Count free pages + freeDB pages. Subtract from last_pg
* to find the new last_pg, which also becomes the new root. */
pgno_t freecount = 0;
MDBX_cursor mc;
MDBX_val key, data;
rc = mdbx_cursor_init(&mc, txn, FREE_DBI);
int rc = mdbx_cursor_init(&mc, read_txn, FREE_DBI);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
while ((rc = mdbx_cursor_get(&mc, &key, &data, MDBX_NEXT)) == 0)
freecount += *(pgno_t *)data.iov_base;
if (unlikely(rc != MDBX_NOTFOUND))
goto finish;
return rc;
freecount += txn->mt_dbs[FREE_DBI].md_branch_pages +
txn->mt_dbs[FREE_DBI].md_leaf_pages +
txn->mt_dbs[FREE_DBI].md_overflow_pages;
freecount += read_txn->mt_dbs[FREE_DBI].md_branch_pages +
read_txn->mt_dbs[FREE_DBI].md_leaf_pages +
read_txn->mt_dbs[FREE_DBI].md_overflow_pages;
new_root = txn->mt_next_pgno - 1 - freecount;
meta->mp_meta.mm_geo.next = meta->mp_meta.mm_geo.now = new_root + 1;
meta->mp_meta.mm_dbs[MAIN_DBI] = txn->mt_dbs[MAIN_DBI];
new_root = read_txn->mt_next_pgno - 1 - freecount;
meta->mp_meta.mm_geo.next = new_root + 1;
meta->mp_meta.mm_dbs[MAIN_DBI] = read_txn->mt_dbs[MAIN_DBI];
meta->mp_meta.mm_dbs[MAIN_DBI].md_root = new_root;
} else {
/* When the DB is empty, handle it specially to
* fix any breakage like page leaks from ITS#8174. */
meta->mp_meta.mm_dbs[MAIN_DBI].md_flags = txn->mt_dbs[MAIN_DBI].md_flags;
}
/* copy canary sequenses if present */
if (txn->mt_canary.v) {
meta->mp_meta.mm_canary = txn->mt_canary;
meta->mp_meta.mm_canary.v = mdbx_meta_txnid_stable(env, &meta->mp_meta);
}
mdbx_copy ctx;
memset(&ctx, 0, sizeof(ctx));
rc = mdbx_condmutex_init(&ctx.mc_condmutex);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
/* update signature */
meta->mp_meta.mm_datasync_sign = mdbx_meta_sign(&meta->mp_meta);
memcpy(ctx.mc_wbuf[0], buffer, ctx.mc_wlen[0] = pgno2bytes(env, NUM_METAS));
ctx.mc_wbuf[0] = buffer + pgno2bytes(env, NUM_METAS);
memset(ctx.mc_wbuf[0], 0, MDBX_WBUF * 2);
ctx.mc_wbuf[1] = ctx.mc_wbuf[0] + MDBX_WBUF;
ctx.mc_next_pgno = NUM_METAS;
ctx.mc_env = env;
ctx.mc_fd = fd;
ctx.mc_txn = read_txn;
mdbx_thread_t thread;
int thread_err = mdbx_thread_create(&thread, mdbx_env_copythr, &ctx);
if (likely(thread_err == MDBX_SUCCESS)) {
rc = mdbx_env_cwalk(&ctx, &root, 0);
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF);
thread_err = mdbx_thread_join(thread);
mdbx_condmutex_destroy(&ctx.mc_condmutex);
}
if (unlikely(thread_err != MDBX_SUCCESS))
return thread_err;
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(ctx.mc_error != MDBX_SUCCESS))
return ctx.mc_error;
ctx.mc_txn = txn;
rc = mdbx_env_cwalk(&ctx, &root, 0);
if (rc == MDBX_SUCCESS && root != new_root) {
if (root > new_root) {
mdbx_error("post-compactification root %" PRIaPGNO
" GT expected %" PRIaPGNO " (source DB corrupted)",
root, new_root);
rc = MDBX_CORRUPTED; /* page leak or corrupt DB */
} else {
mdbx_error("post-compactification root %" PRIaPGNO
" LT expected %" PRIaPGNO " (page leak(s) in source DB)",
root, new_root);
/* fixup and rewrite metas */
return MDBX_CORRUPTED; /* page leak or corrupt DB */
}
if (root < new_root) {
mdbx_notice("post-compactification root %" PRIaPGNO
" LT expected %" PRIaPGNO " (page leak(s) in source DB)",
root, new_root);
/* fixup meta */
meta->mp_meta.mm_dbs[MAIN_DBI].md_root = root;
meta->mp_meta.mm_geo.next = meta->mp_meta.mm_geo.now = root + 1;
meta->mp_meta.mm_datasync_sign = mdbx_meta_sign(&meta->mp_meta);
rc = mdbx_pwrite(fd, buffer, pgno2bytes(env, NUM_METAS), 0);
meta->mp_meta.mm_geo.next = root + 1;
}
}
finish:
if (rc != MDBX_SUCCESS)
ctx.mc_error = rc;
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF);
rc = mdbx_thread_join(thr);
mdbx_txn_abort(txn);
done:
mdbx_memalign_free(buffer);
mdbx_condmutex_destroy(&ctx.mc_condmutex);
return rc ? rc : ctx.mc_error;
/* update signature */
meta->mp_meta.mm_datasync_sign = mdbx_meta_sign(&meta->mp_meta);
return MDBX_SUCCESS;
}
/* Copy environment as-is. */
static int __cold mdbx_env_copy_asis(MDBX_env *env, mdbx_filehandle_t fd) {
MDBX_txn *txn = NULL;
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers. */
int rc = mdbx_txn_begin(env, NULL, MDBX_RDONLY, &txn);
static int __cold mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
mdbx_filehandle_t fd, uint8_t *buffer) {
/* We must start the actual read txn after blocking writers */
int rc = mdbx_txn_end(read_txn, MDBX_END_RESET_TMP);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
/* We must start the actual read txn after blocking writers */
rc = mdbx_txn_end(txn, MDBX_END_RESET_TMP);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout; /* FIXME: or just return? */
/* Temporarily block writers until we snapshot the meta pages */
rc = mdbx_txn_lock(env, false);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
return rc;
rc = mdbx_txn_renew0(txn, MDBX_RDONLY);
rc = mdbx_txn_renew0(read_txn, MDBX_RDONLY);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_txn_unlock(env);
goto bailout;
return rc;
}
rc = mdbx_write(fd, env->me_map, pgno2bytes(env, NUM_METAS));
MDBX_meta *const head = mdbx_meta_head(env);
/* Make a snapshot of meta-pages,
* but writing ones after the data was flushed */
memcpy(buffer, env->me_map, pgno2bytes(env, NUM_METAS));
MDBX_meta *const headcopy = /* LY: get pointer to the spanshot copy */
(MDBX_meta *)(buffer + ((uint8_t *)mdbx_meta_head(env) - env->me_map));
const uint64_t size =
mdbx_roundup2(pgno2bytes(env, head->mm_geo.now), env->me_os_psize);
mdbx_roundup2(pgno2bytes(env, headcopy->mm_geo.now), env->me_os_psize);
mdbx_txn_unlock(env);
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_write(fd, env->me_map + pgno2bytes(env, NUM_METAS),
pgno2bytes(env, txn->mt_next_pgno - NUM_METAS));
/* Update signature to steady */
headcopy->mm_datasync_sign = mdbx_meta_sign(headcopy);
/* Copy the data */
rc = mdbx_pwrite(fd, env->me_map + pgno2bytes(env, NUM_METAS),
pgno2bytes(env, read_txn->mt_next_pgno - NUM_METAS),
pgno2bytes(env, NUM_METAS));
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_ftruncate(fd, size);
bailout:
mdbx_txn_abort(txn);
return rc;
}
int __cold mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
unsigned flags) {
if (flags & MDBX_CP_COMPACT)
return mdbx_env_compact(env, fd);
if (unlikely(!env))
return MDBX_EINVAL;
return mdbx_env_copy_asis(env, fd);
if (unlikely(env->me_signature != MDBX_ME_SIGNATURE))
return MDBX_EBADSIGN;
int rc = mdbx_fseek(fd, 0);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
const size_t buffer_size = pgno2bytes(env, NUM_METAS) +
((flags & MDBX_CP_COMPACT) ? MDBX_WBUF * 2 : 0);
uint8_t *buffer = NULL;
rc = mdbx_memalign_alloc(env->me_os_psize, buffer_size, (void **)&buffer);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
MDBX_txn *read_txn = NULL;
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers. */
rc = mdbx_txn_begin(env, NULL, MDBX_RDONLY, &read_txn);
if (unlikely(rc != MDBX_SUCCESS)) {
mdbx_memalign_free(buffer);
return rc;
}
/* Firstly write a stub to meta-pages.
* Now we sure to incomplete copy will not be used. */
memset(buffer, -1, pgno2bytes(env, NUM_METAS));
rc = mdbx_write(fd, buffer, pgno2bytes(env, NUM_METAS));
if (likely(rc == MDBX_SUCCESS)) {
memset(buffer, 0, pgno2bytes(env, NUM_METAS));
rc = (flags & MDBX_CP_COMPACT)
? mdbx_env_compact(env, read_txn, fd, buffer)
: mdbx_env_copy_asis(env, read_txn, fd, buffer);
}
mdbx_txn_abort(read_txn);
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_filesync(fd, true);
/* Write actual meta */
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_pwrite(fd, buffer, pgno2bytes(env, NUM_METAS), 0);
mdbx_memalign_free(buffer);
return rc;
}
int __cold mdbx_env_copy(MDBX_env *env, const char *dest_path, unsigned flags) {
if (unlikely(!env || !dest_path))
return MDBX_EINVAL;
if (unlikely(env->me_signature != MDBX_ME_SIGNATURE))
return MDBX_EBADSIGN;
char *dxb_pathname;
mdbx_filehandle_t newfd = INVALID_HANDLE_VALUE;