mdbx: use page's mp_txnid for basic integrity checking.

Change-Id: I50d6f1251e4fd84e535a708e78dd24d84ec53780
This commit is contained in:
Leonid Yuriev 2019-12-21 00:57:47 +03:00
parent d11bfef36b
commit ccb45730f2
2 changed files with 41 additions and 20 deletions

View File

@ -2849,7 +2849,7 @@ static void mdbx_refund_loose(MDBX_txn *txn) {
most -= 1;
}
const unsigned refunded = txn->mt_next_pgno - most;
mdbx_verbose("refund-sorted %u pages %" PRIaPGNO " -> %" PRIaPGNO,
mdbx_verbose("refund-suitable %u pages %" PRIaPGNO " -> %" PRIaPGNO,
refunded, most, txn->mt_next_pgno);
txn->tw.loose_count -= refunded;
txn->tw.dirtyroom += refunded;
@ -2889,7 +2889,7 @@ static void mdbx_refund_loose(MDBX_txn *txn) {
while (dl->length && dl[dl->length].pgno == txn->mt_next_pgno - 1 &&
dl[dl->length].ptr->mp_flags == (P_LOOSE | P_DIRTY)) {
MDBX_page *dp = dl[dl->length].ptr;
mdbx_verbose("refund-unsorted page %" PRIaPGNO, dp->mp_pgno);
mdbx_verbose("refund-sorted page %" PRIaPGNO, dp->mp_pgno);
mdbx_tassert(txn, dp->mp_pgno == dl[dl->length].pgno);
dl->length -= 1;
}
@ -2958,7 +2958,7 @@ static __cold void mdbx_kill_page(MDBX_env *env, MDBX_page *mp, pgno_t pgno,
mdbx_assert(env, pgno >= NUM_METAS && npages);
if (IS_DIRTY(mp) || (env->me_flags & MDBX_WRITEMAP)) {
const size_t bytes = pgno2bytes(env, npages);
memset(mp, 0, bytes);
memset(mp, -1, bytes);
mp->mp_pgno = pgno;
if ((env->me_flags & MDBX_WRITEMAP) == 0)
mdbx_pwrite(env->me_lazy_fd, mp, bytes, pgno2bytes(env, pgno));
@ -2996,6 +2996,7 @@ static int mdbx_page_loose(MDBX_txn *txn, MDBX_page *mp) {
const unsigned npages = IS_OVERFLOW(mp) ? mp->mp_pages : 1;
const pgno_t pgno = mp->mp_pgno;
mp->mp_txnid = INVALID_TXNID;
if (txn->mt_parent) {
mdbx_tassert(txn, (txn->mt_env->me_flags & MDBX_WRITEMAP) == 0);
mdbx_tassert(txn, mp != pgno2page(txn->mt_env, pgno));
@ -3597,6 +3598,8 @@ static __cold pgno_t mdbx_find_largest(MDBX_env *env, pgno_t largest) {
/* Add a page to the txn's dirty list */
static int __must_check_result mdbx_page_dirty(MDBX_txn *txn, MDBX_page *mp) {
mp->mp_txnid = INVALID_TXNID;
mp->mp_flags |= P_DIRTY;
const int rc = mdbx_dpl_append(txn->tw.dirtylist, mp->mp_pgno, mp);
if (unlikely(rc != MDBX_SUCCESS)) {
txn->mt_flags |= MDBX_TXN_ERROR;
@ -3997,6 +4000,8 @@ __hot static int mdbx_page_alloc(MDBX_cursor *mc, const unsigned num,
mdbx_ensure(env, np->mp_pgno >= NUM_METAS);
VALGRIND_MAKE_MEM_UNDEFINED(page_data(np), page_space(txn->mt_env));
ASAN_UNPOISON_MEMORY_REGION(page_data(np), page_space(txn->mt_env));
np->mp_flags = P_DIRTY;
np->mp_txnid = INVALID_TXNID;
*mp = np;
return MDBX_SUCCESS;
}
@ -4488,10 +4493,8 @@ static int __must_check_result mdbx_page_unspill(MDBX_txn *txn, MDBX_page *mp,
* page remains spilled until child commits */
int rc = mdbx_page_dirty(txn, np);
if (likely(rc == MDBX_SUCCESS)) {
np->mp_flags |= P_DIRTY;
if (likely(rc == MDBX_SUCCESS))
*ret = np;
}
return rc;
}
return MDBX_SUCCESS;
@ -4575,6 +4578,7 @@ __hot static int mdbx_page_touch(MDBX_cursor *mc) {
mdbx_page_copy(np, mp, txn->mt_env->me_psize);
np->mp_pgno = pgno;
np->mp_txnid = INVALID_TXNID;
np->mp_flags |= P_DIRTY;
done:
@ -6619,7 +6623,7 @@ __hot static int mdbx_page_flush(MDBX_txn *txn, const unsigned keep) {
(flush_end > dp->mp_pgno + npages) ? flush_end : dp->mp_pgno + npages;
*env->me_unsynced_pages += npages;
dp->mp_flags &= ~P_DIRTY;
dp->mp_validator = 0 /* TODO */;
dp->mp_txnid = txn->mt_txnid;
if ((env->me_flags & MDBX_WRITEMAP) == 0) {
const size_t size = pgno2bytes(env, npages);
@ -9604,19 +9608,19 @@ __hot static int mdbx_page_get(MDBX_cursor *mc, pgno_t pgno, MDBX_page **ret,
* back in from the map (but don't unspill it here,
* leave that unless page_touch happens again). */
if (txn->tw.spill_pages && mdbx_pnl_exist(txn->tw.spill_pages, pgno << 1))
goto mapped;
goto spilled;
p = mdbx_dpl_find(txn->tw.dirtylist, pgno);
if (p)
goto done;
goto dirty;
level++;
} while ((txn = txn->mt_parent) != NULL);
}
level = 0;
mapped:
spilled:
p = pgno2page(env, pgno);
done:
dirty:
if (unlikely(p->mp_pgno != pgno)) {
mdbx_error("mismatch pgno %" PRIaPGNO " (actual) != %" PRIaPGNO
" (expected)",
@ -9624,16 +9628,25 @@ done:
goto corrupted;
}
if (likely(!IS_OVERFLOW(p))) {
if (unlikely(p->mp_upper < p->mp_lower ||
((p->mp_lower | p->mp_upper) & 1) ||
PAGEHDRSZ + p->mp_upper > env->me_psize)) {
if (unlikely((p->mp_flags & (P_LOOSE | P_SUBP | P_META | P_DIRTY)) != 0 ||
p->mp_txnid > mc->mc_txn->mt_txnid)) {
if (unlikely((mc->mc_txn->mt_flags & MDBX_RDONLY) != 0 ||
(p->mp_flags & (P_LOOSE | P_SUBP | P_META | P_DIRTY)) !=
P_DIRTY)) {
mdbx_error("invalid page's flags (0x%x) or txnid %" PRIaTXN
" > (actual) %" PRIaTXN " (expected)",
p->mp_flags, p->mp_txnid, mc->mc_txn->mt_txnid);
goto corrupted;
}
}
if (unlikely(!IS_OVERFLOW(p) && (p->mp_upper < p->mp_lower ||
((p->mp_lower | p->mp_upper) & 1) != 0 ||
PAGEHDRSZ + p->mp_upper > env->me_psize))) {
mdbx_error("invalid page lower(%u)/upper(%u), pg-limit %u", p->mp_lower,
p->mp_upper, page_space(env));
goto corrupted;
}
}
/* TODO: more checks here, including p->mp_validator */
if (mdbx_audit_enabled()) {
int err = mdbx_page_check(env, p, true);
@ -10908,6 +10921,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
insert_key = insert_data = (rc != MDBX_SUCCESS);
uint16_t fp_flags = P_LEAF | P_DIRTY;
MDBX_page *fp = env->me_pbuf;
fp->mp_txnid = INVALID_TXNID;
if (insert_key) {
/* The key does not exist */
mdbx_debug("inserting key at index %i", mc->mc_ki[mc->mc_top]);
@ -10995,6 +11009,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
}
/* Is it dirty? */
if (IS_DIRTY(omp)) {
mdbx_cassert(mc, omp->mp_txnid > SAFE64_INVALID_THRESHOLD);
/* yes, overwrite it. Note in this case we don't
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold. */
@ -11126,6 +11141,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
case MDBX_CURRENT | MDBX_NODUPDATA:
case MDBX_CURRENT:
fp->mp_flags |= P_DIRTY;
fp->mp_txnid = INVALID_TXNID;
fp->mp_pgno = mp->mp_pgno;
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
@ -11167,6 +11183,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
}
if (mp != fp) {
mp->mp_flags = fp_flags | P_DIRTY;
mp->mp_txnid = INVALID_TXNID;
mp->mp_leaf2_ksize = fp->mp_leaf2_ksize;
mp->mp_lower = fp->mp_lower;
mdbx_cassert(mc, fp->mp_upper + offset <= UINT16_MAX);
@ -11514,6 +11531,7 @@ static int mdbx_page_new(MDBX_cursor *mc, unsigned flags, unsigned num,
mdbx_debug("allocated new page #%" PRIaPGNO ", size %u", np->mp_pgno,
mc->mc_txn->mt_env->me_psize);
np->mp_flags = (uint16_t)(flags | P_DIRTY);
np->mp_txnid = INVALID_TXNID;
np->mp_lower = 0;
np->mp_upper = (indx_t)(mc->mc_txn->mt_env->me_psize - PAGEHDRSZ);
@ -13540,6 +13558,7 @@ static int mdbx_page_split(MDBX_cursor *mc, const MDBX_val *newkey,
}
copy->mp_pgno = mp->mp_pgno;
copy->mp_flags = mp->mp_flags;
copy->mp_txnid = INVALID_TXNID;
copy->mp_lower = 0;
copy->mp_upper = (indx_t)page_space(env);
@ -14082,6 +14101,7 @@ static int __cold mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
memcpy(mo, omp, my->mc_env->me_psize);
mo->mp_pgno = my->mc_next_pgno;
mo->mp_txnid = MIN_TXNID;
my->mc_next_pgno += omp->mp_pages;
my->mc_wlen[toggle] += my->mc_env->me_psize;
if (omp->mp_pages > 1) {
@ -14146,6 +14166,7 @@ static int __cold mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
}
mo = (MDBX_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
mdbx_page_copy(mo, mp, my->mc_env->me_psize);
mo->mp_txnid = MIN_TXNID;
mo->mp_pgno = my->mc_next_pgno++;
my->mc_wlen[toggle] += my->mc_env->me_psize;
if (mc.mc_top) {

View File

@ -211,6 +211,7 @@ typedef uint32_t pgno_t;
typedef uint64_t txnid_t;
#define PRIaTXN PRIi64
#define MIN_TXNID UINT64_C(1)
#define INVALID_TXNID UINT64_MAX
/* LY: for testing non-atomic 64-bit txnid on 32-bit arches.
* #define MDBX_TXNID_STEP (UINT32_MAX / 3) */
#ifndef MDBX_TXNID_STEP
@ -347,8 +348,7 @@ typedef struct MDBX_meta {
typedef struct MDBX_page {
union {
struct MDBX_page *mp_next; /* for in-memory list of freed pages */
uint64_t mp_validator; /* checksum of page content or a txnid during
* which the page has been updated */
uint64_t mp_txnid; /* txnid during which the page has been COW-ed */
};
uint16_t mp_leaf2_ksize; /* key size if this is a LEAF2 page */
#define P_BRANCH 0x01 /* branch page */