mirror of
https://github.com/isar/libmdbx.git
synced 2025-01-21 18:18:21 +08:00
mdbx: rework copy-with-compactification.
Кроме небольшого рефакторинга здесь реализуется более регулярный способ обхода дерева при копировании с компактификаций. В частности, полная инициализация курсоров позволяет выполнять больше проверок/контроля структуры БД и избавиться от флажка CC_COPYING. Beside a small refactoring, a more regular way of traversing the tree when copying with compactification is implemented here. In particular, full initialization of cursors allows to perform more checks/control of the DB structure and get rid of the CC_COPYING flag.
This commit is contained in:
parent
2d300d807b
commit
1740f8227a
406
src/core.c
406
src/core.c
@ -19373,7 +19373,7 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
|
||||
/**** COPYING *****************************************************************/
|
||||
|
||||
/* State needed for a double-buffering compacting copy. */
|
||||
typedef struct mdbx_copy {
|
||||
typedef struct mdbx_compacting_ctx {
|
||||
MDBX_env *mc_env;
|
||||
MDBX_txn *mc_txn;
|
||||
mdbx_condpair_t mc_condpair;
|
||||
@ -19388,39 +19388,39 @@ typedef struct mdbx_copy {
|
||||
pgno_t mc_next_pgno;
|
||||
volatile unsigned mc_head;
|
||||
volatile unsigned mc_tail;
|
||||
} mdbx_copy;
|
||||
} mdbx_compacting_ctx;
|
||||
|
||||
/* Dedicated writer thread for compacting copy. */
|
||||
__cold static THREAD_RESULT THREAD_CALL mdbx_env_copythr(void *arg) {
|
||||
mdbx_copy *my = arg;
|
||||
__cold static THREAD_RESULT THREAD_CALL compacting_write_thread(void *arg) {
|
||||
mdbx_compacting_ctx *const ctx = arg;
|
||||
|
||||
#if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64))
|
||||
sigset_t sigset;
|
||||
sigemptyset(&sigset);
|
||||
sigaddset(&sigset, SIGPIPE);
|
||||
my->mc_error = pthread_sigmask(SIG_BLOCK, &sigset, NULL);
|
||||
ctx->mc_error = pthread_sigmask(SIG_BLOCK, &sigset, NULL);
|
||||
#endif /* EPIPE */
|
||||
|
||||
mdbx_condpair_lock(&my->mc_condpair);
|
||||
while (!my->mc_error) {
|
||||
while (my->mc_tail == my->mc_head && !my->mc_error) {
|
||||
int err = mdbx_condpair_wait(&my->mc_condpair, true);
|
||||
mdbx_condpair_lock(&ctx->mc_condpair);
|
||||
while (!ctx->mc_error) {
|
||||
while (ctx->mc_tail == ctx->mc_head && !ctx->mc_error) {
|
||||
int err = mdbx_condpair_wait(&ctx->mc_condpair, true);
|
||||
if (err != MDBX_SUCCESS) {
|
||||
my->mc_error = err;
|
||||
ctx->mc_error = err;
|
||||
goto bailout;
|
||||
}
|
||||
}
|
||||
const unsigned toggle = my->mc_tail & 1;
|
||||
size_t wsize = my->mc_wlen[toggle];
|
||||
const unsigned toggle = ctx->mc_tail & 1;
|
||||
size_t wsize = ctx->mc_wlen[toggle];
|
||||
if (wsize == 0) {
|
||||
my->mc_tail += 1;
|
||||
ctx->mc_tail += 1;
|
||||
break /* EOF */;
|
||||
}
|
||||
my->mc_wlen[toggle] = 0;
|
||||
uint8_t *ptr = my->mc_wbuf[toggle];
|
||||
ctx->mc_wlen[toggle] = 0;
|
||||
uint8_t *ptr = ctx->mc_wbuf[toggle];
|
||||
again:
|
||||
if (!my->mc_error) {
|
||||
int err = mdbx_write(my->mc_fd, ptr, wsize);
|
||||
if (!ctx->mc_error) {
|
||||
int err = mdbx_write(ctx->mc_fd, ptr, wsize);
|
||||
if (err != MDBX_SUCCESS) {
|
||||
#if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64))
|
||||
if (err == EPIPE) {
|
||||
@ -19430,128 +19430,118 @@ __cold static THREAD_RESULT THREAD_CALL mdbx_env_copythr(void *arg) {
|
||||
sigwait(&sigset, &unused);
|
||||
}
|
||||
#endif /* EPIPE */
|
||||
my->mc_error = err;
|
||||
ctx->mc_error = err;
|
||||
goto bailout;
|
||||
}
|
||||
}
|
||||
|
||||
/* If there's an overflow page tail, write it too */
|
||||
wsize = my->mc_olen[toggle];
|
||||
wsize = ctx->mc_olen[toggle];
|
||||
if (wsize) {
|
||||
my->mc_olen[toggle] = 0;
|
||||
ptr = my->mc_over[toggle];
|
||||
ctx->mc_olen[toggle] = 0;
|
||||
ptr = ctx->mc_over[toggle];
|
||||
goto again;
|
||||
}
|
||||
my->mc_tail += 1;
|
||||
mdbx_condpair_signal(&my->mc_condpair, false);
|
||||
ctx->mc_tail += 1;
|
||||
mdbx_condpair_signal(&ctx->mc_condpair, false);
|
||||
}
|
||||
bailout:
|
||||
mdbx_condpair_unlock(&my->mc_condpair);
|
||||
mdbx_condpair_unlock(&ctx->mc_condpair);
|
||||
return (THREAD_RESULT)0;
|
||||
}
|
||||
|
||||
/* Give buffer and/or MDBX_EOF to writer thread, await unused buffer. */
|
||||
__cold static int mdbx_env_cthr_toggle(mdbx_copy *my) {
|
||||
mdbx_condpair_lock(&my->mc_condpair);
|
||||
mdbx_assert(my->mc_env, my->mc_head - my->mc_tail < 2 || my->mc_error);
|
||||
my->mc_head += 1;
|
||||
mdbx_condpair_signal(&my->mc_condpair, true);
|
||||
while (!my->mc_error &&
|
||||
my->mc_head - my->mc_tail == 2 /* both buffers in use */) {
|
||||
int err = mdbx_condpair_wait(&my->mc_condpair, false);
|
||||
__cold static int compacting_toggle_write_buffers(mdbx_compacting_ctx *ctx) {
|
||||
mdbx_condpair_lock(&ctx->mc_condpair);
|
||||
mdbx_assert(ctx->mc_env, ctx->mc_head - ctx->mc_tail < 2 || ctx->mc_error);
|
||||
ctx->mc_head += 1;
|
||||
mdbx_condpair_signal(&ctx->mc_condpair, true);
|
||||
while (!ctx->mc_error &&
|
||||
ctx->mc_head - ctx->mc_tail == 2 /* both buffers in use */) {
|
||||
int err = mdbx_condpair_wait(&ctx->mc_condpair, false);
|
||||
if (err != MDBX_SUCCESS)
|
||||
my->mc_error = err;
|
||||
ctx->mc_error = err;
|
||||
}
|
||||
mdbx_condpair_unlock(&my->mc_condpair);
|
||||
return my->mc_error;
|
||||
mdbx_condpair_unlock(&ctx->mc_condpair);
|
||||
return ctx->mc_error;
|
||||
}
|
||||
|
||||
/* Depth-first tree traversal for compacting copy. */
|
||||
__cold static int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg,
|
||||
const unsigned hive_flags) {
|
||||
MDBX_cursor_couple couple;
|
||||
MDBX_page *copy;
|
||||
__cold static int compacting_walk_sdb(mdbx_compacting_ctx *ctx, MDBX_db *sdb);
|
||||
|
||||
/* Empty DB, nothing to do */
|
||||
if (*pg == P_INVALID)
|
||||
return MDBX_SUCCESS;
|
||||
|
||||
memset(&couple, 0, sizeof(couple));
|
||||
couple.outer.mc_snum = 1;
|
||||
couple.outer.mc_txn = my->mc_txn;
|
||||
couple.outer.mc_checking = couple.inner.mx_cursor.mc_checking =
|
||||
(hive_flags & MDBX_DUPFIXED)
|
||||
? CC_PAGECHECK | CC_COPYING | CC_SKIPORD | CC_LEAF | CC_LEAF2
|
||||
: CC_PAGECHECK | CC_COPYING | CC_SKIPORD | CC_LEAF;
|
||||
|
||||
int rc = mdbx_page_get(&couple.outer, *pg, &couple.outer.mc_pg[0],
|
||||
my->mc_txn->mt_txnid);
|
||||
__cold static int compacting_walk_tree(mdbx_compacting_ctx *ctx,
|
||||
MDBX_cursor *mc, pgno_t *root,
|
||||
txnid_t parent_txnid) {
|
||||
mc->mc_snum = 1;
|
||||
int rc = mdbx_page_get(mc, *root, &mc->mc_pg[0], parent_txnid);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
rc = mdbx_page_search_root(&couple.outer, NULL, MDBX_PS_FIRST);
|
||||
|
||||
rc = mdbx_page_search_root(mc, nullptr, MDBX_PS_FIRST);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
|
||||
/* Make cursor pages writable */
|
||||
char *const buf = mdbx_malloc(pgno2bytes(my->mc_env, couple.outer.mc_snum));
|
||||
char *const buf = mdbx_malloc(pgno2bytes(ctx->mc_env, mc->mc_snum));
|
||||
if (buf == NULL)
|
||||
return MDBX_ENOMEM;
|
||||
|
||||
char *ptr = buf;
|
||||
for (unsigned i = 0; i < couple.outer.mc_top; i++) {
|
||||
mdbx_page_copy((MDBX_page *)ptr, couple.outer.mc_pg[i],
|
||||
my->mc_env->me_psize);
|
||||
couple.outer.mc_pg[i] = (MDBX_page *)ptr;
|
||||
ptr += my->mc_env->me_psize;
|
||||
for (unsigned i = 0; i < mc->mc_top; i++) {
|
||||
mdbx_page_copy((MDBX_page *)ptr, mc->mc_pg[i], ctx->mc_env->me_psize);
|
||||
mc->mc_pg[i] = (MDBX_page *)ptr;
|
||||
ptr += ctx->mc_env->me_psize;
|
||||
}
|
||||
|
||||
/* This is writable space for a leaf page. Usually not needed. */
|
||||
MDBX_page *const leaf = (MDBX_page *)ptr;
|
||||
MDBX_page *copy;
|
||||
|
||||
while (couple.outer.mc_snum > 0) {
|
||||
MDBX_page *mp = couple.outer.mc_pg[couple.outer.mc_top];
|
||||
while (mc->mc_snum > 0) {
|
||||
MDBX_page *mp = mc->mc_pg[mc->mc_top];
|
||||
unsigned n = page_numkeys(mp);
|
||||
|
||||
if (IS_LEAF(mp)) {
|
||||
if (hive_flags == 0 /* may have nested F_SUBDATA or F_BIGDATA nodes */) {
|
||||
if (!(mc->mc_flags &
|
||||
C_SUB) /* may have nested F_SUBDATA or F_BIGDATA nodes */) {
|
||||
for (unsigned i = 0; i < n; i++) {
|
||||
MDBX_node *node = page_node(mp, i);
|
||||
if (node_flags(node) & F_BIGDATA) {
|
||||
MDBX_page *osrc;
|
||||
|
||||
if (node_flags(node) == F_BIGDATA) {
|
||||
/* Need writable leaf */
|
||||
if (mp != leaf) {
|
||||
couple.outer.mc_pg[couple.outer.mc_top] = leaf;
|
||||
mdbx_page_copy(leaf, mp, my->mc_env->me_psize);
|
||||
mc->mc_pg[mc->mc_top] = leaf;
|
||||
mdbx_page_copy(leaf, mp, ctx->mc_env->me_psize);
|
||||
mp = leaf;
|
||||
node = page_node(mp, i);
|
||||
}
|
||||
|
||||
const pgno_t pgno = node_largedata_pgno(node);
|
||||
poke_pgno(node_data(node), my->mc_next_pgno);
|
||||
rc = mdbx_page_get(&couple.outer, pgno, &osrc, mp->mp_txnid);
|
||||
poke_pgno(node_data(node), ctx->mc_next_pgno);
|
||||
MDBX_page *osrc;
|
||||
rc = mdbx_page_get(mc, pgno, &osrc, mp->mp_txnid);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
unsigned toggle = my->mc_head & 1;
|
||||
if (my->mc_wlen[toggle] + my->mc_env->me_psize >
|
||||
((size_t)(MDBX_ENVCOPY_WRITEBUF))) {
|
||||
rc = mdbx_env_cthr_toggle(my);
|
||||
|
||||
unsigned side = ctx->mc_head & 1;
|
||||
if (ctx->mc_wlen[side] + ctx->mc_env->me_psize >
|
||||
(size_t)MDBX_ENVCOPY_WRITEBUF) {
|
||||
rc = compacting_toggle_write_buffers(ctx);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
toggle = my->mc_head & 1;
|
||||
side = ctx->mc_head & 1;
|
||||
}
|
||||
copy = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
|
||||
memcpy(copy, osrc, my->mc_env->me_psize);
|
||||
copy->mp_pgno = my->mc_next_pgno;
|
||||
my->mc_next_pgno += osrc->mp_pages;
|
||||
my->mc_wlen[toggle] += my->mc_env->me_psize;
|
||||
copy = (MDBX_page *)(ctx->mc_wbuf[side] + ctx->mc_wlen[side]);
|
||||
memcpy(copy, osrc, ctx->mc_env->me_psize);
|
||||
copy->mp_pgno = ctx->mc_next_pgno;
|
||||
ctx->mc_next_pgno += osrc->mp_pages;
|
||||
ctx->mc_wlen[side] += ctx->mc_env->me_psize;
|
||||
|
||||
if (osrc->mp_pages > 1) {
|
||||
my->mc_olen[toggle] = pgno2bytes(my->mc_env, osrc->mp_pages - 1);
|
||||
my->mc_over[toggle] = (uint8_t *)osrc + my->mc_env->me_psize;
|
||||
rc = mdbx_env_cthr_toggle(my);
|
||||
ctx->mc_olen[side] = pgno2bytes(ctx->mc_env, osrc->mp_pages - 1);
|
||||
ctx->mc_over[side] = (uint8_t *)osrc + ctx->mc_env->me_psize;
|
||||
rc = compacting_toggle_write_buffers(ctx);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
toggle = my->mc_head & 1;
|
||||
side = ctx->mc_head & 1;
|
||||
}
|
||||
} else if (node_flags(node) & F_SUBDATA) {
|
||||
if (!MDBX_DISABLE_VALIDATION &&
|
||||
@ -19562,77 +19552,84 @@ __cold static int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg,
|
||||
|
||||
/* Need writable leaf */
|
||||
if (mp != leaf) {
|
||||
couple.outer.mc_pg[couple.outer.mc_top] = leaf;
|
||||
mdbx_page_copy(leaf, mp, my->mc_env->me_psize);
|
||||
mc->mc_pg[mc->mc_top] = leaf;
|
||||
mdbx_page_copy(leaf, mp, ctx->mc_env->me_psize);
|
||||
mp = leaf;
|
||||
node = page_node(mp, i);
|
||||
}
|
||||
|
||||
MDBX_db db;
|
||||
memcpy(&db, node_data(node), sizeof(MDBX_db));
|
||||
STATIC_ASSERT(F_DUPDATA == MDBX_DUPSORT);
|
||||
rc = mdbx_env_cwalk(my, &db.md_root,
|
||||
(node_flags(node) & F_DUPDATA)
|
||||
? MDBX_DUPSORT |
|
||||
(db.md_flags & MDBX_DUPFIXED)
|
||||
: 0);
|
||||
MDBX_db *nested = nullptr;
|
||||
if (node_flags(node) & F_DUPDATA) {
|
||||
rc = mdbx_xcursor_init1(mc, node, mp);
|
||||
if (likely(rc == MDBX_SUCCESS)) {
|
||||
nested = &mc->mc_xcursor->mx_db;
|
||||
rc = compacting_walk_tree(ctx, &mc->mc_xcursor->mx_cursor,
|
||||
&nested->md_root, mp->mp_txnid);
|
||||
}
|
||||
} else {
|
||||
MDBX_cursor_couple *couple =
|
||||
container_of(mc, MDBX_cursor_couple, inner.mx_cursor);
|
||||
nested = &couple->inner.mx_db;
|
||||
memcpy(nested, node_data(node), sizeof(MDBX_db));
|
||||
rc = compacting_walk_sdb(ctx, nested);
|
||||
}
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
memcpy(node_data(node), &db, sizeof(MDBX_db));
|
||||
memcpy(node_data(node), nested, sizeof(MDBX_db));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
couple.outer.mc_ki[couple.outer.mc_top]++;
|
||||
if (couple.outer.mc_ki[couple.outer.mc_top] < n) {
|
||||
mc->mc_ki[mc->mc_top]++;
|
||||
if (mc->mc_ki[mc->mc_top] < n) {
|
||||
again:;
|
||||
const MDBX_node *node =
|
||||
page_node(mp, couple.outer.mc_ki[couple.outer.mc_top]);
|
||||
if (unlikely(node->mn_flags)) {
|
||||
const MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
|
||||
if (unlikely(node_flags(node))) {
|
||||
mdbx_error("unexpected type 0x%x of node #%u on page #%" PRIaPGNO,
|
||||
node->mn_flags, couple.outer.mc_ki[couple.outer.mc_top],
|
||||
couple.outer.mc_pg[couple.outer.mc_top]->mp_pgno);
|
||||
node_flags(node), mc->mc_ki[mc->mc_top],
|
||||
mc->mc_pg[mc->mc_top]->mp_pgno);
|
||||
rc = MDBX_CORRUPTED;
|
||||
goto done;
|
||||
}
|
||||
rc = mdbx_page_get(&couple.outer, node_pgno(node), &mp, mp->mp_txnid);
|
||||
rc = mdbx_page_get(mc, node_pgno(node), &mp, mp->mp_txnid);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
couple.outer.mc_top++;
|
||||
couple.outer.mc_snum++;
|
||||
couple.outer.mc_ki[couple.outer.mc_top] = 0;
|
||||
mc->mc_top++;
|
||||
mc->mc_snum++;
|
||||
mc->mc_ki[mc->mc_top] = 0;
|
||||
if (IS_BRANCH(mp)) {
|
||||
/* Whenever we advance to a sibling branch page,
|
||||
* we must proceed all the way down to its first leaf. */
|
||||
mdbx_page_copy(couple.outer.mc_pg[couple.outer.mc_top], mp,
|
||||
my->mc_env->me_psize);
|
||||
mdbx_page_copy(mc->mc_pg[mc->mc_top], mp, ctx->mc_env->me_psize);
|
||||
goto again;
|
||||
} else
|
||||
couple.outer.mc_pg[couple.outer.mc_top] = mp;
|
||||
mc->mc_pg[mc->mc_top] = mp;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
unsigned toggle = my->mc_head & 1;
|
||||
if (my->mc_wlen[toggle] + my->mc_wlen[toggle] >
|
||||
((size_t)(MDBX_ENVCOPY_WRITEBUF))) {
|
||||
rc = mdbx_env_cthr_toggle(my);
|
||||
|
||||
unsigned side = ctx->mc_head & 1;
|
||||
if (ctx->mc_wlen[side] + ctx->mc_env->me_psize >
|
||||
(size_t)MDBX_ENVCOPY_WRITEBUF) {
|
||||
rc = compacting_toggle_write_buffers(ctx);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
goto done;
|
||||
toggle = my->mc_head & 1;
|
||||
side = ctx->mc_head & 1;
|
||||
}
|
||||
copy = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
|
||||
mdbx_page_copy(copy, mp, my->mc_env->me_psize);
|
||||
copy->mp_pgno = my->mc_next_pgno++;
|
||||
my->mc_wlen[toggle] += my->mc_env->me_psize;
|
||||
if (couple.outer.mc_top) {
|
||||
copy = (MDBX_page *)(ctx->mc_wbuf[side] + ctx->mc_wlen[side]);
|
||||
mdbx_page_copy(copy, mp, ctx->mc_env->me_psize);
|
||||
copy->mp_pgno = ctx->mc_next_pgno++;
|
||||
ctx->mc_wlen[side] += ctx->mc_env->me_psize;
|
||||
|
||||
if (mc->mc_top) {
|
||||
/* Update parent if there is one */
|
||||
node_set_pgno(page_node(couple.outer.mc_pg[couple.outer.mc_top - 1],
|
||||
couple.outer.mc_ki[couple.outer.mc_top - 1]),
|
||||
node_set_pgno(
|
||||
page_node(mc->mc_pg[mc->mc_top - 1], mc->mc_ki[mc->mc_top - 1]),
|
||||
copy->mp_pgno);
|
||||
mdbx_cursor_pop(&couple.outer);
|
||||
mdbx_cursor_pop(mc);
|
||||
} else {
|
||||
/* Otherwise we're done */
|
||||
*pg = copy->mp_pgno;
|
||||
*root = copy->mp_pgno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -19641,7 +19638,25 @@ done:
|
||||
return rc;
|
||||
}
|
||||
|
||||
__cold static void compact_fixup_meta(MDBX_env *env, MDBX_meta *meta) {
|
||||
__cold static int compacting_walk_sdb(mdbx_compacting_ctx *ctx, MDBX_db *sdb) {
|
||||
if (unlikely(sdb->md_root == P_INVALID))
|
||||
return MDBX_SUCCESS; /* empty db */
|
||||
|
||||
MDBX_cursor_couple couple;
|
||||
MDBX_dbx dbx = {.md_klen_min = INT_MAX};
|
||||
uint8_t dbistate = DBI_VALID | DBI_AUDITED;
|
||||
int rc = mdbx_couple_init(&couple, ~0u, ctx->mc_txn, sdb, &dbx, &dbistate);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
|
||||
couple.outer.mc_checking |= CC_SKIPORD | CC_PAGECHECK;
|
||||
couple.inner.mx_cursor.mc_checking |= CC_SKIPORD | CC_PAGECHECK;
|
||||
return compacting_walk_tree(ctx, &couple.outer, &sdb->md_root,
|
||||
sdb->md_mod_txnid ? sdb->md_mod_txnid
|
||||
: ctx->mc_txn->mt_txnid);
|
||||
}
|
||||
|
||||
__cold static void compacting_fixup_meta(MDBX_env *env, MDBX_meta *meta) {
|
||||
/* Calculate filesize taking in account shrink/growing thresholds */
|
||||
if (meta->mm_geo.next != meta->mm_geo.now) {
|
||||
meta->mm_geo.now = meta->mm_geo.next;
|
||||
@ -19665,7 +19680,7 @@ __cold static void compact_fixup_meta(MDBX_env *env, MDBX_meta *meta) {
|
||||
}
|
||||
|
||||
/* Make resizeable */
|
||||
__cold static void make_sizeable(MDBX_meta *meta) {
|
||||
__cold static void meta_make_sizeable(MDBX_meta *meta) {
|
||||
meta->mm_geo.lower = MIN_PAGENO;
|
||||
if (meta->mm_geo.grow_pv == 0) {
|
||||
const pgno_t step = 1 + (meta->mm_geo.upper - meta->mm_geo.lower) / 42;
|
||||
@ -19688,7 +19703,7 @@ __cold static int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
||||
meta_set_txnid(env, meta, read_txn->mt_txnid);
|
||||
|
||||
if (flags & MDBX_CP_FORCE_DYNAMIC_SIZE)
|
||||
make_sizeable(meta);
|
||||
meta_make_sizeable(meta);
|
||||
|
||||
/* copy canary sequences if present */
|
||||
if (read_txn->mt_canary.v) {
|
||||
@ -19696,67 +19711,96 @@ __cold static int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
||||
meta->mm_canary.v = constmeta_txnid(env, meta);
|
||||
}
|
||||
|
||||
/* Set metapage 1 with current main DB */
|
||||
pgno_t new_root, root = read_txn->mt_dbs[MAIN_DBI].md_root;
|
||||
if ((new_root = root) == P_INVALID) {
|
||||
if (read_txn->mt_dbs[MAIN_DBI].md_root == P_INVALID) {
|
||||
/* When the DB is empty, handle it specially to
|
||||
* fix any breakage like page leaks from ITS#8174. */
|
||||
meta->mm_dbs[MAIN_DBI].md_flags = read_txn->mt_dbs[MAIN_DBI].md_flags;
|
||||
compact_fixup_meta(env, meta);
|
||||
compacting_fixup_meta(env, meta);
|
||||
if (dest_is_pipe) {
|
||||
int rc = mdbx_write(fd, buffer, meta_bytes);
|
||||
if (rc != MDBX_SUCCESS)
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
}
|
||||
} else {
|
||||
/* Count free pages + GC pages. Subtract from last_pg
|
||||
* to find the new last_pg, which also becomes the new root. */
|
||||
pgno_t freecount = 0;
|
||||
/* Count free pages + GC pages. */
|
||||
MDBX_cursor_couple couple;
|
||||
MDBX_val key, data;
|
||||
|
||||
int rc = mdbx_cursor_init(&couple.outer, read_txn, FREE_DBI);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
while ((rc = mdbx_cursor_get(&couple.outer, &key, &data, MDBX_NEXT)) == 0)
|
||||
freecount += *(pgno_t *)data.iov_base;
|
||||
pgno_t gc = read_txn->mt_dbs[FREE_DBI].md_branch_pages +
|
||||
read_txn->mt_dbs[FREE_DBI].md_leaf_pages +
|
||||
read_txn->mt_dbs[FREE_DBI].md_overflow_pages;
|
||||
MDBX_val key, data;
|
||||
while ((rc = mdbx_cursor_get(&couple.outer, &key, &data, MDBX_NEXT)) ==
|
||||
MDBX_SUCCESS) {
|
||||
const MDBX_PNL pnl = data.iov_base;
|
||||
if (unlikely(data.iov_len % sizeof(pgno_t) ||
|
||||
data.iov_len < MDBX_PNL_SIZEOF(pnl) ||
|
||||
!(mdbx_pnl_check(pnl, read_txn->mt_next_pgno))))
|
||||
return MDBX_CORRUPTED;
|
||||
gc += MDBX_PNL_SIZE(pnl);
|
||||
}
|
||||
if (unlikely(rc != MDBX_NOTFOUND))
|
||||
return rc;
|
||||
|
||||
freecount += read_txn->mt_dbs[FREE_DBI].md_branch_pages +
|
||||
read_txn->mt_dbs[FREE_DBI].md_leaf_pages +
|
||||
read_txn->mt_dbs[FREE_DBI].md_overflow_pages;
|
||||
|
||||
new_root = read_txn->mt_next_pgno - 1 - freecount;
|
||||
meta->mm_geo.next = new_root + 1;
|
||||
/* Substract GC-pages from mt_next_pgno to find the new mt_next_pgno. */
|
||||
meta->mm_geo.next = read_txn->mt_next_pgno - gc;
|
||||
/* Set with current main DB */
|
||||
meta->mm_dbs[MAIN_DBI] = read_txn->mt_dbs[MAIN_DBI];
|
||||
meta->mm_dbs[MAIN_DBI].md_root = new_root;
|
||||
|
||||
mdbx_copy ctx;
|
||||
mdbx_compacting_ctx ctx;
|
||||
memset(&ctx, 0, sizeof(ctx));
|
||||
rc = mdbx_condpair_init(&ctx.mc_condpair);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
|
||||
memset(data_buffer, 0, ((size_t)(MDBX_ENVCOPY_WRITEBUF)) * 2);
|
||||
memset(data_buffer, 0, 2 * (size_t)MDBX_ENVCOPY_WRITEBUF);
|
||||
ctx.mc_wbuf[0] = data_buffer;
|
||||
ctx.mc_wbuf[1] = data_buffer + ((size_t)(MDBX_ENVCOPY_WRITEBUF));
|
||||
ctx.mc_wbuf[1] = data_buffer + (size_t)MDBX_ENVCOPY_WRITEBUF;
|
||||
ctx.mc_next_pgno = NUM_METAS;
|
||||
ctx.mc_env = env;
|
||||
ctx.mc_fd = fd;
|
||||
ctx.mc_txn = read_txn;
|
||||
|
||||
mdbx_thread_t thread;
|
||||
int thread_err = mdbx_thread_create(&thread, mdbx_env_copythr, &ctx);
|
||||
int thread_err = mdbx_thread_create(&thread, compacting_write_thread, &ctx);
|
||||
if (likely(thread_err == MDBX_SUCCESS)) {
|
||||
if (dest_is_pipe) {
|
||||
compact_fixup_meta(env, meta);
|
||||
compacting_fixup_meta(env, meta);
|
||||
rc = mdbx_write(fd, buffer, meta_bytes);
|
||||
}
|
||||
if (rc == MDBX_SUCCESS)
|
||||
rc = mdbx_env_cwalk(&ctx, &root, 0);
|
||||
mdbx_env_cthr_toggle(&ctx);
|
||||
mdbx_env_cthr_toggle(&ctx);
|
||||
if (likely(rc == MDBX_SUCCESS))
|
||||
rc = compacting_walk_sdb(&ctx, &meta->mm_dbs[MAIN_DBI]);
|
||||
if (ctx.mc_wlen[ctx.mc_head & 1])
|
||||
/* toggle to flush non-empty buffers */
|
||||
compacting_toggle_write_buffers(&ctx);
|
||||
|
||||
if (likely(rc == MDBX_SUCCESS) &&
|
||||
unlikely(meta->mm_geo.next != ctx.mc_next_pgno)) {
|
||||
if (ctx.mc_next_pgno > meta->mm_geo.next) {
|
||||
mdbx_error(
|
||||
"the source DB %s: post-compactification used pages %" PRIaPGNO
|
||||
" %c expected %" PRIaPGNO,
|
||||
"has double-used pages or other corruption", ctx.mc_next_pgno,
|
||||
'>', meta->mm_geo.next);
|
||||
rc = MDBX_CORRUPTED; /* corrupted DB */
|
||||
}
|
||||
if (ctx.mc_next_pgno < meta->mm_geo.next) {
|
||||
mdbx_warning(
|
||||
"the source DB %s: post-compactification used pages %" PRIaPGNO
|
||||
" %c expected %" PRIaPGNO,
|
||||
"has page leak(s)", ctx.mc_next_pgno, '<', meta->mm_geo.next);
|
||||
if (dest_is_pipe)
|
||||
/* the root within already written meta-pages is wrong */
|
||||
rc = MDBX_CORRUPTED;
|
||||
}
|
||||
/* fixup meta */
|
||||
meta->mm_geo.next = ctx.mc_next_pgno;
|
||||
}
|
||||
|
||||
/* toggle with empty buffers to exit thread's loop */
|
||||
mdbx_assert(env, (ctx.mc_wlen[ctx.mc_head & 1]) == 0);
|
||||
compacting_toggle_write_buffers(&ctx);
|
||||
thread_err = mdbx_thread_join(thread);
|
||||
mdbx_assert(env, (ctx.mc_tail == ctx.mc_head &&
|
||||
ctx.mc_wlen[ctx.mc_head & 1] == 0) ||
|
||||
@ -19769,32 +19813,8 @@ __cold static int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
||||
return rc;
|
||||
if (unlikely(ctx.mc_error != MDBX_SUCCESS))
|
||||
return ctx.mc_error;
|
||||
|
||||
if (dest_is_pipe) {
|
||||
if (unlikely(root != new_root)) {
|
||||
mdbx_error("post-compactification root %" PRIaPGNO
|
||||
" NE expected %" PRIaPGNO
|
||||
" (source DB corrupted or has a page leak(s))",
|
||||
root, new_root);
|
||||
return MDBX_CORRUPTED; /* page leak or corrupt DB */
|
||||
}
|
||||
} else {
|
||||
if (unlikely(root > new_root)) {
|
||||
mdbx_error("post-compactification root %" PRIaPGNO
|
||||
" GT expected %" PRIaPGNO " (source DB corrupted)",
|
||||
root, new_root);
|
||||
return MDBX_CORRUPTED; /* page leak or corrupt DB */
|
||||
}
|
||||
if (unlikely(root < new_root)) {
|
||||
mdbx_warning("post-compactification root %" PRIaPGNO
|
||||
" LT expected %" PRIaPGNO " (page leak(s) in source DB)",
|
||||
root, new_root);
|
||||
/* fixup meta */
|
||||
meta->mm_dbs[MAIN_DBI].md_root = root;
|
||||
meta->mm_geo.next = root + 1;
|
||||
}
|
||||
compact_fixup_meta(env, meta);
|
||||
}
|
||||
if (!dest_is_pipe)
|
||||
compacting_fixup_meta(env, meta);
|
||||
}
|
||||
|
||||
/* Extend file if required */
|
||||
@ -19804,11 +19824,10 @@ __cold static int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
|
||||
return mdbx_ftruncate(fd, whole_size);
|
||||
|
||||
const size_t used_size = pgno2bytes(env, meta->mm_geo.next);
|
||||
memset(data_buffer, 0, ((size_t)(MDBX_ENVCOPY_WRITEBUF)));
|
||||
memset(data_buffer, 0, (size_t)MDBX_ENVCOPY_WRITEBUF);
|
||||
for (size_t offset = used_size; offset < whole_size;) {
|
||||
const size_t chunk =
|
||||
(((size_t)(MDBX_ENVCOPY_WRITEBUF)) < whole_size - offset)
|
||||
? ((size_t)(MDBX_ENVCOPY_WRITEBUF))
|
||||
const size_t chunk = ((size_t)MDBX_ENVCOPY_WRITEBUF < whole_size - offset)
|
||||
? (size_t)MDBX_ENVCOPY_WRITEBUF
|
||||
: whole_size - offset;
|
||||
/* copy to avoid EFAULT in case swapped-out */
|
||||
int rc = mdbx_write(fd, data_buffer, chunk);
|
||||
@ -19850,7 +19869,7 @@ __cold static int mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
|
||||
mdbx_txn_unlock(env);
|
||||
|
||||
if (flags & MDBX_CP_FORCE_DYNAMIC_SIZE)
|
||||
make_sizeable(headcopy);
|
||||
meta_make_sizeable(headcopy);
|
||||
/* Update signature to steady */
|
||||
unaligned_poke_u64(4, headcopy->mm_datasync_sign, meta_sign(headcopy));
|
||||
|
||||
@ -19910,9 +19929,8 @@ __cold static int mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
|
||||
#endif /* MDBX_USE_COPYFILERANGE */
|
||||
|
||||
/* fallback to portable */
|
||||
const size_t chunk =
|
||||
(((size_t)(MDBX_ENVCOPY_WRITEBUF)) < used_size - offset)
|
||||
? ((size_t)(MDBX_ENVCOPY_WRITEBUF))
|
||||
const size_t chunk = ((size_t)MDBX_ENVCOPY_WRITEBUF < used_size - offset)
|
||||
? (size_t)MDBX_ENVCOPY_WRITEBUF
|
||||
: used_size - offset;
|
||||
/* copy to avoid EFAULT in case swapped-out */
|
||||
memcpy(data_buffer, env->me_map + offset, chunk);
|
||||
@ -19925,12 +19943,12 @@ __cold static int mdbx_env_copy_asis(MDBX_env *env, MDBX_txn *read_txn,
|
||||
if (!dest_is_pipe)
|
||||
rc = mdbx_ftruncate(fd, whole_size);
|
||||
else {
|
||||
memset(data_buffer, 0, ((size_t)(MDBX_ENVCOPY_WRITEBUF)));
|
||||
memset(data_buffer, 0, (size_t)MDBX_ENVCOPY_WRITEBUF);
|
||||
for (size_t offset = used_size;
|
||||
rc == MDBX_SUCCESS && offset < whole_size;) {
|
||||
const size_t chunk =
|
||||
(((size_t)(MDBX_ENVCOPY_WRITEBUF)) < whole_size - offset)
|
||||
? ((size_t)(MDBX_ENVCOPY_WRITEBUF))
|
||||
((size_t)MDBX_ENVCOPY_WRITEBUF < whole_size - offset)
|
||||
? (size_t)MDBX_ENVCOPY_WRITEBUF
|
||||
: whole_size - offset;
|
||||
/* copy to avoid EFAULT in case swapped-out */
|
||||
rc = mdbx_write(fd, data_buffer, chunk);
|
||||
@ -19961,8 +19979,8 @@ __cold int mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
|
||||
const size_t buffer_size =
|
||||
pgno_align2os_bytes(env, NUM_METAS) +
|
||||
ceil_powerof2(((flags & MDBX_CP_COMPACT)
|
||||
? ((size_t)(MDBX_ENVCOPY_WRITEBUF)) * 2
|
||||
: ((size_t)(MDBX_ENVCOPY_WRITEBUF))),
|
||||
? 2 * (size_t)MDBX_ENVCOPY_WRITEBUF
|
||||
: (size_t)MDBX_ENVCOPY_WRITEBUF),
|
||||
env->me_os_psize);
|
||||
|
||||
uint8_t *buffer = NULL;
|
||||
|
Loading…
x
Reference in New Issue
Block a user