mirror of
https://github.com/isar/libmdbx.git
synced 2025-08-25 21:54:28 +08:00
mdbx: новые настройки clang-format (косметика).
This commit is contained in:
@@ -45,8 +45,7 @@ bsr_t mvcc_bind_slot(MDBX_env *env) {
|
||||
result.err = mvcc_cleanup_dead(env, true, nullptr);
|
||||
if (result.err != MDBX_RESULT_TRUE) {
|
||||
lck_rdt_unlock(env);
|
||||
result.err =
|
||||
(result.err == MDBX_SUCCESS) ? MDBX_READERS_FULL : result.err;
|
||||
result.err = (result.err == MDBX_SUCCESS) ? MDBX_READERS_FULL : result.err;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -61,8 +60,7 @@ bsr_t mvcc_bind_slot(MDBX_env *env) {
|
||||
safe64_reset(&result.rslot->txnid, true);
|
||||
if (slot == nreaders)
|
||||
env->lck->rdt_length.weak = (uint32_t)++nreaders;
|
||||
result.rslot->tid.weak =
|
||||
(env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
|
||||
result.rslot->tid.weak = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
|
||||
atomic_store32(&result.rslot->pid, env->pid, mo_AcquireRelease);
|
||||
lck_rdt_unlock(env);
|
||||
|
||||
@@ -84,17 +82,14 @@ __hot txnid_t mvcc_shapshot_oldest(MDBX_env *const env, const txnid_t steady) {
|
||||
return env->lck->cached_oldest.weak = steady;
|
||||
}
|
||||
|
||||
const txnid_t prev_oldest =
|
||||
atomic_load64(&lck->cached_oldest, mo_AcquireRelease);
|
||||
const txnid_t prev_oldest = atomic_load64(&lck->cached_oldest, mo_AcquireRelease);
|
||||
eASSERT(env, steady >= prev_oldest);
|
||||
|
||||
txnid_t new_oldest = prev_oldest;
|
||||
while (nothing_changed !=
|
||||
atomic_load32(&lck->rdt_refresh_flag, mo_AcquireRelease)) {
|
||||
while (nothing_changed != atomic_load32(&lck->rdt_refresh_flag, mo_AcquireRelease)) {
|
||||
lck->rdt_refresh_flag.weak = nothing_changed;
|
||||
jitter4testing(false);
|
||||
const size_t snap_nreaders =
|
||||
atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
const size_t snap_nreaders = atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
new_oldest = steady;
|
||||
|
||||
for (size_t i = 0; i < snap_nreaders; ++i) {
|
||||
@@ -105,11 +100,9 @@ __hot txnid_t mvcc_shapshot_oldest(MDBX_env *const env, const txnid_t steady) {
|
||||
|
||||
const txnid_t rtxn = safe64_read(&lck->rdt[i].txnid);
|
||||
if (unlikely(rtxn < prev_oldest)) {
|
||||
if (unlikely(nothing_changed == atomic_load32(&lck->rdt_refresh_flag,
|
||||
mo_AcquireRelease)) &&
|
||||
if (unlikely(nothing_changed == atomic_load32(&lck->rdt_refresh_flag, mo_AcquireRelease)) &&
|
||||
safe64_reset_compare(&lck->rdt[i].txnid, rtxn)) {
|
||||
NOTICE("kick stuck reader[%zu of %zu].pid_%u %" PRIaTXN
|
||||
" < prev-oldest %" PRIaTXN ", steady-txn %" PRIaTXN,
|
||||
NOTICE("kick stuck reader[%zu of %zu].pid_%u %" PRIaTXN " < prev-oldest %" PRIaTXN ", steady-txn %" PRIaTXN,
|
||||
i, snap_nreaders, pid, rtxn, prev_oldest, steady);
|
||||
}
|
||||
continue;
|
||||
@@ -135,17 +128,13 @@ pgno_t mvcc_snapshot_largest(const MDBX_env *env, pgno_t last_used_page) {
|
||||
lck_t *const lck = env->lck_mmap.lck;
|
||||
if (likely(lck != nullptr /* check for exclusive without-lck mode */)) {
|
||||
retry:;
|
||||
const size_t snap_nreaders =
|
||||
atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
const size_t snap_nreaders = atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
for (size_t i = 0; i < snap_nreaders; ++i) {
|
||||
if (atomic_load32(&lck->rdt[i].pid, mo_AcquireRelease)) {
|
||||
/* jitter4testing(true); */
|
||||
const pgno_t snap_pages =
|
||||
atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_Relaxed);
|
||||
const pgno_t snap_pages = atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_Relaxed);
|
||||
const txnid_t snap_txnid = safe64_read(&lck->rdt[i].txnid);
|
||||
if (unlikely(snap_pages !=
|
||||
atomic_load32(&lck->rdt[i].snapshot_pages_used,
|
||||
mo_AcquireRelease) ||
|
||||
if (unlikely(snap_pages != atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_AcquireRelease) ||
|
||||
snap_txnid != safe64_read(&lck->rdt[i].txnid)))
|
||||
goto retry;
|
||||
if (last_used_page < snap_pages && snap_txnid <= env->basal_txn->txnid)
|
||||
@@ -161,18 +150,14 @@ pgno_t mvcc_snapshot_largest(const MDBX_env *env, pgno_t last_used_page) {
|
||||
pgno_t mvcc_largest_this(MDBX_env *env, pgno_t largest) {
|
||||
lck_t *const lck = env->lck_mmap.lck;
|
||||
if (likely(lck != nullptr /* exclusive mode */)) {
|
||||
const size_t snap_nreaders =
|
||||
atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
const size_t snap_nreaders = atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
for (size_t i = 0; i < snap_nreaders; ++i) {
|
||||
retry:
|
||||
if (atomic_load32(&lck->rdt[i].pid, mo_AcquireRelease) == env->pid) {
|
||||
/* jitter4testing(true); */
|
||||
const pgno_t snap_pages =
|
||||
atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_Relaxed);
|
||||
const pgno_t snap_pages = atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_Relaxed);
|
||||
const txnid_t snap_txnid = safe64_read(&lck->rdt[i].txnid);
|
||||
if (unlikely(snap_pages !=
|
||||
atomic_load32(&lck->rdt[i].snapshot_pages_used,
|
||||
mo_AcquireRelease) ||
|
||||
if (unlikely(snap_pages != atomic_load32(&lck->rdt[i].snapshot_pages_used, mo_AcquireRelease) ||
|
||||
snap_txnid != safe64_read(&lck->rdt[i].txnid)))
|
||||
goto retry;
|
||||
if (largest < snap_pages &&
|
||||
@@ -219,8 +204,7 @@ static bool pid_insert(uint32_t *list, uint32_t pid) {
|
||||
return true;
|
||||
}
|
||||
|
||||
__cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked,
|
||||
int *dead) {
|
||||
__cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked, int *dead) {
|
||||
int rc = check_env(env, true);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return rc;
|
||||
@@ -234,13 +218,11 @@ __cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked,
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
const size_t snap_nreaders =
|
||||
atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
const size_t snap_nreaders = atomic_load32(&lck->rdt_length, mo_AcquireRelease);
|
||||
uint32_t pidsbuf_onstask[142];
|
||||
uint32_t *const pids =
|
||||
(snap_nreaders < ARRAY_LENGTH(pidsbuf_onstask))
|
||||
? pidsbuf_onstask
|
||||
: osal_malloc((snap_nreaders + 1) * sizeof(uint32_t));
|
||||
uint32_t *const pids = (snap_nreaders < ARRAY_LENGTH(pidsbuf_onstask))
|
||||
? pidsbuf_onstask
|
||||
: osal_malloc((snap_nreaders + 1) * sizeof(uint32_t));
|
||||
if (unlikely(!pids))
|
||||
return MDBX_ENOMEM;
|
||||
|
||||
@@ -296,8 +278,7 @@ __cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked,
|
||||
/* clean it */
|
||||
for (size_t ii = i; ii < snap_nreaders; ii++) {
|
||||
if (lck->rdt[ii].pid.weak == pid) {
|
||||
DEBUG("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN, (size_t)pid,
|
||||
lck->rdt[ii].txnid.weak);
|
||||
DEBUG("clear stale reader pid %" PRIuPTR " txn %" PRIaTXN, (size_t)pid, lck->rdt[ii].txnid.weak);
|
||||
atomic_store32(&lck->rdt[ii].pid, 0, mo_Relaxed);
|
||||
atomic_store32(&lck->rdt_refresh_flag, true, mo_AcquireRelease);
|
||||
count++;
|
||||
@@ -321,11 +302,9 @@ __cold MDBX_INTERNAL int mvcc_cleanup_dead(MDBX_env *env, int rdt_locked,
|
||||
|
||||
int txn_park(MDBX_txn *txn, bool autounpark) {
|
||||
reader_slot_t *const rslot = txn->to.reader;
|
||||
tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY |
|
||||
MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY);
|
||||
tASSERT(txn, (txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) == MDBX_TXN_RDONLY);
|
||||
tASSERT(txn, txn->to.reader->tid.weak < MDBX_TID_TXN_OUSTED);
|
||||
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY |
|
||||
MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY))
|
||||
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) != MDBX_TXN_RDONLY))
|
||||
return MDBX_BAD_TXN;
|
||||
|
||||
const uint32_t pid = atomic_load32(&rslot->pid, mo_Relaxed);
|
||||
@@ -344,14 +323,12 @@ int txn_park(MDBX_txn *txn, bool autounpark) {
|
||||
|
||||
atomic_store64(&rslot->tid, MDBX_TID_TXN_PARKED, mo_AcquireRelease);
|
||||
atomic_store32(&txn->env->lck->rdt_refresh_flag, true, mo_Relaxed);
|
||||
txn->flags +=
|
||||
autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED;
|
||||
txn->flags += autounpark ? MDBX_TXN_PARKED | MDBX_TXN_AUTOUNPARK : MDBX_TXN_PARKED;
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
int txn_unpark(MDBX_txn *txn) {
|
||||
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD |
|
||||
MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) !=
|
||||
if (unlikely((txn->flags & (MDBX_TXN_FINISHED | MDBX_TXN_HAS_CHILD | MDBX_TXN_RDONLY | MDBX_TXN_PARKED)) !=
|
||||
(MDBX_TXN_RDONLY | MDBX_TXN_PARKED)))
|
||||
return MDBX_BAD_TXN;
|
||||
|
||||
@@ -363,14 +340,11 @@ int txn_unpark(MDBX_txn *txn) {
|
||||
ERROR("unexpected pid %u%s%u", pid, " != expected ", txn->env->pid);
|
||||
return MDBX_PROBLEM;
|
||||
}
|
||||
if (unlikely(tid == MDBX_TID_TXN_OUSTED ||
|
||||
txnid >= SAFE64_INVALID_THRESHOLD))
|
||||
if (unlikely(tid == MDBX_TID_TXN_OUSTED || txnid >= SAFE64_INVALID_THRESHOLD))
|
||||
break;
|
||||
if (unlikely(tid != MDBX_TID_TXN_PARKED || txnid != txn->txnid)) {
|
||||
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64
|
||||
" and/or txn-id %" PRIaTXN "%s%" PRIaTXN,
|
||||
tid, " != must ", MDBX_TID_TXN_OUSTED, txnid, " != must ",
|
||||
txn->txnid);
|
||||
ERROR("unexpected thread-id 0x%" PRIx64 "%s0x%" PRIx64 " and/or txn-id %" PRIaTXN "%s%" PRIaTXN, tid, " != must ",
|
||||
MDBX_TID_TXN_OUSTED, txnid, " != must ", txn->txnid);
|
||||
break;
|
||||
}
|
||||
if (unlikely((txn->flags & MDBX_TXN_ERROR)))
|
||||
@@ -380,12 +354,9 @@ int txn_unpark(MDBX_txn *txn) {
|
||||
if (unlikely(!atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, txn->owner)))
|
||||
continue;
|
||||
#else
|
||||
atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32),
|
||||
mo_Relaxed);
|
||||
if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED,
|
||||
(uint32_t)txn->owner))) {
|
||||
atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32),
|
||||
mo_AcquireRelease);
|
||||
atomic_store32(&rslot->tid.high, (uint32_t)((uint64_t)txn->owner >> 32), mo_Relaxed);
|
||||
if (unlikely(!atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, (uint32_t)txn->owner))) {
|
||||
atomic_store32(&rslot->tid.high, (uint32_t)(MDBX_TID_TXN_PARKED >> 32), mo_AcquireRelease);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
@@ -413,8 +384,7 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
|
||||
bool notify_eof_of_loop = false;
|
||||
int retry = 0;
|
||||
do {
|
||||
const txnid_t steady =
|
||||
env->txn->tw.troika.txnid[env->txn->tw.troika.prefer_steady];
|
||||
const txnid_t steady = env->txn->tw.troika.txnid[env->txn->tw.troika.prefer_steady];
|
||||
env->lck->rdt_refresh_flag.weak = /* force refresh */ true;
|
||||
oldest = mvcc_shapshot_oldest(env, steady);
|
||||
eASSERT(env, oldest < env->basal_txn->txnid);
|
||||
@@ -435,8 +405,7 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
|
||||
reader_slot_t *const rslot = &lck->rdt[i];
|
||||
txnid_t rtxn = safe64_read(&rslot->txnid);
|
||||
retry:
|
||||
if (rtxn == straggler &&
|
||||
(pid = atomic_load32(&rslot->pid, mo_AcquireRelease)) != 0) {
|
||||
if (rtxn == straggler && (pid = atomic_load32(&rslot->pid, mo_AcquireRelease)) != 0) {
|
||||
const uint64_t tid = safe64_read(&rslot->tid);
|
||||
if (tid == MDBX_TID_TXN_PARKED) {
|
||||
/* Читающая транзакция была помечена владельцем как "припаркованная",
|
||||
@@ -454,25 +423,21 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
|
||||
*/
|
||||
bool ousted =
|
||||
#if MDBX_64BIT_CAS
|
||||
atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED,
|
||||
MDBX_TID_TXN_OUSTED);
|
||||
atomic_cas64(&rslot->tid, MDBX_TID_TXN_PARKED, MDBX_TID_TXN_OUSTED);
|
||||
#else
|
||||
atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED,
|
||||
(uint32_t)MDBX_TID_TXN_OUSTED);
|
||||
atomic_cas32(&rslot->tid.low, (uint32_t)MDBX_TID_TXN_PARKED, (uint32_t)MDBX_TID_TXN_OUSTED);
|
||||
#endif
|
||||
if (likely(ousted)) {
|
||||
ousted = safe64_reset_compare(&rslot->txnid, rtxn);
|
||||
NOTICE("ousted-%s parked read-txn %" PRIaTXN
|
||||
", pid %u, tid 0x%" PRIx64,
|
||||
ousted ? "complete" : "half", rtxn, pid, tid);
|
||||
NOTICE("ousted-%s parked read-txn %" PRIaTXN ", pid %u, tid 0x%" PRIx64, ousted ? "complete" : "half", rtxn,
|
||||
pid, tid);
|
||||
eASSERT(env, ousted || safe64_read(&rslot->txnid) > straggler);
|
||||
continue;
|
||||
}
|
||||
rtxn = safe64_read(&rslot->txnid);
|
||||
goto retry;
|
||||
}
|
||||
hold_retired =
|
||||
atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed);
|
||||
hold_retired = atomic_load64(&lck->rdt[i].snapshot_pages_retired, mo_Relaxed);
|
||||
stucked = rslot;
|
||||
}
|
||||
}
|
||||
@@ -487,15 +452,10 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
|
||||
|
||||
const meta_ptr_t head = meta_recent(env, &env->txn->tw.troika);
|
||||
const txnid_t gap = (head.txnid - straggler) / xMDBX_TXNID_STEP;
|
||||
const uint64_t head_retired =
|
||||
unaligned_peek_u64(4, head.ptr_c->pages_retired);
|
||||
const size_t space =
|
||||
(head_retired > hold_retired)
|
||||
? pgno2bytes(env, (pgno_t)(head_retired - hold_retired))
|
||||
: 0;
|
||||
int rc =
|
||||
callback(env, env->txn, pid, (mdbx_tid_t)((intptr_t)tid), straggler,
|
||||
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry);
|
||||
const uint64_t head_retired = unaligned_peek_u64(4, head.ptr_c->pages_retired);
|
||||
const size_t space = (head_retired > hold_retired) ? pgno2bytes(env, (pgno_t)(head_retired - hold_retired)) : 0;
|
||||
int rc = callback(env, env->txn, pid, (mdbx_tid_t)((intptr_t)tid), straggler,
|
||||
(gap < UINT_MAX) ? (unsigned)gap : UINT_MAX, space, retry);
|
||||
if (rc < 0)
|
||||
/* hsr returned error and/or agree MDBX_MAP_FULL error */
|
||||
break;
|
||||
@@ -523,10 +483,8 @@ __cold txnid_t mvcc_kick_laggards(MDBX_env *env, const txnid_t straggler) {
|
||||
/* notify end of hsr-loop */
|
||||
const txnid_t turn = oldest - straggler;
|
||||
if (turn)
|
||||
NOTICE("hsr-kick: done turn %" PRIaTXN " -> %" PRIaTXN " +%" PRIaTXN,
|
||||
straggler, oldest, turn);
|
||||
callback(env, env->txn, 0, 0, straggler,
|
||||
(turn < UINT_MAX) ? (unsigned)turn : UINT_MAX, 0, -retry);
|
||||
NOTICE("hsr-kick: done turn %" PRIaTXN " -> %" PRIaTXN " +%" PRIaTXN, straggler, oldest, turn);
|
||||
callback(env, env->txn, 0, 0, straggler, (turn < UINT_MAX) ? (unsigned)turn : UINT_MAX, 0, -retry);
|
||||
}
|
||||
return oldest;
|
||||
}
|
||||
|
Reference in New Issue
Block a user