mdbx: rework internal self-audit.

Change-Id: I42a585a61a9040a24d9d8dbd12fe9055d189195c
This commit is contained in:
Leo Yuriev 2018-08-27 19:23:18 +03:00 committed by Leonid Yuriev
parent 20022658be
commit ceac458b4e

View File

@ -1372,61 +1372,6 @@ static void mdbx_cursor_chk(MDBX_cursor *mc) {
}
#endif /* 0 */
/* Count all the pages in each DB and in the freelist and make sure
* it matches the actual number of pages being used.
* All named DBs must be open for a correct count. */
static int mdbx_audit(MDBX_txn *txn) {
MDBX_cursor mc;
MDBX_val key, data;
int rc;
pgno_t freecount = 0;
rc = mdbx_cursor_init(&mc, txn, FREE_DBI, NULL);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
while ((rc = mdbx_cursor_get(&mc, &key, &data, MDBX_NEXT)) == 0)
freecount += *(pgno_t *)data.iov_base;
mdbx_tassert(txn, rc == MDBX_NOTFOUND);
pgno_t count = 0;
for (MDBX_dbi i = 0; i < txn->mt_numdbs; i++) {
MDBX_xcursor mx;
if (!(txn->mt_dbflags[i] & DB_VALID))
continue;
rc = mdbx_cursor_init(&mc, txn, i, &mx);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (txn->mt_dbs[i].md_root == P_INVALID)
continue;
count += txn->mt_dbs[i].md_branch_pages + txn->mt_dbs[i].md_leaf_pages +
txn->mt_dbs[i].md_overflow_pages;
if (txn->mt_dbs[i].md_flags & MDBX_DUPSORT) {
rc = mdbx_page_search(&mc, NULL, MDBX_PS_FIRST);
for (; rc == MDBX_SUCCESS; rc = mdbx_cursor_sibling(&mc, 1)) {
MDBX_page *mp = mc.mc_pg[mc.mc_top];
for (unsigned j = 0; j < NUMKEYS(mp); j++) {
MDBX_node *leaf = NODEPTR(mp, j);
if (leaf->mn_flags & F_SUBDATA) {
MDBX_db db;
memcpy(&db, NODEDATA(leaf), sizeof(db));
count +=
db.md_branch_pages + db.md_leaf_pages + db.md_overflow_pages;
}
}
}
mdbx_tassert(txn, rc == MDBX_NOTFOUND);
}
}
if (freecount + count + NUM_METAS != txn->mt_next_pgno) {
mdbx_print("audit: %" PRIaTXN " freecount: %" PRIaPGNO " count: %" PRIaPGNO
" total: %" PRIaPGNO " next_pgno: %" PRIaPGNO "\n",
txn->mt_txnid, freecount, count + NUM_METAS,
freecount + count + NUM_METAS, txn->mt_next_pgno);
return MDBX_CORRUPTED;
}
return MDBX_SUCCESS;
}
int mdbx_cmp(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *a,
const MDBX_val *b) {
mdbx_assert(NULL, txn->mt_signature == MDBX_MT_SIGNATURE);
@ -3540,6 +3485,92 @@ static int mdbx_prep_backlog(MDBX_txn *txn, MDBX_cursor *mc) {
return MDBX_SUCCESS;
}
/* Count all the pages in each DB and in the freelist and make sure
* it matches the actual number of pages being used.
* All named DBs must be open for a correct count. */
static int mdbx_audit(MDBX_txn *txn, unsigned befree_stored) {
MDBX_cursor mc;
MDBX_val key, data;
const pgno_t pending =
(txn->mt_flags & MDBX_RDONLY)
? 0
: txn->mt_loose_count +
(txn->mt_env->me_reclaimed_pglist
? txn->mt_env->me_reclaimed_pglist[0]
: 0) +
(txn->mt_befree_pages ? txn->mt_befree_pages[0] - befree_stored
: 0);
int rc = mdbx_cursor_init(&mc, txn, FREE_DBI, NULL);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
pgno_t freecount = 0;
while ((rc = mdbx_cursor_get(&mc, &key, &data, MDBX_NEXT)) == 0)
freecount += *(pgno_t *)data.iov_base;
mdbx_tassert(txn, rc == MDBX_NOTFOUND);
pgno_t count = 0;
for (MDBX_dbi i = FREE_DBI; i <= MAIN_DBI; i++) {
MDBX_xcursor mx;
if (!(txn->mt_dbflags[i] & DB_VALID))
continue;
rc = mdbx_cursor_init(&mc, txn, i, &mx);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (txn->mt_dbs[i].md_root == P_INVALID)
continue;
count += txn->mt_dbs[i].md_branch_pages + txn->mt_dbs[i].md_leaf_pages +
txn->mt_dbs[i].md_overflow_pages;
rc = mdbx_page_search(&mc, NULL, MDBX_PS_FIRST);
while (rc == MDBX_SUCCESS) {
MDBX_page *mp = mc.mc_pg[mc.mc_top];
for (unsigned j = 0; j < NUMKEYS(mp); j++) {
MDBX_node *leaf = NODEPTR(mp, j);
if ((leaf->mn_flags & (F_DUPDATA | F_SUBDATA)) == F_SUBDATA) {
MDBX_db db_copy, *db;
memcpy(db = &db_copy, NODEDATA(leaf), sizeof(db_copy));
if ((txn->mt_flags & MDBX_RDONLY) == 0) {
for (MDBX_dbi k = txn->mt_numdbs; --k > MAIN_DBI;) {
if ((txn->mt_dbflags[k] & MDBX_TBL_DIRTY) &&
/* txn->mt_dbxs[k].md_name.iov_len > 0 && */
NODEKSZ(leaf) == txn->mt_dbxs[k].md_name.iov_len &&
memcmp(NODEKEY(leaf), txn->mt_dbxs[k].md_name.iov_base,
NODEKSZ(leaf)) == 0) {
db = txn->mt_dbs + k;
break;
}
}
}
count +=
db->md_branch_pages + db->md_leaf_pages + db->md_overflow_pages;
}
}
rc = mdbx_cursor_sibling(&mc, 1);
}
mdbx_tassert(txn, rc == MDBX_NOTFOUND);
}
if (pending + freecount + count + NUM_METAS == txn->mt_next_pgno)
return MDBX_SUCCESS;
if ((txn->mt_flags & MDBX_RDONLY) == 0)
mdbx_error(
"audit @%" PRIaTXN ": %u(pending) = %u(loose-count) + "
"%u(reclaimed-list) + %u(befree-pending) - %u(befree-stored)",
txn->mt_txnid, pending, txn->mt_loose_count,
txn->mt_env->me_reclaimed_pglist ? txn->mt_env->me_reclaimed_pglist[0]
: 0,
txn->mt_befree_pages ? txn->mt_befree_pages[0] : 0, befree_stored);
mdbx_error("audit @%" PRIaTXN ": %" PRIaPGNO "(pending) + %" PRIaPGNO
"(free) + %" PRIaPGNO "(count) = %" PRIaPGNO
"(total) <> %" PRIaPGNO "(next-pgno)",
txn->mt_txnid, pending, freecount, count + NUM_METAS,
pending + freecount + count + NUM_METAS, txn->mt_next_pgno);
return MDBX_PROBLEM;
}
/* Cleanup reclaimed GC records, than save the befree-list as of this
* transaction to GC (aka freeDB). This recursive changes the reclaimed-list
* loose-list and befree-list. Keep trying until it stabilizes. */
@ -4431,8 +4462,11 @@ int mdbx_txn_commit(MDBX_txn *txn) {
env->me_reclaimed_pglist = NULL;
mdbx_pnl_shrink(&txn->mt_befree_pages);
if (mdbx_audit_enabled())
mdbx_audit(txn);
if (mdbx_audit_enabled()) {
rc = mdbx_audit(txn, 0);
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
}
rc = mdbx_page_flush(txn, 0);
if (likely(rc == MDBX_SUCCESS)) {