mdbx: txnid-safety for non-64-bit and/or not-atomic platforms.

This commit is contained in:
Leo Yuriev 2019-09-20 19:53:42 +03:00
parent da9dc75fbc
commit 8423a0a8bd
2 changed files with 88 additions and 31 deletions

View File

@ -272,6 +272,62 @@ size_t __hot mdbx_e2k_strnlen_bug_workaround(const char *s, size_t maxlen) {
}
#endif /* Elbrus's memcmp() bug. */
/*------------------------------------------------------------------------------
* safe write volatile txnid for non-64-bit and/or not-atomic platforms. */
#ifndef MDBX_64BIT_ATOMIC
#if MDBX_WORDBITS >= 64
#define MDBX_64BIT_ATOMIC 1
#else
#define MDBX_64BIT_ATOMIC 0
#endif
#endif /* MDBX_64BIT_ATOMIC */
static __always_inline void safe_txnid_reset(volatile uint64_t *const ptr) {
mdbx_compiler_barrier();
#if MDBX_64BIT_ATOMIC
*ptr = UINT64_MAX;
#else
volatile uint32_t *const high_part =
((volatile uint32_t *)ptr) + (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__);
*high_part = UINT32_MAX;
#endif /* MDBX_64BIT_ATOMIC */
mdbx_flush_noncoherent_cpu_writeback();
mdbx_jitter4testing(true);
}
static __always_inline void safe_txnid_set(volatile uint64_t *const ptr,
uint64_t v) {
mdbx_compiler_barrier();
assert(*ptr >= BAD_TXNID);
#if MDBX_64BIT_ATOMIC
*ptr = v;
#else
volatile uint32_t *const high_part =
((volatile uint32_t *)ptr) + (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__);
volatile uint32_t *const lower_part =
((volatile uint32_t *)ptr) + (__BYTE_ORDER__ != __ORDER_LITTLE_ENDIAN__);
mdbx_jitter4testing(true);
*lower_part = (uint32_t)v;
mdbx_flush_noncoherent_cpu_writeback();
mdbx_jitter4testing(true);
*high_part = (uint32_t)(v >> 32);
#endif /* MDBX_64BIT_ATOMIC */
mdbx_compiler_barrier();
}
static __always_inline void safe_txnid_update(volatile uint64_t *const ptr,
uint64_t v) {
#if MDBX_64BIT_ATOMIC
mdbx_compiler_barrier();
*ptr = v;
#else
safe_txnid_reset(ptr);
safe_txnid_set(ptr, v);
#endif /* MDBX_64BIT_ATOMIC */
mdbx_flush_noncoherent_cpu_writeback();
}
/*----------------------------------------------------------------------------*/
/* rthc (tls keys and destructors) */
@ -2167,9 +2223,8 @@ static __inline void mdbx_meta_update_begin(const MDBX_env *env,
MDBX_meta *meta, txnid_t txnid) {
mdbx_assert(env, meta >= METAPAGE(env, 0) || meta < METAPAGE_END(env));
mdbx_assert(env, meta->mm_txnid_a < txnid && meta->mm_txnid_b < txnid);
meta->mm_txnid_a = txnid;
(void)env;
mdbx_flush_noncoherent_cpu_writeback();
safe_txnid_update(&meta->mm_txnid_a, txnid);
}
static __inline void mdbx_meta_update_end(const MDBX_env *env, MDBX_meta *meta,
@ -2177,15 +2232,15 @@ static __inline void mdbx_meta_update_end(const MDBX_env *env, MDBX_meta *meta,
mdbx_assert(env, meta >= METAPAGE(env, 0) || meta < METAPAGE_END(env));
mdbx_assert(env, meta->mm_txnid_a == txnid);
mdbx_assert(env, meta->mm_txnid_b < txnid);
(void)env;
mdbx_jitter4testing(true);
meta->mm_txnid_b = txnid;
mdbx_flush_noncoherent_cpu_writeback();
safe_txnid_update(&meta->mm_txnid_b, txnid);
}
static __inline void mdbx_meta_set_txnid(const MDBX_env *env, MDBX_meta *meta,
txnid_t txnid) {
mdbx_assert(env, meta < METAPAGE(env, 0) || meta > METAPAGE_END(env));
(void)env;
meta->mm_txnid_a = txnid;
meta->mm_txnid_b = txnid;
}
@ -2666,8 +2721,9 @@ static int mdbx_page_alloc(MDBX_cursor *mc, unsigned num, MDBX_page **mp,
if (rc == MDBX_NOTFOUND && (flags & MDBX_LIFORECLAIM)) {
if (op == MDBX_SET_RANGE)
continue;
if (oldest < mdbx_find_oldest(txn)) {
oldest = *env->me_oldest;
txnid_t snap = mdbx_find_oldest(txn);
if (oldest < snap) {
oldest = snap;
last = oldest - 1;
key.iov_base = &last;
key.iov_len = sizeof(last);
@ -3374,7 +3430,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
}
if (likely(r)) {
if (unlikely(r->mr_pid != env->me_pid || r->mr_txnid != ~(txnid_t)0))
if (unlikely(r->mr_pid != env->me_pid || r->mr_txnid < BAD_TXNID))
return MDBX_BAD_RSLOT;
} else if (env->me_lck) {
unsigned slot, nreaders;
@ -3429,11 +3485,10 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
* that, it is safe for mdbx_env_close() to touch it.
* When it will be closed, we can finally claim it. */
r->mr_pid = 0;
r->mr_txnid = ~(txnid_t)0;
r->mr_tid = tid;
mdbx_flush_noncoherent_cpu_writeback();
safe_txnid_reset(&r->mr_txnid);
if (slot == nreaders)
env->me_lck->mti_numreaders = ++nreaders;
r->mr_tid = tid;
r->mr_pid = env->me_pid;
mdbx_rdt_unlock(env);
@ -3449,8 +3504,9 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
const txnid_t snap = mdbx_meta_txnid_fluid(env, meta);
mdbx_jitter4testing(false);
if (likely(r)) {
safe_txnid_reset(&r->mr_txnid);
r->mr_snapshot_pages_used = meta->mm_geo.next;
r->mr_txnid = snap;
safe_txnid_set(&r->mr_txnid, snap);
mdbx_jitter4testing(false);
mdbx_assert(env, r->mr_pid == mdbx_getpid());
mdbx_assert(env, r->mr_tid == mdbx_thread_self());
@ -3477,7 +3533,7 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
}
}
if (unlikely(txn->mt_txnid == 0)) {
if (unlikely(txn->mt_txnid == 0 || txn->mt_txnid >= BAD_TXNID)) {
mdbx_error("environment corrupted by died writer, must shutdown!");
rc = MDBX_WANNA_RECOVERY;
goto bailout;
@ -3509,8 +3565,12 @@ static int mdbx_txn_renew0(MDBX_txn *txn, unsigned flags) {
mdbx_jitter4testing(false);
txn->mt_canary = meta->mm_canary;
const txnid_t snap = mdbx_meta_txnid_stable(env, meta);
#if MDBX_DEBUG
txn->mt_txnid = snap + UINT32_MAX / 3;
#else
txn->mt_txnid = snap + 1;
if (unlikely(txn->mt_txnid < snap)) {
#endif
if (unlikely(txn->mt_txnid >= BAD_TXNID)) {
mdbx_debug("txnid overflow!");
rc = MDBX_TXN_FULL;
goto bailout;
@ -3891,16 +3951,15 @@ static int mdbx_txn_end(MDBX_txn *txn, unsigned mode) {
txn->mt_ro_reader->mr_txnid >=
env->me_lck->mti_oldest_reader);
txn->mt_ro_reader->mr_snapshot_pages_used = 0;
txn->mt_ro_reader->mr_txnid = ~(txnid_t)0;
mdbx_memory_barrier();
env->me_lck->mti_readers_refresh_flag = true;
safe_txnid_reset(&txn->mt_ro_reader->mr_txnid);
if (mode & MDBX_END_SLOT) {
if ((env->me_flags & MDBX_ENV_TXKEY) == 0)
txn->mt_ro_reader->mr_pid = 0;
txn->mt_ro_reader = NULL;
}
env->me_lck->mti_readers_refresh_flag = true;
mdbx_flush_noncoherent_cpu_writeback();
}
mdbx_flush_noncoherent_cpu_writeback();
txn->mt_numdbs = 0; /* prevent further DBI activity */
txn->mt_flags = MDBX_RDONLY | MDBX_TXN_FINISHED;
txn->mt_owner = 0;
@ -5739,8 +5798,8 @@ static int mdbx_sync_locked(MDBX_env *env, unsigned flags,
mdbx_jitter4testing(true);
if (likely(target != head)) {
/* LY: 'invalidate' the meta. */
target->mm_datasync_sign = MDBX_DATASIGN_WEAK;
mdbx_meta_update_begin(env, target, pending->mm_txnid_a);
target->mm_datasync_sign = MDBX_DATASIGN_WEAK;
#ifndef NDEBUG
/* debug: provoke failure to catch a violators, but don't touch mm_psize
* and mm_flags to allow readers catch actual pagesize. */
@ -13225,7 +13284,7 @@ int __cold mdbx_reader_list(MDBX_env *env, MDBX_reader_list_func *func,
goto retry_reader;
mdbx_assert(env, txnid > 0);
if (txnid == ~(txnid_t)0)
if (txnid >= BAD_TXNID)
txnid = 0;
size_t bytes_used = 0;
@ -13500,33 +13559,30 @@ static txnid_t __cold mdbx_oomkick(MDBX_env *env, const txnid_t laggard) {
return *env->me_oldest = oldest;
}
mdbx_tid_t tid;
mdbx_pid_t pid;
int rc;
if (!env->me_oom_func)
break;
pid = asleep->mr_pid;
tid = asleep->mr_tid;
mdbx_pid_t pid = asleep->mr_pid;
mdbx_tid_t tid = asleep->mr_tid;
if (asleep->mr_txnid != laggard || pid <= 0)
continue;
const txnid_t gap =
mdbx_meta_txnid_stable(env, mdbx_meta_head(env)) - laggard;
rc = env->me_oom_func(env, pid, tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, retry);
int rc =
env->me_oom_func(env, pid, tid, laggard,
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, retry);
if (rc < 0)
break;
if (rc) {
asleep->mr_txnid = ~(txnid_t)0;
env->me_lck->mti_readers_refresh_flag = true;
safe_txnid_reset(&asleep->mr_txnid);
if (rc > 1) {
asleep->mr_tid = 0;
asleep->mr_pid = 0;
mdbx_flush_noncoherent_cpu_writeback();
}
env->me_lck->mti_readers_refresh_flag = true;
mdbx_flush_noncoherent_cpu_writeback();
}
}

View File

@ -263,6 +263,7 @@ typedef uint32_t pgno_t;
typedef uint64_t txnid_t;
#define PRIaTXN PRIi64
#define MIN_TXNID UINT64_C(1)
#define BAD_TXNID UINT64_C(0xffffFFFF00000000)
/* Used for offsets within a single page.
* Since memory pages are typically 4 or 8KB in size, 12-13 bits,