mdbx: add 'mti_reader_finished_flag' for speedup find_oldesd().

Change-Id: I4a2c8b80efad0cfc12918969125d258043cbffba
This commit is contained in:
Leo Yuriev 2017-06-30 00:20:33 +03:00
parent acfa096aba
commit 186d2ee065
3 changed files with 56 additions and 38 deletions

View File

@ -402,7 +402,13 @@ typedef struct MDBX_lockinfo {
volatile txnid_t mti_oldest;
uint64_t align_oldest;
};
uint8_t pad_align[MDBX_CACHELINE_SIZE - sizeof(uint64_t) * 6];
union {
volatile uint32_t mti_reader_finished_flag;
uint64_t align_reader_finished_flag;
};
uint8_t pad_align[MDBX_CACHELINE_SIZE - sizeof(uint64_t) * 7];
MDBX_reader __cache_aligned mti_readers[1];
} MDBX_lockinfo;

View File

@ -367,6 +367,8 @@
#define MDBX_TETRAD(a, b, c, d) \
((uint32_t)(a) << 24 | (uint32_t)(b) << 16 | (uint32_t)(c) << 8 | (d))
#define MDBX_STRING_TETRAD(str) MDBX_TETRAD(str[0], str[1], str[2], str[3])
#define FIXME "FIXME: " __FILE__ ", " STRINGIFY(__LINE__)
#ifndef STATIC_ASSERT_MSG

View File

@ -1481,27 +1481,33 @@ static const char *mdbx_durable_str(const MDBX_meta *const meta) {
/* Find oldest txnid still referenced. */
static txnid_t mdbx_find_oldest(MDBX_txn *txn) {
MDBX_env *env = txn->mt_env;
mdbx_assert(env, (txn->mt_flags & MDBX_RDONLY) == 0);
mdbx_tassert(txn, (txn->mt_flags & MDBX_RDONLY) == 0);
MDBX_lockinfo *const lck = txn->mt_env->me_lck;
const txnid_t last_oldest = env->me_oldest[0];
const txnid_t last_oldest = lck->mti_oldest;
txnid_t oldest = txn->mt_txnid - 1;
mdbx_assert(env, oldest >= last_oldest);
mdbx_tassert(txn, oldest >= last_oldest);
if (last_oldest == oldest ||
lck->mti_reader_finished_flag == MDBX_STRING_TETRAD("None"))
return last_oldest;
const MDBX_reader *const rtbl = env->me_lck->mti_readers;
for (int i = env->me_lck->mti_numreaders;
oldest != last_oldest && --i >= 0;) {
if (rtbl[i].mr_pid) {
const unsigned snap_nreaders = lck->mti_numreaders;
lck->mti_reader_finished_flag = MDBX_STRING_TETRAD("None");
for (unsigned i = 0; i < snap_nreaders; ++i) {
if (lck->mti_readers[i].mr_pid) {
mdbx_jitter4testing(true);
const txnid_t snap = rtbl[i].mr_txnid;
if (oldest > snap && last_oldest <= /* ignore pending updates */ snap)
const txnid_t snap = lck->mti_readers[i].mr_txnid;
if (oldest > snap && last_oldest <= /* ignore pending updates */ snap) {
oldest = snap;
if (oldest == last_oldest)
break;
}
}
}
if (oldest != last_oldest) {
mdbx_assert(env, oldest >= env->me_oldest[0]);
env->me_oldest[0] = oldest;
mdbx_tassert(txn, oldest >= lck->mti_oldest);
lck->mti_oldest = oldest;
}
return oldest;
}
@ -2251,7 +2257,6 @@ static void mdbx_cursors_eot(MDBX_txn *txn, unsigned merge) {
/* Common code for mdbx_txn_begin() and mdbx_txn_renew(). */
static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
MDBX_env *env = txn->mt_env;
unsigned i, nr;
int rc;
if (unlikely(env->me_pid != mdbx_getpid())) {
@ -2277,6 +2282,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
if (unlikely(r->mr_pid != env->me_pid || r->mr_txnid != ~(txnid_t)0))
return MDBX_BAD_RSLOT;
} else if (env->me_lck) {
unsigned slot, nreaders;
const mdbx_pid_t pid = env->me_pid;
const mdbx_tid_t tid = mdbx_thread_self();
mdbx_assert(env, env->me_lck->mti_magic_and_version == MDBX_LOCK_MAGIC);
@ -2297,12 +2303,12 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
}
while (1) {
nr = env->me_lck->mti_numreaders;
for (i = 0; i < nr; i++)
if (env->me_lck->mti_readers[i].mr_pid == 0)
nreaders = env->me_lck->mti_numreaders;
for (slot = 0; slot < nreaders; slot++)
if (env->me_lck->mti_readers[slot].mr_pid == 0)
break;
if (likely(i < env->me_maxreaders))
if (likely(slot < env->me_maxreaders))
break;
rc = mdbx_reader_check0(env, true, NULL);
@ -2315,7 +2321,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
STATIC_ASSERT(sizeof(MDBX_reader) == MDBX_CACHELINE_SIZE);
STATIC_ASSERT(
offsetof(MDBX_lockinfo, mti_readers) % MDBX_CACHELINE_SIZE == 0);
r = &env->me_lck->mti_readers[i];
r = &env->me_lck->mti_readers[slot];
/* Claim the reader slot, carefully since other code
* uses the reader table un-mutexed: First reset the
* slot, next publish it in mtb.mti_numreaders. After
@ -2325,10 +2331,10 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
r->mr_txnid = ~(txnid_t)0;
r->mr_tid = tid;
mdbx_coherent_barrier();
if (i == nr)
env->me_lck->mti_numreaders = ++nr;
if (env->me_close_readers < nr)
env->me_close_readers = nr;
if (slot == nreaders)
env->me_lck->mti_numreaders = ++nreaders;
if (env->me_close_readers < nreaders)
env->me_close_readers = nreaders;
r->mr_pid = pid;
mdbx_rdt_unlock(env);
@ -2412,7 +2418,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
/* Setup db info */
txn->mt_numdbs = env->me_numdbs;
for (i = CORE_DBS; i < txn->mt_numdbs; i++) {
for (unsigned i = CORE_DBS; i < txn->mt_numdbs; i++) {
unsigned x = env->me_dbflags[i];
txn->mt_dbs[i].md_flags = x & PERSISTENT_FLAGS;
txn->mt_dbflags[i] =
@ -2647,6 +2653,7 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) {
if (F_ISSET(txn->mt_flags, MDBX_TXN_RDONLY)) {
if (txn->mt_ro_reader) {
txn->mt_ro_reader->mr_txnid = ~(txnid_t)0;
env->me_lck->mti_reader_finished_flag = true;
if (mode & MDBX_END_SLOT) {
if ((env->me_flags & MDBX_ENV_TXKEY) == 0)
txn->mt_ro_reader->mr_pid = 0;
@ -10127,17 +10134,19 @@ int __cold mdbx_reader_list(MDBX_env *env, MDBX_msg_func *func, void *ctx) {
if (unlikely(env->me_signature != MDBX_ME_SIGNATURE))
return MDBX_EBADSIGN;
unsigned snap_nreaders = env->me_lck->mti_numreaders;
MDBX_reader *mr = env->me_lck->mti_readers;
const MDBX_lockinfo *const lck = env->me_lck;
const unsigned snap_nreaders = lck->mti_numreaders;
for (unsigned i = 0; i < snap_nreaders; i++) {
if (mr[i].mr_pid) {
txnid_t txnid = mr[i].mr_txnid;
if (lck->mti_readers[i].mr_pid) {
const txnid_t txnid = lck->mti_readers[i].mr_txnid;
if (txnid == ~(txnid_t)0)
snprintf(buf, sizeof(buf), "%10" PRIuPTR " %" PRIxPTR " -\n",
(size_t)mr[i].mr_pid, (size_t)mr[i].mr_tid);
(size_t)lck->mti_readers[i].mr_pid,
(size_t)lck->mti_readers[i].mr_tid);
else
snprintf(buf, sizeof(buf), "%10" PRIuPTR " %" PRIxPTR " %" PRIaTXN "\n",
(size_t)mr[i].mr_pid, (size_t)mr[i].mr_tid, txnid);
(size_t)lck->mti_readers[i].mr_pid,
(size_t)lck->mti_readers[i].mr_tid, txnid);
if (first) {
first = 0;
@ -10211,15 +10220,14 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
return MDBX_PANIC;
}
unsigned snap_nreaders = env->me_lck->mti_numreaders;
MDBX_lockinfo *const lck = env->me_lck;
const unsigned snap_nreaders = lck->mti_numreaders;
mdbx_pid_t *pids = alloca((snap_nreaders + 1) * sizeof(mdbx_pid_t));
pids[0] = 0;
int rc = MDBX_SUCCESS, count = 0;
MDBX_reader *mr = env->me_lck->mti_readers;
for (unsigned i = 0; i < snap_nreaders; i++) {
const mdbx_pid_t pid = mr[i].mr_pid;
const mdbx_pid_t pid = lck->mti_readers[i].mr_pid;
if (pid == 0)
continue /* skip empty */;
if (pid == env->me_pid)
@ -10252,7 +10260,7 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
}
/* a other process may have clean and reused slot, recheck */
if (mr[i].mr_pid != pid)
if (lck->mti_readers[i].mr_pid != pid)
continue;
err = mdbx_rpid_check(env, pid);
@ -10267,10 +10275,11 @@ int __cold mdbx_reader_check0(MDBX_env *env, int rdt_locked, int *dead) {
/* clean it */
for (unsigned j = i; j < snap_nreaders; j++) {
if (mr[j].mr_pid == pid) {
if (lck->mti_readers[j].mr_pid == pid) {
mdbx_debug("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN "",
(size_t)pid, mr[j].mr_txnid);
mr[j].mr_pid = 0;
(size_t)pid, lck->mti_readers[j].mr_txnid);
lck->mti_readers[j].mr_pid = 0;
lck->mti_reader_finished_flag = true;
count++;
}
}
@ -10371,6 +10380,7 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
if (rc) {
asleep->mr_txnid = ~(txnid_t)0;
env->me_lck->mti_reader_finished_flag = true;
if (rc > 1) {
asleep->mr_tid = 0;
asleep->mr_pid = 0;