mdbx: backport - ITS#8209 fix MDB_CP_COMPACT.

Handle errors.  Fix cond_wait condition so mc_new
is the sole control var.  Drop specious cond_waits.
Do not look at 'mo' while copythr writes it.
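
The reworked handshake can be read on its own. Below is a minimal, hypothetical model of the protocol the diff converges on: a single counter is the only control variable, its low bits count buffers handed to the writer, and a separate EOF bit ends the loop. All names here (copy_state, hand_off, EOF_FLAG, writer) are illustrative and not taken from mdb.c.

/* Simplified model of the single-control-variable handshake.
 * Bits 0-1 of `pending` count buffers handed to the writer;
 * EOF_FLAG marks the end of input.  Build with -pthread. */
#include <pthread.h>
#include <stdio.h>

#define EOF_FLAG 0x10

typedef struct {
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	int pending;                /* (0-2 buffers to write) | (EOF_FLAG at end) */
	int data[2];                /* stand-ins for the two write buffers */
} copy_state;

static void *writer(void *arg)
{
	copy_state *cs = arg;
	int toggle = 0;

	pthread_mutex_lock(&cs->mutex);
	for (;;) {
		while (!cs->pending)
			pthread_cond_wait(&cs->cond, &cs->mutex);
		if (cs->pending == EOF_FLAG)    /* 0 buffers left, just EOF */
			break;
		printf("writing buffer %d: %d\n", toggle, cs->data[toggle]);
		toggle ^= 1;
		cs->pending--;                  /* return the empty buffer */
		pthread_cond_signal(&cs->cond);
	}
	pthread_mutex_unlock(&cs->mutex);
	return NULL;
}

/* Provider: hand over `adjust` (one filled buffer and/or EOF_FLAG),
 * then wait until at most one buffer is still in the writer's hands. */
static void hand_off(copy_state *cs, int adjust)
{
	pthread_mutex_lock(&cs->mutex);
	cs->pending += adjust;
	pthread_cond_signal(&cs->cond);
	while (cs->pending & 2)             /* both buffers in use */
		pthread_cond_wait(&cs->cond, &cs->mutex);
	pthread_mutex_unlock(&cs->mutex);
}

int main(void)
{
	copy_state cs = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, {0, 0} };
	pthread_t thr;
	int i, toggle = 0;

	pthread_create(&thr, NULL, writer, &cs);
	for (i = 1; i <= 4; i++) {          /* fill and hand off four buffers */
		cs.data[toggle] = i;
		hand_off(&cs, 1);
		toggle ^= 1;
	}
	hand_off(&cs, EOF_FLAG);            /* no buffer, just EOF */
	pthread_join(thr, NULL);
	return 0;
}

Because `pending` both counts outstanding buffers and carries the EOF bit, the writer needs no separate shutdown state and the provider needs no extra cond_wait before the first hand-off, which is what the removed lines in the diff below provided.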

Preserve DB flags (use metapage#1) when main DB is empty.
Fail if metapage root != actual root in output file.
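
The second point reduces to one comparison: the compacting walk must end exactly on the page number predicted from the free-page count, otherwise some page was neither reachable nor recorded as free. A worked example with invented numbers (the real code reports MDB_INCOMPATIBLE in that case):

/* Worked example, made-up numbers: predicting where the compacted
 * root must land.  A mismatch means a page leak or a corrupt DB. */
#include <stdio.h>

int main(void)
{
	unsigned long next_pgno   = 1000;  /* first unused page in the source env */
	unsigned long freecount   = 120;   /* free pages + freeDB branch/leaf/overflow pages */
	unsigned long new_root    = next_pgno - 1 - freecount;   /* 879 */
	unsigned long actual_root = 879;   /* root pgno reached after the compacting walk */

	if (actual_root != new_root)
		puts("page leak or corrupt DB");
	else
		printf("compacted root %lu matches the prediction\n", new_root);
	return 0;
}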

Some _aligned_malloc() doc seems to think arg NULL = user error.
Don't know if posix_memalign() pointer is defined after failure.
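
On the allocation point, the defensive reading of POSIX is to trust only posix_memalign()'s return code and never the output pointer after a failure. A hedged sketch of that convention follows; the diff itself side-steps the question by switching to memalign(), whose failure mode is NULL plus errno.

/* Sketch, assuming only what POSIX guarantees: posix_memalign() returns
 * an error number and may leave the pointer argument untouched on
 * failure, so initialize it and decide on the return code alone. */
#define _POSIX_C_SOURCE 200112L
#include <stdlib.h>

static void *aligned_buf(size_t alignment, size_t size, int *err)
{
	void *p = NULL;                         /* don't trust p after failure */
	int rc = posix_memalign(&p, alignment, size);
	if (rc != 0) {                          /* rc is the error; errno is not set */
		*err = rc;
		return NULL;
	}
	*err = 0;
	return p;
}

int main(void)
{
	int err;
	void *buf = aligned_buf(4096, 2 * 1024 * 1024, &err);  /* roughly MDB_WBUF*2 */
	free(buf);                              /* free(NULL) is a no-op */
	return err;
}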

Change-Id: Idfdc118b4848bb96bace0f29db9dcdd710b7a1f4
Authored by Hallvard Furuseth on 2016-06-25 07:55:34 +02:00, committed by Leo Yuriev
parent 6d99bb59a8
commit 3befcdab01
3 changed files with 81 additions and 75 deletions

lmdb.h

@@ -730,6 +730,7 @@ int mdb_env_copyfd(MDB_env *env, mdb_filehandle_t fd);
 	 *	<li>#MDB_CP_COMPACT - Perform compaction while copying: omit free
 	 *		pages and sequentially renumber all pages in output. This option
 	 *		consumes more CPU and runs more slowly than the default.
+	 *		Currently it fails if the environment has suffered a page leak.
 	 *	</ul>
 	 * @return A non-zero error value on failure and 0 on success.
 	 */
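
From the caller's side the documented change is simply that a compacting copy can now fail. A minimal usage sketch, assuming the standard public entry point mdb_env_copy2() and placeholder paths:

/* Caller-side sketch: a compacting copy of an environment that has
 * leaked pages now returns an error instead of producing a file whose
 * meta page disagrees with its contents.  "./testdb" and "./backup"
 * are placeholder paths. */
#include <stdio.h>
#include "lmdb.h"

int main(void)
{
	MDB_env *env;
	int rc;

	if ((rc = mdb_env_create(&env)) != 0)
		return rc;
	if ((rc = mdb_env_open(env, "./testdb", MDB_RDONLY, 0664)) != 0)
		goto done;
	rc = mdb_env_copy2(env, "./backup", MDB_CP_COMPACT);
	if (rc != 0)
		fprintf(stderr, "compacting copy failed: %s\n", mdb_strerror(rc));
done:
	mdb_env_close(env);
	return rc;
}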

mdb.c

@@ -109,7 +109,7 @@
 #include <string.h>
 #include <time.h>
 #include <unistd.h>
-#include <alloca.h>
+#include <malloc.h>
 #include <pthread.h>

 #if !(defined(BYTE_ORDER) || defined(__BYTE_ORDER))
@@ -9040,13 +9040,14 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
 }

 #ifndef MDB_WBUF
-#define MDB_WBUF	(1024*1024)
+# define MDB_WBUF	(1024*1024)
 #endif
+#define MDB_EOF	0x10	/**< #mdb_env_copyfd1() is done reading */

-	/** State needed for a compacting copy. */
+	/** State needed for a double-buffering compacting copy. */
 typedef struct mdb_copy {
 	pthread_mutex_t mc_mutex;
-	pthread_cond_t mc_cond;
+	pthread_cond_t mc_cond;	/**< Condition variable for #mc_new */
 	char *mc_wbuf[2];
 	char *mc_over[2];
 	MDB_env *mc_env;
@@ -9055,10 +9056,9 @@ typedef struct mdb_copy {
 	int mc_olen[2];
 	pgno_t mc_next_pgno;
 	HANDLE mc_fd;
-	int mc_status;
-	volatile int mc_new;
-	int mc_toggle;
+	int mc_toggle;			/**< Buffer number in provider */
+	int mc_new;			/**< (0-2 buffers to write) | (#MDB_EOF at end) */
+	volatile int mc_error;	/**< Error code, never cleared if set */
 } mdb_copy;

 	/** Dedicated writer thread for compacting copy. */
@@ -9071,20 +9071,16 @@ mdb_env_copythr(void *arg)
 	int len;

 	pthread_mutex_lock(&my->mc_mutex);
-	my->mc_new = 0;
-	pthread_cond_signal(&my->mc_cond);
 	for(;;) {
 		while (!my->mc_new)
 			pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
-		if (my->mc_new < 0) {
-			my->mc_new = 0;
+		if (my->mc_new == 0 + MDB_EOF)	/* 0 buffers, just EOF */
 			break;
-		}
-		my->mc_new = 0;
 		wsize = my->mc_wlen[toggle];
 		ptr = my->mc_wbuf[toggle];
again:
-		while (wsize > 0) {
+		rc = MDB_SUCCESS;
+		while (wsize > 0 && !my->mc_error) {
 			len = write(my->mc_fd, ptr, wsize);
 			if (len < 0) {
 				rc = errno;
@@ -9100,8 +9096,7 @@ again:
 			}
 		}
 		if (rc) {
-			my->mc_status = rc;
-			break;
+			my->mc_error = rc;
 		}
 		/* If there's an overflow page tail, write it too */
 		if (my->mc_olen[toggle]) {
@@ -9112,30 +9107,33 @@ again:
 		}
 		my->mc_wlen[toggle] = 0;
 		toggle ^= 1;
+		/* Return the empty buffer to provider */
+		my->mc_new--;
 		pthread_cond_signal(&my->mc_cond);
 	}
-	pthread_cond_signal(&my->mc_cond);
 	pthread_mutex_unlock(&my->mc_mutex);
-	return (void*)0;
+	return NULL;
 }

-	/** Tell the writer thread there's a buffer ready to write */
+	/** Give buffer and/or #MDB_EOF to writer thread, await unused buffer.
+	 *
+	 * @param[in] my control structure.
+	 * @param[in] adjust (1 to hand off 1 buffer) | (MDB_EOF when ending).
+	 */
 static int __cold
-mdb_env_cthr_toggle(mdb_copy *my, int st)
+mdb_env_cthr_toggle(mdb_copy *my, int adjust)
 {
-	int toggle = my->mc_toggle ^ 1;
 	pthread_mutex_lock(&my->mc_mutex);
-	if (my->mc_status) {
-		pthread_mutex_unlock(&my->mc_mutex);
-		return my->mc_status;
-	}
-	while (my->mc_new == 1)
-		pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
-	my->mc_new = st;
-	my->mc_toggle = toggle;
+	my->mc_new += adjust;
 	pthread_cond_signal(&my->mc_cond);
+	while (my->mc_new & 2)		/* both buffers in use */
+		pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
 	pthread_mutex_unlock(&my->mc_mutex);
-	return 0;
+
+	my->mc_toggle ^= (adjust & 1);
+	/* Both threads reset mc_wlen, to be safe from threading errors */
+	my->mc_wlen[my->mc_toggle] = 0;
+	return my->mc_error;
 }

 	/** Depth-first tree traversal for compacting copy. */
@@ -9202,6 +9200,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
 				}
 				memcpy(&pg, NODEDATA(ni), sizeof(pg));
+				memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t));
 				rc = mdb_page_get(txn, pg, &omp, NULL);
 				if (rc)
 					goto done;
@@ -9224,7 +9223,6 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
 						goto done;
 					toggle = my->mc_toggle;
 				}
-				memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
 			} else if (ni->mn_flags & F_SUBDATA) {
 				MDB_db db;
@@ -9302,34 +9300,33 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
 {
 	MDB_meta *mm;
 	MDB_page *mp;
-	mdb_copy my;
+	mdb_copy my = {0};
 	MDB_txn *txn = NULL;
 	pthread_t thr;
-	int rc;
+	pgno_t root, new_root;
+	int rc = MDB_SUCCESS;

-	pthread_mutex_init(&my.mc_mutex, NULL);
-	pthread_cond_init(&my.mc_cond, NULL);
-	rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_os_psize, MDB_WBUF*2);
-	if (rc)
+	if ((rc = pthread_mutex_init(&my.mc_mutex, NULL)) != 0)
 		return rc;
+	if ((rc = pthread_cond_init(&my.mc_cond, NULL)) != 0)
+		goto done2;
+	my.mc_wbuf[0] = memalign(env->me_os_psize, MDB_WBUF*2);
+	if (my.mc_wbuf[0] == NULL) {
+		rc = errno;
+		goto done;
+	}
 	memset(my.mc_wbuf[0], 0, MDB_WBUF*2);
 	my.mc_wbuf[1] = my.mc_wbuf[0] + MDB_WBUF;
-	my.mc_wlen[0] = 0;
-	my.mc_wlen[1] = 0;
-	my.mc_olen[0] = 0;
-	my.mc_olen[1] = 0;
 	my.mc_next_pgno = NUM_METAS;
-	my.mc_status = 0;
-	my.mc_new = 1;
-	my.mc_toggle = 0;
 	my.mc_env = env;
 	my.mc_fd = fd;
 	rc = pthread_create(&thr, NULL, mdb_env_copythr, &my);
-	assert(rc == 0);
+	if (rc)
+		goto done;

 	rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn);
 	if (rc)
-		return rc;
+		goto finish;

 	mp = (MDB_page *)my.mc_wbuf[0];
 	memset(mp, 0, NUM_METAS * env->me_psize);
@@ -9345,51 +9342,58 @@ mdb_env_copyfd1(MDB_env *env, HANDLE fd)
 	*(MDB_meta *)PAGEDATA(mp) = *mm;
 	mm = (MDB_meta *)PAGEDATA(mp);

-	/* Count the number of free pages, subtract from lastpg to find
-	 * number of active pages
-	 */
-	{
+	/* Set metapage 1 with current main DB */
+	root = new_root = txn->mt_dbs[MAIN_DBI].md_root;
+	if (root != P_INVALID) {
+		/* Count free pages + freeDB pages.  Subtract from last_pg
+		 * to find the new last_pg, which also becomes the new root.
+		 */
 		MDB_ID freecount = 0;
 		MDB_cursor mc;
 		MDB_val key, data;
 		mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
 		while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
 			freecount += *(MDB_ID *)data.mv_data;
+		if (rc != MDB_NOTFOUND)
+			goto finish;
 		freecount += txn->mt_dbs[FREE_DBI].md_branch_pages +
 			txn->mt_dbs[FREE_DBI].md_leaf_pages +
 			txn->mt_dbs[FREE_DBI].md_overflow_pages;

-		/* Set metapage 1 */
-		mm->mm_last_pg = txn->mt_next_pgno - freecount - 1;
+		new_root = txn->mt_next_pgno - 1 - freecount;
+		mm->mm_last_pg = new_root;
 		mm->mm_dbs[MAIN_DBI] = txn->mt_dbs[MAIN_DBI];
-		if (mm->mm_last_pg > NUM_METAS-1) {
-			mm->mm_dbs[MAIN_DBI].md_root = mm->mm_last_pg;
-			mm->mm_txnid = 1;
-		} else {
-			mm->mm_dbs[MAIN_DBI].md_root = P_INVALID;
-		}
+		mm->mm_dbs[MAIN_DBI].md_root = new_root;
+	} else {
+		/* When the DB is empty, handle it specially to
+		 * fix any breakage like page leaks from ITS#8174.
+		 */
+		mm->mm_dbs[MAIN_DBI].md_flags = txn->mt_dbs[MAIN_DBI].md_flags;
 	}
+	if (root != P_INVALID || mm->mm_dbs[MAIN_DBI].md_flags) {
+		mm->mm_txnid = 1;		/* use metapage 1 */
+	}
+
 	my.mc_wlen[0] = env->me_psize * NUM_METAS;
 	my.mc_txn = txn;
-	pthread_mutex_lock(&my.mc_mutex);
-	while(my.mc_new)
-		pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
-	pthread_mutex_unlock(&my.mc_mutex);
-	rc = mdb_env_cwalk(&my, &txn->mt_dbs[MAIN_DBI].md_root, 0);
-	if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
-		rc = mdb_env_cthr_toggle(&my, 1);
-	mdb_env_cthr_toggle(&my, -1);
-	pthread_mutex_lock(&my.mc_mutex);
-	while(my.mc_new)
-		pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
-	pthread_mutex_unlock(&my.mc_mutex);
-	pthread_join(thr, NULL);
+	rc = mdb_env_cwalk(&my, &root, 0);
+	if (rc == MDB_SUCCESS && root != new_root) {
+		rc = MDB_INCOMPATIBLE;	/* page leak or corrupt DB */
+	}
+
+finish:
+	if (rc)
+		my.mc_error = rc;
+	mdb_env_cthr_toggle(&my, 1 | MDB_EOF);
+	rc = pthread_join(thr, NULL);
 	mdb_txn_abort(txn);
-	pthread_cond_destroy(&my.mc_cond);
-	pthread_mutex_destroy(&my.mc_mutex);
+
+done:
 	free(my.mc_wbuf[0]);
-	return rc;
+	pthread_cond_destroy(&my.mc_cond);
+done2:
+	pthread_mutex_destroy(&my.mc_mutex);
+	return rc ? rc : my.mc_error;
 }

 	/** Copy environment as-is. */

mdb_copy.1

@@ -56,6 +56,7 @@ Write the library version number to the standard output, and exit.
 Compact while copying. Only current data pages will be copied; freed
 or unused pages will be omitted from the copy. This option will
 slow down the backup process as it is more CPU-intensive.
+Currently it fails if the environment has suffered a page leak.
 .TP
 .BR \-n
 Open LDMB environment(s) which do not use subdirectories.