diff --git a/src/core.c b/src/core.c index a6e64303..d8b22159 100644 --- a/src/core.c +++ b/src/core.c @@ -16320,7 +16320,6 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data, #ifndef MDBX_WBUF #define MDBX_WBUF ((size_t)1024 * 1024) #endif -#define MDBX_EOF 0x10 /* mdbx_env_copythr() is done reading */ /* State needed for a double-buffering compacting copy. */ typedef struct mdbx_copy { @@ -16332,19 +16331,17 @@ typedef struct mdbx_copy { size_t mc_wlen[2]; size_t mc_olen[2]; mdbx_filehandle_t mc_fd; - volatile int mc_error; - pgno_t mc_next_pgno; - short mc_toggle; /* Buffer number in provider */ - short mc_new; /* (0-2 buffers to write) | (MDBX_EOF at end) */ /* Error code. Never cleared if set. Both threads can set nonzero * to fail the copy. Not mutex-protected, MDBX expects atomic int. */ + volatile int mc_error; + pgno_t mc_next_pgno; + volatile unsigned mc_head; + volatile unsigned mc_tail; } mdbx_copy; /* Dedicated writer thread for compacting copy. */ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { mdbx_copy *my = arg; - uint8_t *ptr; - int toggle = 0; #if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64)) sigset_t sigset; @@ -16355,19 +16352,23 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { mdbx_condpair_lock(&my->mc_condpair); while (!my->mc_error) { - while (!my->mc_new && !my->mc_error) { + while (my->mc_tail == my->mc_head && !my->mc_error) { int err = mdbx_condpair_wait(&my->mc_condpair, true); if (err != MDBX_SUCCESS) { my->mc_error = err; goto bailout; } } - if (my->mc_new == 0 + MDBX_EOF) /* 0 buffers, just EOF */ - break; + const unsigned toggle = my->mc_tail & 1; size_t wsize = my->mc_wlen[toggle]; - ptr = my->mc_wbuf[toggle]; + if (wsize == 0) { + my->mc_tail += 1; + break /* EOF */; + } + my->mc_wlen[toggle] = 0; + uint8_t *ptr = my->mc_wbuf[toggle]; again: - if (wsize > 0 && !my->mc_error) { + if (!my->mc_error) { int err = mdbx_write(my->mc_fd, ptr, wsize); if (err != MDBX_SUCCESS) { #if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64)) @@ -16384,16 +16385,13 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { } /* If there's an overflow page tail, write it too */ - if (my->mc_olen[toggle]) { - wsize = my->mc_olen[toggle]; - ptr = my->mc_over[toggle]; + wsize = my->mc_olen[toggle]; + if (wsize) { my->mc_olen[toggle] = 0; + ptr = my->mc_over[toggle]; goto again; } - my->mc_wlen[toggle] = 0; - toggle ^= 1; - /* Return the empty buffer to provider */ - my->mc_new--; + my->mc_tail += 1; mdbx_condpair_signal(&my->mc_condpair, false); } bailout: @@ -16401,24 +16399,19 @@ bailout: return (THREAD_RESULT)0; } -/* Give buffer and/or MDBX_EOF to writer thread, await unused buffer. - * - * [in] my control structure. - * [in] adjust (1 to hand off 1 buffer) | (MDBX_EOF when ending). */ -static __cold int mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) { +/* Give buffer and/or MDBX_EOF to writer thread, await unused buffer. */ +static __cold int mdbx_env_cthr_toggle(mdbx_copy *my) { mdbx_condpair_lock(&my->mc_condpair); - my->mc_new += (short)adjust; + mdbx_assert(my->mc_env, my->mc_head - my->mc_tail < 2 || my->mc_error); + my->mc_head += 1; mdbx_condpair_signal(&my->mc_condpair, true); - while (!my->mc_error && (my->mc_new & 2) /* both buffers in use */) { + while (!my->mc_error && + my->mc_head - my->mc_tail == 2 /* both buffers in use */) { int err = mdbx_condpair_wait(&my->mc_condpair, false); if (err != MDBX_SUCCESS) my->mc_error = err; } mdbx_condpair_unlock(&my->mc_condpair); - - my->mc_toggle ^= (adjust & 1); - /* Both threads reset mc_wlen, to be safe from threading errors */ - my->mc_wlen[my->mc_toggle] = 0; return my->mc_error; } @@ -16430,7 +16423,7 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { MDBX_cursor_couple couple; MDBX_page *mo, *mp, *leaf; char *buf, *ptr; - int rc, toggle; + int rc; unsigned i; /* Empty DB, nothing to do */ @@ -16466,11 +16459,9 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { /* This is writable space for a leaf page. Usually not needed. */ leaf = (MDBX_page *)ptr; - toggle = my->mc_toggle; while (couple.outer.mc_snum > 0) { - unsigned n; mp = couple.outer.mc_pg[couple.outer.mc_top]; - n = page_numkeys(mp); + unsigned n = page_numkeys(mp); if (IS_LEAF(mp)) { if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) { @@ -16493,11 +16484,12 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { pp_txnid4chk(mp, my->mc_txn)); if (unlikely(rc != MDBX_SUCCESS)) goto done; - if (my->mc_wlen[toggle] >= MDBX_WBUF) { - rc = mdbx_env_cthr_toggle(my, 1); + unsigned toggle = my->mc_head & 1; + if (my->mc_wlen[toggle] + my->mc_env->me_psize > MDBX_WBUF) { + rc = mdbx_env_cthr_toggle(my); if (unlikely(rc != MDBX_SUCCESS)) goto done; - toggle = my->mc_toggle; + toggle = my->mc_head & 1; } mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); memcpy(mo, omp, my->mc_env->me_psize); @@ -16507,10 +16499,10 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { if (omp->mp_pages > 1) { my->mc_olen[toggle] = pgno2bytes(my->mc_env, omp->mp_pages - 1); my->mc_over[toggle] = (uint8_t *)omp + my->mc_env->me_psize; - rc = mdbx_env_cthr_toggle(my, 1); + rc = mdbx_env_cthr_toggle(my); if (unlikely(rc != MDBX_SUCCESS)) goto done; - toggle = my->mc_toggle; + toggle = my->mc_head & 1; } } else if (node_flags(node) & F_SUBDATA) { if (node_ds(node) != sizeof(MDBX_db)) { @@ -16528,11 +16520,9 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { MDBX_db db; memcpy(&db, node_data(node), sizeof(MDBX_db)); - my->mc_toggle = (short)toggle; rc = mdbx_env_cwalk(my, &db.md_root, node_flags(node) & F_DUPDATA); if (rc) goto done; - toggle = my->mc_toggle; memcpy(node_data(node), &db, sizeof(MDBX_db)); } } @@ -16561,11 +16551,12 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) { continue; } } - if (my->mc_wlen[toggle] >= MDBX_WBUF) { - rc = mdbx_env_cthr_toggle(my, 1); + unsigned toggle = my->mc_head & 1; + if (my->mc_wlen[toggle] + my->mc_wlen[toggle] > MDBX_WBUF) { + rc = mdbx_env_cthr_toggle(my); if (unlikely(rc != MDBX_SUCCESS)) goto done; - toggle = my->mc_toggle; + toggle = my->mc_head & 1; } mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); mdbx_page_copy(mo, mp, my->mc_env->me_psize); @@ -16702,8 +16693,12 @@ static __cold int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn, } if (rc == MDBX_SUCCESS) rc = mdbx_env_cwalk(&ctx, &root, 0); - mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF); + mdbx_env_cthr_toggle(&ctx); + mdbx_env_cthr_toggle(&ctx); thread_err = mdbx_thread_join(thread); + mdbx_assert(env, (ctx.mc_tail == ctx.mc_head && + ctx.mc_wlen[ctx.mc_head & 1] == 0) || + ctx.mc_error); mdbx_condpair_destroy(&ctx.mc_condpair); } if (unlikely(thread_err != MDBX_SUCCESS))