mdbx: refine multi-thread flipping of buffers during env-copy.

Change-Id: Id132c1af0e1131da70ab1b35bce9f6a6548edbe3
This commit is contained in:
Leonid Yuriev 2020-11-19 11:54:47 +03:00
parent 9054b25441
commit 1bbf20bf39

View File

@ -16320,7 +16320,6 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
#ifndef MDBX_WBUF #ifndef MDBX_WBUF
#define MDBX_WBUF ((size_t)1024 * 1024) #define MDBX_WBUF ((size_t)1024 * 1024)
#endif #endif
#define MDBX_EOF 0x10 /* mdbx_env_copythr() is done reading */
/* State needed for a double-buffering compacting copy. */ /* State needed for a double-buffering compacting copy. */
typedef struct mdbx_copy { typedef struct mdbx_copy {
@ -16332,19 +16331,17 @@ typedef struct mdbx_copy {
size_t mc_wlen[2]; size_t mc_wlen[2];
size_t mc_olen[2]; size_t mc_olen[2];
mdbx_filehandle_t mc_fd; mdbx_filehandle_t mc_fd;
volatile int mc_error;
pgno_t mc_next_pgno;
short mc_toggle; /* Buffer number in provider */
short mc_new; /* (0-2 buffers to write) | (MDBX_EOF at end) */
/* Error code. Never cleared if set. Both threads can set nonzero /* Error code. Never cleared if set. Both threads can set nonzero
* to fail the copy. Not mutex-protected, MDBX expects atomic int. */ * to fail the copy. Not mutex-protected, MDBX expects atomic int. */
volatile int mc_error;
pgno_t mc_next_pgno;
volatile unsigned mc_head;
volatile unsigned mc_tail;
} mdbx_copy; } mdbx_copy;
/* Dedicated writer thread for compacting copy. */ /* Dedicated writer thread for compacting copy. */
static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) { static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
mdbx_copy *my = arg; mdbx_copy *my = arg;
uint8_t *ptr;
int toggle = 0;
#if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64)) #if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64))
sigset_t sigset; sigset_t sigset;
@ -16355,19 +16352,23 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
mdbx_condpair_lock(&my->mc_condpair); mdbx_condpair_lock(&my->mc_condpair);
while (!my->mc_error) { while (!my->mc_error) {
while (!my->mc_new && !my->mc_error) { while (my->mc_tail == my->mc_head && !my->mc_error) {
int err = mdbx_condpair_wait(&my->mc_condpair, true); int err = mdbx_condpair_wait(&my->mc_condpair, true);
if (err != MDBX_SUCCESS) { if (err != MDBX_SUCCESS) {
my->mc_error = err; my->mc_error = err;
goto bailout; goto bailout;
} }
} }
if (my->mc_new == 0 + MDBX_EOF) /* 0 buffers, just EOF */ const unsigned toggle = my->mc_tail & 1;
break;
size_t wsize = my->mc_wlen[toggle]; size_t wsize = my->mc_wlen[toggle];
ptr = my->mc_wbuf[toggle]; if (wsize == 0) {
my->mc_tail += 1;
break /* EOF */;
}
my->mc_wlen[toggle] = 0;
uint8_t *ptr = my->mc_wbuf[toggle];
again: again:
if (wsize > 0 && !my->mc_error) { if (!my->mc_error) {
int err = mdbx_write(my->mc_fd, ptr, wsize); int err = mdbx_write(my->mc_fd, ptr, wsize);
if (err != MDBX_SUCCESS) { if (err != MDBX_SUCCESS) {
#if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64)) #if defined(EPIPE) && !(defined(_WIN32) || defined(_WIN64))
@ -16384,16 +16385,13 @@ static THREAD_RESULT __cold THREAD_CALL mdbx_env_copythr(void *arg) {
} }
/* If there's an overflow page tail, write it too */ /* If there's an overflow page tail, write it too */
if (my->mc_olen[toggle]) {
wsize = my->mc_olen[toggle]; wsize = my->mc_olen[toggle];
ptr = my->mc_over[toggle]; if (wsize) {
my->mc_olen[toggle] = 0; my->mc_olen[toggle] = 0;
ptr = my->mc_over[toggle];
goto again; goto again;
} }
my->mc_wlen[toggle] = 0; my->mc_tail += 1;
toggle ^= 1;
/* Return the empty buffer to provider */
my->mc_new--;
mdbx_condpair_signal(&my->mc_condpair, false); mdbx_condpair_signal(&my->mc_condpair, false);
} }
bailout: bailout:
@ -16401,24 +16399,19 @@ bailout:
return (THREAD_RESULT)0; return (THREAD_RESULT)0;
} }
/* Give buffer and/or MDBX_EOF to writer thread, await unused buffer. /* Give buffer (an empty one signals EOF) to writer thread, await unused buffer. */
* static __cold int mdbx_env_cthr_toggle(mdbx_copy *my) {
* [in] my control structure.
* [in] adjust (1 to hand off 1 buffer) | (MDBX_EOF when ending). */
static __cold int mdbx_env_cthr_toggle(mdbx_copy *my, int adjust) {
mdbx_condpair_lock(&my->mc_condpair); mdbx_condpair_lock(&my->mc_condpair);
my->mc_new += (short)adjust; mdbx_assert(my->mc_env, my->mc_head - my->mc_tail < 2 || my->mc_error);
my->mc_head += 1;
mdbx_condpair_signal(&my->mc_condpair, true); mdbx_condpair_signal(&my->mc_condpair, true);
while (!my->mc_error && (my->mc_new & 2) /* both buffers in use */) { while (!my->mc_error &&
my->mc_head - my->mc_tail == 2 /* both buffers in use */) {
int err = mdbx_condpair_wait(&my->mc_condpair, false); int err = mdbx_condpair_wait(&my->mc_condpair, false);
if (err != MDBX_SUCCESS) if (err != MDBX_SUCCESS)
my->mc_error = err; my->mc_error = err;
} }
mdbx_condpair_unlock(&my->mc_condpair); mdbx_condpair_unlock(&my->mc_condpair);
my->mc_toggle ^= (adjust & 1);
/* Both threads reset mc_wlen, to be safe from threading errors */
my->mc_wlen[my->mc_toggle] = 0;
return my->mc_error; return my->mc_error;
} }
@ -16430,7 +16423,7 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
MDBX_cursor_couple couple; MDBX_cursor_couple couple;
MDBX_page *mo, *mp, *leaf; MDBX_page *mo, *mp, *leaf;
char *buf, *ptr; char *buf, *ptr;
int rc, toggle; int rc;
unsigned i; unsigned i;
/* Empty DB, nothing to do */ /* Empty DB, nothing to do */
@ -16466,11 +16459,9 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
/* This is writable space for a leaf page. Usually not needed. */ /* This is writable space for a leaf page. Usually not needed. */
leaf = (MDBX_page *)ptr; leaf = (MDBX_page *)ptr;
toggle = my->mc_toggle;
while (couple.outer.mc_snum > 0) { while (couple.outer.mc_snum > 0) {
unsigned n;
mp = couple.outer.mc_pg[couple.outer.mc_top]; mp = couple.outer.mc_pg[couple.outer.mc_top];
n = page_numkeys(mp); unsigned n = page_numkeys(mp);
if (IS_LEAF(mp)) { if (IS_LEAF(mp)) {
if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) { if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) {
@ -16493,11 +16484,12 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
pp_txnid4chk(mp, my->mc_txn)); pp_txnid4chk(mp, my->mc_txn));
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto done; goto done;
if (my->mc_wlen[toggle] >= MDBX_WBUF) { unsigned toggle = my->mc_head & 1;
rc = mdbx_env_cthr_toggle(my, 1); if (my->mc_wlen[toggle] + my->mc_env->me_psize > MDBX_WBUF) {
rc = mdbx_env_cthr_toggle(my);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto done; goto done;
toggle = my->mc_toggle; toggle = my->mc_head & 1;
} }
mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
memcpy(mo, omp, my->mc_env->me_psize); memcpy(mo, omp, my->mc_env->me_psize);
@ -16507,10 +16499,10 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
if (omp->mp_pages > 1) { if (omp->mp_pages > 1) {
my->mc_olen[toggle] = pgno2bytes(my->mc_env, omp->mp_pages - 1); my->mc_olen[toggle] = pgno2bytes(my->mc_env, omp->mp_pages - 1);
my->mc_over[toggle] = (uint8_t *)omp + my->mc_env->me_psize; my->mc_over[toggle] = (uint8_t *)omp + my->mc_env->me_psize;
rc = mdbx_env_cthr_toggle(my, 1); rc = mdbx_env_cthr_toggle(my);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto done; goto done;
toggle = my->mc_toggle; toggle = my->mc_head & 1;
} }
} else if (node_flags(node) & F_SUBDATA) { } else if (node_flags(node) & F_SUBDATA) {
if (node_ds(node) != sizeof(MDBX_db)) { if (node_ds(node) != sizeof(MDBX_db)) {
@ -16528,11 +16520,9 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
MDBX_db db; MDBX_db db;
memcpy(&db, node_data(node), sizeof(MDBX_db)); memcpy(&db, node_data(node), sizeof(MDBX_db));
my->mc_toggle = (short)toggle;
rc = mdbx_env_cwalk(my, &db.md_root, node_flags(node) & F_DUPDATA); rc = mdbx_env_cwalk(my, &db.md_root, node_flags(node) & F_DUPDATA);
if (rc) if (rc)
goto done; goto done;
toggle = my->mc_toggle;
memcpy(node_data(node), &db, sizeof(MDBX_db)); memcpy(node_data(node), &db, sizeof(MDBX_db));
} }
} }
@ -16561,11 +16551,12 @@ static __cold int mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
continue; continue;
} }
} }
if (my->mc_wlen[toggle] >= MDBX_WBUF) { unsigned toggle = my->mc_head & 1;
rc = mdbx_env_cthr_toggle(my, 1); if (my->mc_wlen[toggle] + my->mc_env->me_psize > MDBX_WBUF) {
rc = mdbx_env_cthr_toggle(my);
if (unlikely(rc != MDBX_SUCCESS)) if (unlikely(rc != MDBX_SUCCESS))
goto done; goto done;
toggle = my->mc_toggle; toggle = my->mc_head & 1;
} }
mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]); mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
mdbx_page_copy(mo, mp, my->mc_env->me_psize); mdbx_page_copy(mo, mp, my->mc_env->me_psize);
@ -16702,8 +16693,12 @@ static __cold int mdbx_env_compact(MDBX_env *env, MDBX_txn *read_txn,
} }
if (rc == MDBX_SUCCESS) if (rc == MDBX_SUCCESS)
rc = mdbx_env_cwalk(&ctx, &root, 0); rc = mdbx_env_cwalk(&ctx, &root, 0);
mdbx_env_cthr_toggle(&ctx, 1 | MDBX_EOF); mdbx_env_cthr_toggle(&ctx);
mdbx_env_cthr_toggle(&ctx);
thread_err = mdbx_thread_join(thread); thread_err = mdbx_thread_join(thread);
mdbx_assert(env, (ctx.mc_tail == ctx.mc_head &&
ctx.mc_wlen[ctx.mc_head & 1] == 0) ||
ctx.mc_error);
mdbx_condpair_destroy(&ctx.mc_condpair); mdbx_condpair_destroy(&ctx.mc_condpair);
} }
if (unlikely(thread_err != MDBX_SUCCESS)) if (unlikely(thread_err != MDBX_SUCCESS))