lmdb: deferred cleanup of reader's threads.

This should fix https://github.com/ReOpen/ReOpenLDAP/issues/48

Change-Id: I67feb3a9852f183dcbe83626321896ba43a591b7
This commit is contained in:
Leo Yuriev 2015-09-07 02:38:10 +03:00
parent 83ba70e3be
commit 8e2a9a9a78

98
mdb.c
View File

@ -516,6 +516,8 @@ typedef struct MDB_rxbody {
volatile pid_t mrb_pid;
/** The thread ID of the thread owning this txn. */
volatile pthread_t mrb_tid;
/** Pointer to the context for deferred cleanup reader thread. */
struct MDB_rthc *mrb_rthc;
} MDB_rxbody;
/** The actual reader record, with cacheline padding. */
@ -526,6 +528,7 @@ typedef struct MDB_reader {
#define mr_txnid mru.mrx.mrb_txnid
#define mr_pid mru.mrx.mrb_pid
#define mr_tid mru.mrx.mrb_tid
#define mr_rthc mru.mrx.mrb_rthc
/** cache line alignment */
char pad[(sizeof(MDB_rxbody)+CACHELINE_SIZE-1) & ~(CACHELINE_SIZE-1)];
} mru;
@ -1033,6 +1036,11 @@ typedef struct MDB_pgstate {
txnid_t mf_pglast; /**< ID of last used record, or 0 if !mf_pghead */
} MDB_pgstate;
/** Context for deferred cleanup of reader's threads.
* to avoid https://github.com/ReOpen/ReOpenLDAP/issues/48 */
struct MDB_rthc {
MDB_reader *rc_reader;
};
/** The database environment. */
struct MDB_env {
HANDLE me_fd; /**< The main data file */
@ -1353,20 +1361,20 @@ mdb_debug_log(int type, const char *function, int line,
} while(0)
#define mdb_ensure_msg(env, expr, msg) \
do \
do { \
if (unlikely(!(expr))) \
mdb_assert_fail(env, msg, __FUNCTION__, __LINE__); \
while(0)
} while(0)
#define mdb_ensure(env, expr) \
mdb_ensure_msg(env, expr, #expr)
/** assert(3) variant in environment context */
#define mdb_assert(env, expr) \
do \
do { \
if (mdb_assert_enabled()) \
mdb_ensure(env, expr); \
while(0)
} while(0)
/** assert(3) variant in cursor context */
#define mdb_cassert(mc, expr) \
@ -2774,9 +2782,31 @@ mdb_txn_renew0(MDB_txn *txn)
int rc, new_notls = 0;
if ((flags &= MDB_TXN_RDONLY) != 0) {
MDB_reader *r = (env->me_flags & MDB_NOTLS)
? txn->mt_u.reader : pthread_getspecific(env->me_txkey);
struct MDB_rthc* rthc = NULL;
MDB_reader *r = NULL;
if (likely(env->me_flags & MDB_ENV_TXKEY)) {
mdb_assert(env, !(env->me_flags & MDB_NOTLS));
rthc = pthread_getspecific(env->me_txkey);
if (unlikely(! rthc)) {
rthc = calloc(1, sizeof(struct MDB_rthc));
if (unlikely(! rthc))
return ENOMEM;
rc = pthread_setspecific(env->me_txkey, rthc);
if (unlikely(rc)) {
free(rthc);
return rc;
}
}
r = rthc->rc_reader;
if (r) {
mdb_assert(env, r->mr_pid == env->me_pid);
mdb_assert(env, r->mr_tid == pthread_self());
mdb_assert(env, r->mr_rthc == rthc);
}
} else {
mdb_assert(env, env->me_flags & MDB_NOTLS);
r = txn->mt_u.reader;
}
if (likely(r)) {
if (unlikely(r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1))
@ -2822,11 +2852,11 @@ mdb_txn_renew0(MDB_txn *txn)
r->mr_pid = pid;
mdb_mutex_unlock(env, rmutex);
new_notls = (env->me_flags & MDB_NOTLS);
if (!new_notls && unlikely(rc=pthread_setspecific(env->me_txkey, r))) {
r->mr_pid = 0;
mdb_coherent_barrier();
return rc;
new_notls = MDB_END_SLOT;
if (likely(rthc)) {
rthc->rc_reader = r;
r->mr_rthc = rthc;
new_notls = 0;
}
}
@ -4394,18 +4424,36 @@ mdb_env_open2(MDB_env *env, MDB_meta *meta)
return MDB_SUCCESS;
}
static pthread_mutex_t mdb_rthc_lock = PTHREAD_MUTEX_INITIALIZER;
/** Release a reader thread's slot in the reader lock table.
* This function is called automatically when a thread exits.
* @param[in] ptr This points to the slot in the reader lock table.
* @param[in] ptr This points to the MDB_rthc of a slot in the reader lock table.
*/
/* LY: TODO: Yet another problem is here - segfault in case if a DSO will
* be unloaded before a thread would been finished. */
static void
mdb_env_reader_dest(void *ptr)
{
MDB_reader *reader = ptr;
struct MDB_rthc* rthc = ptr;
MDB_reader *reader;
reader->mr_pid = 0;
mdb_coherent_barrier();
mdb_ensure(NULL, pthread_mutex_lock(&mdb_rthc_lock) == 0);
/* LY: Here may be a race with mdb_env_close(),
* see https://github.com/ReOpen/ReOpenLDAP/issues/48
*/
reader = rthc->rc_reader;
if (reader) {
mdb_ensure(NULL, reader->mr_rthc == rthc);
rthc->rc_reader = NULL;
reader->mr_rthc = NULL;
mdb_compiler_barrier();
reader->mr_pid = 0;
mdb_coherent_barrier();
}
mdb_ensure(NULL, pthread_mutex_unlock(&mdb_rthc_lock) == 0);
free(rthc);
}
/** Downgrade the exclusive lock on the region back to shared */
@ -4858,10 +4906,20 @@ mdb_env_close0(MDB_env *env)
* data owned by this process (me_close_readers and
* our readers), and clear each reader atomically.
*/
for (i = env->me_close_readers; --i >= 0; )
if (env->me_txns->mti_readers[i].mr_pid == pid)
env->me_txns->mti_readers[i].mr_pid = 0;
mdb_ensure(env, pthread_mutex_lock(&mdb_rthc_lock) == 0);
for (i = env->me_close_readers; --i >= 0; ) {
MDB_reader *reader = &env->me_txns->mti_readers[i];
if (reader->mr_pid == pid) {
mdb_ensure(env, reader->mr_rthc->rc_reader == reader);
reader->mr_rthc->rc_reader = NULL;
reader->mr_rthc = NULL;
mdb_compiler_barrier();
reader->mr_pid = 0;
}
}
mdb_coherent_barrier();
mdb_ensure(env, pthread_mutex_unlock(&mdb_rthc_lock) == 0);
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
if (env->me_lfd != INVALID_HANDLE_VALUE) {
@ -9703,6 +9761,8 @@ mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
if (mr[j].mr_pid == pid) {
mdb_debug("clear stale reader pid %u txn %zd",
(unsigned) pid, mr[j].mr_txnid);
mr[j].mr_rthc = NULL;
mdb_compiler_barrier();
mr[j].mr_pid = 0;
count++;
}