lmdb: seek to steady meta-page on db-open.

This is 7/9 for https://github.com/ReOpen/ReOpenLDAP/issues/1
and https://github.com/ReOpen/ReOpenLDAP/issues/2

Change-Id: If59a6bfc7c6198e6b1e85f4bdc5b534ecff03123
This commit is contained in:
Leo Yuriev 2015-05-12 21:10:43 +03:00
parent 633f2a10ad
commit 21705fd9a2

188
mdb.c
View File

@ -1891,21 +1891,62 @@ static uint64_t mdb_meta_sign(MDB_meta *meta) {
return (sign > MDB_DATASIGN_WEAK) ? sign : ~sign;
}
/** Check both meta pages to see which one is newer.
* @param[in] env the environment handle
* @return pointer to last meta-page.
*/
static MDB_meta*
mdb_env_meta_head(const MDB_env *env) {
static MDB_meta* mdb_meta_head_w(MDB_env *env) {
MDB_meta* a = METAPAGE_1(env);
MDB_meta* b = METAPAGE_2(env);
return (a->mm_txnid > b->mm_txnid) ? a : b;
txnid_t head_txnid = env->me_txns->mti_txnid;
mdb_assert(env, a->mm_txnid != b->mm_txnid || head_txnid == 0);
if (likely(a->mm_txnid == head_txnid))
return a;
if (likely(b->mm_txnid == head_txnid))
return b;
mdb_assert(env, head_txnid == a->mm_txnid || head_txnid == b->mm_txnid);
env->me_flags |= MDB_FATAL_ERROR;
return a;
}
static MDB_meta* mdb_meta_head_r(MDB_env *env) {
MDB_meta* a = METAPAGE_1(env);
MDB_meta* b = METAPAGE_2(env), *h;
txnid_t head_txnid;
int loop = 0, rc;
do {
head_txnid = env->me_txns->mti_txnid;
mdb_assert(env, a->mm_txnid != b->mm_txnid || head_txnid == 0);
if (likely(a->mm_txnid == head_txnid))
return a;
if (likely(b->mm_txnid == head_txnid))
return b;
/* LY: got a race on env->me_txns->mti_txnid with mdb_env_sync0() */
#if defined(__i386__) || defined(__x86_64__)
__asm__ __volatile__("pause");
#endif
mdb_coherent_barrier();
if (loop > 2)
pthread_yield();
} while (++loop < 5);
rc = mdb_mutex_lock(env, MDB_MUTEX(env, w));
h = mdb_meta_head_w(env);
if (rc == 0)
mdb_mutex_unlock(env, MDB_MUTEX(env, w));
return h;
}
static MDB_meta* mdb_env_meta_flipflop(const MDB_env *env, MDB_meta* meta) {
return (meta == METAPAGE_1(env)) ? METAPAGE_2(env) : METAPAGE_1(env);
}
static int mdb_meta_lt(MDB_meta* a, MDB_meta* b) {
return (META_IS_STEADY(a) == META_IS_STEADY(b))
? a->mm_txnid < b->mm_txnid : META_IS_STEADY(b);
}
/** Find oldest txnid still referenced. */
static txnid_t
mdb_find_oldest(MDB_env *env, int *laggard)
@ -1941,7 +1982,7 @@ mdb_oomkick(MDB_env *env)
{
int reader, retry;
txnid_t snap, oldest = mdb_find_oldest(env, &reader);
MDB_meta* head = mdb_env_meta_head(env);
MDB_meta* head = mdb_meta_head_w(env);
MDB_meta* tail = mdb_env_meta_flipflop(env, head);
if (META_IS_WEAK(head) && oldest == tail->mm_txnid) {
@ -1980,12 +2021,12 @@ mdb_oomkick(MDB_env *env)
continue;
rc = env->me_oom_func(env, pid, (void*) tid, oldest,
mdb_env_meta_head(env)->mm_txnid - oldest, retry);
mdb_meta_head_w(env)->mm_txnid - oldest, retry);
if (rc < 0)
break;
if (rc) {
r->mr_txnid = (txnid_t)-1;
r->mr_txnid = (txnid_t)-1L;
if (rc > 1) {
r->mr_tid = 0;
r->mr_pid = 0;
@ -2505,7 +2546,7 @@ mdb_env_sync(MDB_env *env, int force)
if (unlikely(flags & (MDB_RDONLY | MDB_FATAL_ERROR)))
return EACCES;
head = mdb_env_meta_head(env);
head = mdb_meta_head_r(env);
if (force || head->mm_mapsize != env->me_mapsize)
flags &= MDB_WRITEMAP;
@ -2523,7 +2564,7 @@ mdb_env_sync(MDB_env *env, int force)
} else if (fdatasync(env->me_fd))
return errno;
/* LY: head may be changed during the sync. */
head = mdb_env_meta_head(env);
head = mdb_meta_head_r(env);
}
if (! META_IS_WEAK(head) && env->me_sync_pending == 0
@ -2537,7 +2578,7 @@ mdb_env_sync(MDB_env *env, int force)
return rc;
/* LY: head may be changed while the mutex has been acquired. */
head = mdb_env_meta_head(env);
head = mdb_meta_head_w(env);
rc = MDB_SUCCESS;
if (META_IS_WEAK(head) || env->me_sync_pending != 0
|| env->me_mapsize != head->mm_mapsize) {
@ -2731,7 +2772,7 @@ mdb_txn_renew0(MDB_txn *txn)
}
do { /* LY: Retry on a race, ITS#7970. */
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_r(env);
r->mr_txnid = meta->mm_txnid;
mdb_coherent_barrier();
memcpy(txn->mt_dbs, meta->mm_dbs, 2 * sizeof(MDB_db));
@ -2746,7 +2787,7 @@ mdb_txn_renew0(MDB_txn *txn)
if (unlikely(rc))
return rc;
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_w(env);
txn->mt_txnid = meta->mm_txnid;
/* Setup db info */
@ -2991,7 +3032,7 @@ mdb_txn_straggler(MDB_txn *txn, int *percent)
return -1;
env = txn->mt_env;
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_r(env);
if (percent) {
long cent = env->me_maxpg / 100;
long last = env->me_txn ? env->me_txn0->mt_next_pgno : meta->mm_last_pg;
@ -3794,6 +3835,8 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta)
* Read both meta pages so we can use the latest one.
*/
meta->mm_datasync_sign = MDB_DATASIGN_WEAK;
meta->mm_txnid = 0;
for (i=off=0; i<2; i++, off = meta->mm_psize) {
rc = pread(env->me_fd, &pbuf, Size, off);
if (rc != Size) {
@ -3823,10 +3866,18 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta)
return MDB_VERSION_MISMATCH;
}
if (off == 0 || m->mm_txnid > meta->mm_txnid)
if (m->mm_datasync_sign > MDB_DATASIGN_WEAK && m->mm_datasync_sign != mdb_meta_sign(m))
continue;
if (mdb_meta_lt(meta, m))
*meta = *m;
}
return 0;
if (meta->mm_datasync_sign == MDB_DATASIGN_WEAK)
/* LY: Both meta-pages are weak. */
return MDB_CORRUPTED;
return MDB_SUCCESS;
}
/** Fill in most of the zeroed #MDB_meta for an empty database environment */
@ -3890,7 +3941,7 @@ static int
mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending)
{
int rc;
MDB_meta* head = mdb_env_meta_head(env);
MDB_meta* head = mdb_meta_head_w(env);
size_t prev_mapsize = head->mm_mapsize;
MDB_meta* tail = META_IS_WEAK(head) ? head : mdb_env_meta_flipflop(env, head);
off_t offset = (char*) tail - env->me_map;
@ -3898,8 +3949,7 @@ mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending)
mdb_assert(env, (env->me_flags & (MDB_RDONLY | MDB_FATAL_ERROR)) == 0);
mdb_assert(env, META_IS_WEAK(head) || env->me_sync_pending != 0
|| env->me_mapsize != prev_mapsize);
mdb_assert(env, pending->mm_txnid > head->mm_txnid
|| (pending->mm_txnid == head->mm_txnid && META_IS_WEAK(head)));
mdb_assert(env, pending->mm_txnid > head->mm_txnid || META_IS_WEAK(head));
mdb_assert(env, pending->mm_txnid > tail->mm_txnid || META_IS_WEAK(tail));
MDB_meta* stay = mdb_env_meta_flipflop(env, tail);
@ -3991,8 +4041,9 @@ mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending)
}
mdb_invalidate_cache(env->me_map + offset, sizeof(MDB_meta));
}
env->me_txns->mti_txnid = pending->mm_txnid;
/* LY: step#3 - sync updated meta-pages. */
/* LY: step#3 - sync meta-pages. */
if ((flags & (MDB_NOSYNC | MDB_NOMETASYNC)) == 0) {
if (env->me_flags & MDB_WRITEMAP) {
char* ptr = env->me_map + (offset & ~(env->me_os_psize - 1));
@ -4120,7 +4171,7 @@ mdb_env_set_mapsize(MDB_env *env, size_t size)
void *old;
if (env->me_txn)
return EINVAL;
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_w(env);
if (!size)
size = meta->mm_mapsize;
{
@ -4188,13 +4239,12 @@ mdb_fsize(HANDLE fd, size_t *size)
/** Further setup required for opening an LMDB environment
*/
static int ESECT
mdb_env_open2(MDB_env *env)
mdb_env_open2(MDB_env *env, MDB_meta *meta)
{
unsigned flags = env->me_flags;
int i, newenv = 0, rc;
MDB_meta meta;
if ((i = mdb_env_read_header(env, &meta)) != 0) {
if ((i = mdb_env_read_header(env, meta)) != 0) {
if (i != ENOENT)
return i;
mdb_debug("new mdbenv");
@ -4202,26 +4252,26 @@ mdb_env_open2(MDB_env *env)
env->me_psize = env->me_os_psize;
if (env->me_psize > MAX_PAGESIZE)
env->me_psize = MAX_PAGESIZE;
memset(&meta, 0, sizeof(meta));
mdb_env_init_meta0(env, &meta);
meta.mm_mapsize = DEFAULT_MAPSIZE;
memset(meta, 0, sizeof(*meta));
mdb_env_init_meta0(env, meta);
meta->mm_mapsize = DEFAULT_MAPSIZE;
} else {
env->me_psize = meta.mm_psize;
env->me_psize = meta->mm_psize;
}
/* Was a mapsize configured? */
if (!env->me_mapsize) {
env->me_mapsize = meta.mm_mapsize;
env->me_mapsize = meta->mm_mapsize;
}
{
/* Make sure mapsize >= committed data size. Even when using
* mm_mapsize, which could be broken in old files (ITS#7789).
*/
size_t minsize = (meta.mm_last_pg + 1) * meta.mm_psize;
size_t minsize = (meta->mm_last_pg + 1) * meta->mm_psize;
if (env->me_mapsize < minsize)
env->me_mapsize = minsize;
}
meta.mm_mapsize = env->me_mapsize;
meta->mm_mapsize = env->me_mapsize;
if (newenv && !(flags & MDB_FIXEDMAP)) {
/* mdb_env_map() may grow the datafile. Write the metapages
@ -4231,20 +4281,20 @@ mdb_env_open2(MDB_env *env)
* program might end up doing that - one with a memory layout
* and map address which does not suit the main program.
*/
rc = mdb_env_init_meta(env, &meta);
rc = mdb_env_init_meta(env, meta);
if (rc)
return rc;
newenv = 0;
}
rc = mdb_env_map(env, (flags & MDB_FIXEDMAP) ? meta.mm_address : NULL);
rc = mdb_env_map(env, (flags & MDB_FIXEDMAP) ? meta->mm_address : NULL);
if (rc)
return rc;
if (newenv) {
if (flags & MDB_FIXEDMAP)
meta.mm_address = env->me_map;
i = mdb_env_init_meta(env, &meta);
meta->mm_address = env->me_map;
i = mdb_env_init_meta(env, meta);
if (i != MDB_SUCCESS) {
return i;
}
@ -4258,24 +4308,6 @@ mdb_env_open2(MDB_env *env)
#endif
env->me_maxpg = env->me_mapsize / env->me_psize;
#if MDB_DEBUG
{
MDB_meta *meta = mdb_env_meta_head(env);
MDB_db *db = &meta->mm_dbs[MAIN_DBI];
int toggle = ((char*) meta == PAGEDATA(env->me_map)) ? 0 : 1;
mdb_debug("opened database version %u, pagesize %u",
meta->mm_version, env->me_psize);
mdb_debug("using meta page %d, txn %zu", toggle, meta->mm_txnid);
mdb_debug("depth: %u", db->md_depth);
mdb_debug("entries: %zu", db->md_entries);
mdb_debug("branch pages: %zu", db->md_branch_pages);
mdb_debug("leaf pages: %zu", db->md_leaf_pages);
mdb_debug("overflow pages: %zu", db->md_overflow_pages);
mdb_debug("root: %zu", db->md_root);
}
#endif
return MDB_SUCCESS;
}
@ -4295,10 +4327,9 @@ mdb_env_reader_dest(void *ptr)
/** Downgrade the exclusive lock on the region back to shared */
static int ESECT
mdb_env_share_locks(MDB_env *env, int *excl)
mdb_env_share_locks(MDB_env *env, int *excl, MDB_meta *meta)
{
struct flock lock_info;
MDB_meta *meta = mdb_env_meta_head(env);
int rc = 0;
env->me_txns->mti_txnid = meta->mm_txnid;
@ -4507,7 +4538,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
env->me_txns->mti_magic = MDB_MAGIC;
env->me_txns->mti_format = MDB_LOCK_FORMAT;
env->me_txns->mti_txnid = 0;
env->me_txns->mti_txnid = ~0L;
env->me_txns->mti_numreaders = 0;
} else {
if (env->me_txns->mti_magic != MDB_MAGIC) {
@ -4629,10 +4660,11 @@ mdb_env_open(MDB_env *env, const char *path, unsigned flags, mode_t mode)
goto leave;
}
if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) {
MDB_meta meta;
if ((rc = mdb_env_open2(env, &meta)) == MDB_SUCCESS) {
mdb_debug("opened dbenv %p", (void *) env);
if (excl > 0) {
rc = mdb_env_share_locks(env, &excl);
rc = mdb_env_share_locks(env, &excl, &meta);
if (rc)
goto leave;
}
@ -4656,6 +4688,24 @@ mdb_env_open(MDB_env *env, const char *path, unsigned flags, mode_t mode)
}
}
#if MDB_DEBUG
{
MDB_meta *meta = mdb_meta_head_r(env);
MDB_db *db = &meta->mm_dbs[MAIN_DBI];
int toggle = ((char*) meta == PAGEDATA(env->me_map)) ? 0 : 1;
mdb_debug("opened database version %u, pagesize %u",
meta->mm_version, env->me_psize);
mdb_debug("using meta page %d, txn %zu", toggle, meta->mm_txnid);
mdb_debug("depth: %u", db->md_depth);
mdb_debug("entries: %zu", db->md_entries);
mdb_debug("branch pages: %zu", db->md_branch_pages);
mdb_debug("leaf pages: %zu", db->md_leaf_pages);
mdb_debug("overflow pages: %zu", db->md_overflow_pages);
mdb_debug("root: %zu", db->md_root);
}
#endif
leave:
if (rc)
mdb_env_close0(env);
@ -8982,7 +9032,7 @@ mdb_env_stat(MDB_env *env, MDB_stat *arg)
if (env == NULL || arg == NULL)
return EINVAL;
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_r(env);
return mdb_stat0(env, &meta->mm_dbs[MAIN_DBI], arg);
}
@ -8994,7 +9044,7 @@ mdb_env_info(MDB_env *env, MDB_envinfo *arg)
if (env == NULL || arg == NULL)
return EINVAL;
meta = mdb_env_meta_head(env);
meta = mdb_meta_head_r(env);
arg->me_mapaddr = meta->mm_address;
arg->me_mapsize = env->me_mapsize;
arg->me_maxreaders = env->me_maxreaders;
@ -9513,7 +9563,6 @@ static int mdb_mutex_failed(MDB_env *env, pthread_mutex_t *mutex, int rc)
#ifdef EOWNERDEAD
if (unlikely(rc == EOWNERDEAD)) {
int rlocked, rc2;
MDB_meta *meta;
/* We own the mutex. Clean up after dead previous owner. */
rc = MDB_SUCCESS;
@ -9522,8 +9571,13 @@ static int mdb_mutex_failed(MDB_env *env, pthread_mutex_t *mutex, int rc)
/* Keep mti_txnid updated, otherwise next writer can
* overwrite data which latest meta page refers to.
*/
meta = mdb_env_meta_head(env);
#if 0
/* LY: Hm, how this can happen, if the mti_txnid
* is updating only at the finish of a successful commit ? */
MDB_meta *meta = mdb_env_meta_head(env);
env->me_txns->mti_txnid = meta->mm_txnid;
#endif
/* env is hosed if the dead thread was ours */
if (env->me_txn) {
env->me_flags |= MDB_FATAL_ERROR;
@ -9544,8 +9598,10 @@ static int mdb_mutex_failed(MDB_env *env, pthread_mutex_t *mutex, int rc)
#endif /* EOWNERDEAD */
if (unlikely(rc)) {
mdb_debug("lock mutex failed, %s", mdb_strerror(rc));
env->me_flags |= MDB_FATAL_ERROR;
rc = MDB_PANIC;
if (rc != EDEADLK) {
env->me_flags |= MDB_FATAL_ERROR;
rc = MDB_PANIC;
}
}
return rc;