mdbx: add shared cache for oldest reader's txnid.

Change-Id: I48cbc778b873445dffa8ecef1fc3633e0193131f
This commit is contained in:
Leo Yuriev 2017-06-14 23:33:13 +03:00
parent c01aeb5c68
commit 81661ff952
2 changed files with 78 additions and 64 deletions

View File

@ -365,18 +365,35 @@ typedef struct MDBX_lockinfo {
/* Flags which environment was opened. */
volatile uint32_t mti_envmode;
union {
#ifdef MDBX_OSAL_LOCK
MDBX_OSAL_LOCK mti_wmutex;
MDBX_OSAL_LOCK mti_wmutex;
#endif
uint64_t align_wmutex;
};
/* The number of slots that have been used in the reader table.
* This always records the maximum count, it is not decremented
* when readers release their slots. */
volatile unsigned __cache_aligned mti_numreaders;
union {
/* The number of slots that have been used in the reader table.
* This always records the maximum count, it is not decremented
* when readers release their slots. */
volatile unsigned __cache_aligned mti_numreaders;
uint64_t align_numreaders;
};
union {
#ifdef MDBX_OSAL_LOCK
/* Mutex protecting access to this table. */
MDBX_OSAL_LOCK mti_rmutex;
/* Mutex protecting access to this table. */
MDBX_OSAL_LOCK mti_rmutex;
#endif
uint64_t align_rmutex;
};
union {
volatile txnid_t mti_oldest;
uint64_t align_oldest;
};
uint8_t pad_align[MDBX_CACHELINE_SIZE - sizeof(uint64_t) * 6];
MDBX_reader __cache_aligned mti_readers[1];
} MDBX_lockinfo;
@ -635,23 +652,23 @@ struct MDBX_env {
/* Max MDBX_lockinfo.mti_numreaders of interest to mdbx_env_close() */
unsigned me_close_readers;
mdbx_fastmutex_t me_dbi_lock;
MDBX_dbi me_numdbs; /* number of DBs opened */
MDBX_dbi me_maxdbs; /* size of the DB table */
mdbx_pid_t me_pid; /* process ID of this env */
char *me_path; /* path to the DB files */
char *me_map; /* the memory map of the data file */
MDBX_lockinfo *me_lck; /* the memory map of the lock file, never NULL */
void *me_pbuf; /* scratch area for DUPSORT put() */
MDBX_txn *me_txn; /* current write transaction */
MDBX_txn *me_txn0; /* prealloc'd write transaction */
size_t me_mapsize; /* size of the data memory map */
pgno_t me_maxpg; /* me_mapsize / me_psize */
MDBX_dbx *me_dbxs; /* array of static DB info */
uint16_t *me_dbflags; /* array of flags from MDBX_db.md_flags */
unsigned *me_dbiseqs; /* array of dbi sequence numbers */
mdbx_thread_key_t me_txkey; /* thread-key for readers */
txnid_t me_pgoldest; /* ID of oldest reader last time we looked */
MDBX_pgstate me_pgstate; /* state of old pages from freeDB */
MDBX_dbi me_numdbs; /* number of DBs opened */
MDBX_dbi me_maxdbs; /* size of the DB table */
mdbx_pid_t me_pid; /* process ID of this env */
char *me_path; /* path to the DB files */
char *me_map; /* the memory map of the data file */
MDBX_lockinfo *me_lck; /* the memory map of the lock file, never NULL */
void *me_pbuf; /* scratch area for DUPSORT put() */
MDBX_txn *me_txn; /* current write transaction */
MDBX_txn *me_txn0; /* prealloc'd write transaction */
size_t me_mapsize; /* size of the data memory map */
pgno_t me_maxpg; /* me_mapsize / me_psize */
MDBX_dbx *me_dbxs; /* array of static DB info */
uint16_t *me_dbflags; /* array of flags from MDBX_db.md_flags */
unsigned *me_dbiseqs; /* array of dbi sequence numbers */
mdbx_thread_key_t me_txkey; /* thread-key for readers */
volatile txnid_t *me_oldest; /* ID of oldest reader last time we looked */
MDBX_pgstate me_pgstate; /* state of old pages from freeDB */
#define me_pglast me_pgstate.mf_pglast
#define me_pghead me_pgstate.mf_pghead
MDBX_page *me_dpages; /* list of malloc'd blocks for re-use */
@ -663,16 +680,17 @@ struct MDBX_env {
unsigned me_maxfree_1pg;
/* Max size of a node on a page */
unsigned me_nodemax;
unsigned me_maxkey_limit; /* max size of a key */
mdbx_pid_t me_live_reader; /* have liveness lock in reader table */
void *me_userctx; /* User-settable context */
unsigned me_maxkey_limit; /* max size of a key */
mdbx_pid_t me_live_reader; /* have liveness lock in reader table */
void *me_userctx; /* User-settable context */
size_t me_sync_pending; /* Total dirty/non-sync'ed bytes
* since the last mdbx_env_sync() */
size_t me_sync_threshold; /* Treshold of above to force synchronous flush */
MDBX_oom_func *me_oom_func; /* Callback for kicking laggard readers */
txnid_t me_oldest_stub;
#if MDBX_DEBUG
MDBX_assert_func *me_assert_func; /* Callback for assertion failures */
#endif
size_t me_sync_pending; /* Total dirty/non-sync'ed bytes
* since the last mdbx_env_sync() */
size_t me_sync_threshold; /* Treshold of above to force synchronous flush */
MDBX_oom_func *me_oom_func; /* Callback for kicking laggard readers */
#ifdef USE_VALGRIND
int me_valgrind_handle;
#endif

View File

@ -1483,12 +1483,10 @@ static const char *mdbx_durable_str(const MDBX_meta *const meta) {
/* Find oldest txnid still referenced. */
static txnid_t mdbx_find_oldest(MDBX_txn *txn, int *laggard) {
MDBX_env *env = txn->mt_env;
const MDBX_meta *const head = mdbx_meta_mostrecent(
env, F_ISSET(env->me_flags, MDBX_UTTERLY_NOSYNC) ? false : true);
txnid_t oldest =
meta_txnid(env, head, (txn->mt_flags & MDBX_RDONLY) ? true : false);
mdbx_assert(env, (txn->mt_flags & MDBX_RDONLY) == 0);
int i, reader;
txnid_t oldest = txn->mt_txnid - 1;
const MDBX_reader *const r = env->me_lck->mti_readers;
for (reader = -1, i = env->me_lck->mti_numreaders; --i >= 0;) {
if (r[i].mr_pid) {
@ -1503,7 +1501,9 @@ static txnid_t mdbx_find_oldest(MDBX_txn *txn, int *laggard) {
if (laggard)
*laggard = reader;
return env->me_pgoldest = oldest;
*env->me_oldest = oldest;
return oldest;
}
/* Add a page to the txn's dirty list */
@ -1614,7 +1614,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp,
break;
oldest = (flags & MDBX_LIFORECLAIM) ? mdbx_find_oldest(txn, NULL)
: env->me_pgoldest;
: env->me_oldest[0];
mdbx_cursor_init(&m2, txn, FREE_DBI, NULL);
if (flags & MDBX_LIFORECLAIM) {
/* Begin from oldest reader if any */
@ -1645,7 +1645,7 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp,
if (op == MDBX_SET_RANGE)
continue;
if (oldest < mdbx_find_oldest(txn, NULL)) {
oldest = env->me_pgoldest;
oldest = *env->me_oldest;
last = oldest - 1;
key.iov_base = &last;
key.iov_len = sizeof(last);
@ -2283,7 +2283,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
}
while (1) {
MDBX_meta *const meta = mdbx_meta_head(txn->mt_env);
MDBX_meta *const meta = mdbx_meta_head(env);
mdbx_jitter4testing(false);
const txnid_t snap = mdbx_meta_txnid_fluid(env, meta);
mdbx_jitter4testing(false);
@ -2293,8 +2293,8 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
mdbx_assert(env, r->mr_pid == mdbx_getpid());
mdbx_assert(env, r->mr_tid == mdbx_thread_self());
mdbx_assert(env, r->mr_txnid == snap);
mdbx_coherent_barrier();
}
mdbx_coherent_barrier();
mdbx_jitter4testing(true);
/* Snap the state from current meta-head */
@ -2304,14 +2304,15 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
txn->mt_canary = meta->mm_canary;
/* LY: Retry on a race, ITS#7970. */
if (likely(meta == mdbx_meta_head(txn->mt_env) &&
mdbx_compiler_barrier();
if (likely(meta == mdbx_meta_head(env) &&
snap == mdbx_meta_txnid_fluid(env, meta))) {
mdbx_jitter4testing(false);
break;
}
}
mdbx_assert(env, txn->mt_txnid >= mdbx_find_oldest(txn, nullptr));
mdbx_assert(env, txn->mt_txnid >= *env->me_oldest);
txn->mt_ro_reader = r;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
} else {
@ -4314,7 +4315,7 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname, int mode) {
err = mdbx_mmap(&addr, (size_t)size, true, env->me_lfd);
if (unlikely(err != MDBX_SUCCESS))
return err;
assert(addr != nullptr);
mdbx_assert(env, addr != nullptr);
env->me_lck = addr;
#ifdef MADV_DODUMP
@ -4359,6 +4360,8 @@ static int __cold mdbx_setup_lck(MDBX_env *env, char *lck_pathname, int mode) {
}
}
mdbx_assert(env, !MDBX_IS_ERROR(rc));
env->me_oldest = &env->me_lck->mti_oldest;
return rc;
}
@ -4451,6 +4454,7 @@ int __cold mdbx_env_open_ex(MDBX_env *env, const char *path, unsigned flags,
goto bailout;
}
env->me_oldest = &env->me_oldest_stub;
const int dxb_rc = mdbx_setup_dxb(env, lck_rc);
if (MDBX_IS_ERROR(dxb_rc)) {
rc = dxb_rc;
@ -4594,8 +4598,9 @@ static void __cold mdbx_env_close0(MDBX_env *env) {
mdbx_munmap((void *)env->me_lck,
(env->me_maxreaders - 1) * sizeof(MDBX_reader) +
sizeof(MDBX_lockinfo));
env->me_lck = NULL;
env->me_lck = nullptr;
env->me_pid = 0;
env->me_oldest = nullptr;
mdbx_lck_destroy(env);
if (env->me_lfd != INVALID_HANDLE_VALUE) {
@ -5087,8 +5092,7 @@ static int mdbx_page_search(MDBX_cursor *mc, MDBX_val *key, int flags) {
return MDBX_BAD_TXN;
}
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
/* Make sure we're using an up-to-date root */
if (unlikely(*mc->mc_dbflag & DB_STALE)) {
MDBX_cursor mc2;
@ -5128,8 +5132,7 @@ static int mdbx_page_search(MDBX_cursor *mc, MDBX_val *key, int flags) {
return MDBX_NOTFOUND;
}
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
mdbx_cassert(mc, root >= NUM_METAS);
if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
if (unlikely((rc = mdbx_page_get(mc, root, &mc->mc_pg[0], NULL)) != 0))
@ -5238,8 +5241,7 @@ static __inline int mdbx_node_read(MDBX_cursor *mc, MDBX_node *leaf,
pgno_t pgno;
int rc;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
data->iov_len = NODEDSZ(leaf);
data->iov_base = NODEDATA(leaf);
@ -5299,8 +5301,7 @@ static int mdbx_cursor_sibling(MDBX_cursor *mc, int move_right) {
MDBX_node *indx;
MDBX_page *mp;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if (unlikely(mc->mc_snum < 2)) {
return MDBX_NOTFOUND; /* root has no siblings */
}
@ -5530,8 +5531,7 @@ static int mdbx_cursor_set(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
MDBX_node *leaf = NULL;
DKBUF;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if ((mc->mc_db->md_flags & MDBX_INTEGERKEY) &&
unlikely(key->iov_len != sizeof(uint32_t) &&
key->iov_len != sizeof(uint64_t))) {
@ -5818,8 +5818,7 @@ int mdbx_cursor_get(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
if (unlikely(mc->mc_txn->mt_flags & MDBX_TXN_BLOCKED))
return MDBX_BAD_TXN;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
switch (op) {
case MDBX_GET_CURRENT: {
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
@ -7072,8 +7071,7 @@ static void mdbx_xcursor_init0(MDBX_cursor *mc) {
static void mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) {
MDBX_xcursor *mx = mc->mc_xcursor;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if (node->mn_flags & F_SUBDATA) {
memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDBX_db));
mx->mx_cursor.mc_pg[0] = 0;
@ -7123,8 +7121,7 @@ static void mdbx_xcursor_init2(MDBX_cursor *mc, MDBX_xcursor *src_mx,
int new_dupdata) {
MDBX_xcursor *mx = mc->mc_xcursor;
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if (new_dupdata) {
mx->mx_cursor.mc_snum = 1;
mx->mx_cursor.mc_top = 0;
@ -7164,8 +7161,7 @@ static void mdbx_cursor_init(MDBX_cursor *mc, MDBX_txn *txn, MDBX_dbi dbi,
mc->mc_xcursor = mx;
mdbx_xcursor_init0(mc);
}
mdbx_cassert(mc,
mc->mc_txn->mt_txnid >= mdbx_find_oldest(mc->mc_txn, nullptr));
mdbx_cassert(mc, mc->mc_txn->mt_txnid >= mc->mc_txn->mt_env->me_oldest[0]);
if (unlikely(*mc->mc_dbflag & DB_STALE)) {
mdbx_page_search(mc, NULL, MDBX_PS_ROOTONLY);
}
@ -7784,8 +7780,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
static void mdbx_cursor_copy(const MDBX_cursor *csrc, MDBX_cursor *cdst) {
unsigned i;
mdbx_cassert(csrc, csrc->mc_txn->mt_txnid >=
mdbx_find_oldest(csrc->mc_txn, nullptr));
mdbx_cassert(csrc,
csrc->mc_txn->mt_txnid >= csrc->mc_txn->mt_env->me_oldest[0]);
cdst->mc_txn = csrc->mc_txn;
cdst->mc_dbi = csrc->mc_dbi;
cdst->mc_db = csrc->mc_db;