lmdb: major rework of traversal b-tree for mdb_chk.

Change-Id: I9d382516f76092f44fc1a12d7554039582b87656
This commit is contained in:
Leo Yuriev
2015-09-02 13:30:53 +03:00
parent 8ff2458003
commit 15e0600b6c
3 changed files with 169 additions and 153 deletions

239
mdb.c
View File

@@ -1472,8 +1472,7 @@ mdb_page_list(MDB_page *mp)
key.mv_data = node->mn_data;
nsize = NODESIZE + key.mv_size;
if (IS_BRANCH(mp)) {
mdb_print("key %d: page %zu, %s\n", i, NODEPGNO(node),
DKEY(&key));
mdb_print("key %d: page %zu, %s\n", i, NODEPGNO(node), DKEY(&key));
total += nsize;
} else {
if (F_ISSET(node->mn_flags, F_BIGDATA))
@@ -9794,154 +9793,141 @@ mdb_env_get_oomfunc(MDB_env *env)
struct mdb_walk_ctx {
MDB_txn *mw_txn;
void *mw_user;
MDB_pgwalk_func *mw_visitor;
MDB_pgvisitor_func *mw_visitor;
};
typedef struct mdb_walk_ctx mdb_walk_ctx_t;
/** Depth-first tree traversal. */
static int ESECT
mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int deep)
{
MDB_cursor mc;
MDB_node *ni;
MDB_page *mp;
int rc;
unsigned i;
int rc, i, nkeys;
unsigned header_size, unused_size, payload_size, align_bytes;
const char* type;
/* Empty DB, nothing to do */
if (pg == P_INVALID)
return MDB_SUCCESS;
return MDB_CORRUPTED;
if (deep < 2) {
if ((rc = mdb_page_get(ctx->mw_txn, pg, &mp, NULL)) != 0)
return rc;
rc = ctx->mw_visitor(pg, 0, ctx->mw_user, dbi, 'R',
ctx->mw_txn->mt_env->me_psize - PAGEHDRSZ - SIZELEFT(mp), PAGEHDRSZ);
if (rc)
return rc;
}
mc.mc_snum = 1;
mc.mc_top = 0;
mc.mc_txn = ctx->mw_txn;
rc = mdb_page_get(ctx->mw_txn, pg, &mc.mc_pg[0], NULL);
rc = mdb_page_get(ctx->mw_txn, pg, &mp, NULL);
if (rc)
return rc;
if (pg != mp->mp_p.p_pgno)
return MDB_CORRUPTED;
for (mp = mc.mc_pg[mc.mc_top]; IS_BRANCH(mp); ) {
MDB_node *node;
nkeys = NUMKEYS(mp);
header_size = IS_LEAF2(mp) ? PAGEHDRSZ : PAGEBASE + mp->mp_lower;
unused_size = SIZELEFT(mp);
payload_size = 0;
rc = ctx->mw_visitor(mp->mp_p.p_pgno, 1, ctx->mw_user, dbi, 'B',
ctx->mw_txn->mt_env->me_psize - PAGEHDRSZ - SIZELEFT(mp), PAGEHDRSZ);
if (rc)
return rc;
if (NUMKEYS(mp) < 1)
/* LY: Don't use mask here, e.g bitwise (P_BRANCH|P_LEAF|P_LEAF2|P_META|P_OVERFLOW|P_SUBP).
* Pages should not me marked dirty/loose or otherwise. */
switch (mp->mp_flags) {
case P_BRANCH:
type = "branch";
if (nkeys < 1)
return MDB_CORRUPTED;
mdb_debug("branch page %zu has %u keys", mp->mp_pgno, NUMKEYS(mp));
mdb_cassert(&mc, NUMKEYS(mp) > 1);
mdb_debug("found index 0 to page %zu", NODEPGNO(NODEPTR(mp, 0)));
node = NODEPTR(mp, 0);
if ((rc = mdb_page_get(mc.mc_txn, NODEPGNO(node), &mp, NULL)) != 0)
return rc;
mc.mc_ki[mc.mc_top] = 0;
if ((rc = mdb_cursor_push(&mc, mp)))
return rc;
}
if (!IS_LEAF(mp)) {
mdb_debug("internal error, index points to a %02X page!?",
mp->mp_flags);
mc.mc_txn->mt_flags |= MDB_TXN_ERROR;
break;
case P_LEAF:
type = "leaf";
break;
case P_LEAF|P_SUBP:
type = "leaf-dupsort";
break;
case P_LEAF|P_LEAF2:
/* #MDB_DUPFIXED records */
type = "leaf-dupfixed";
break;
case P_LEAF|P_LEAF2|P_SUBP:
/* #MDB_DUPSORT sub-pages */
type = "leaf-dupfixed-dupsort";
break;
case P_META:
case P_OVERFLOW:
default:
return MDB_CORRUPTED;
}
mc.mc_flags |= C_INITIALIZED;
mc.mc_flags &= ~C_EOF;
for (align_bytes = i = 0; i < nkeys;
align_bytes += ((payload_size + align_bytes) & 1), i++) {
MDB_node *node;
rc = ctx->mw_visitor(mp->mp_p.p_pgno, 1, ctx->mw_user, dbi, 'L',
ctx->mw_txn->mt_env->me_psize - PAGEHDRSZ - SIZELEFT(mp), PAGEHDRSZ);
if (rc)
return rc;
while (mc.mc_snum > 0) {
unsigned n;
mp = mc.mc_pg[mc.mc_top];
n = NUMKEYS(mp);
if (IS_LEAF(mp)) {
if (!IS_LEAF2(mp) && !(flags & F_DUPDATA)) {
for (i = 0; i < n; i++) {
ni = NODEPTR(mp, i);
if (ni->mn_flags & F_BIGDATA) {
MDB_page *omp;
pgno_t *pg;
pg = NODEDATA(ni);
rc = mdb_page_get(ctx->mw_txn, *pg, &omp, NULL);
if (rc)
return rc;
rc = ctx->mw_visitor(*pg, omp->mp_pages, ctx->mw_user, dbi, 'L',
ctx->mw_txn->mt_env->me_psize - PAGEHDRSZ - SIZELEFT(mp), PAGEHDRSZ);
if (rc)
return rc;
} else if (ni->mn_flags & F_SUBDATA) {
MDB_db *db = NODEDATA(ni);
char* name = NULL;
if (! (ni->mn_flags & F_DUPDATA)) {
name = NODEKEY(ni);
int namelen = (char*) db - name;
name = memcpy(alloca(namelen + 1), name, namelen);
name[namelen] = 0;
}
rc = mdb_env_walk(ctx, (name && name[0]) ? name : dbi, db->md_root, ni->mn_flags & F_DUPDATA, deep + 1);
if (rc)
return rc;
}
}
}
} else {
mc.mc_ki[mc.mc_top]++;
if (mc.mc_ki[mc.mc_top] < n) {
pgno_t pg;
do {
ni = NODEPTR(mp, mc.mc_ki[mc.mc_top]);
pg = NODEPGNO(ni);
rc = mdb_page_get(ctx->mw_txn, pg, &mp, NULL);
if (rc)
return rc;
rc = ctx->mw_visitor(pg, 1, ctx->mw_user, dbi, IS_BRANCH(mp) ? 'B' : 'L',
ctx->mw_txn->mt_env->me_psize - PAGEHDRSZ - SIZELEFT(mp), PAGEHDRSZ);
if (rc)
return rc;
mc.mc_top++;
mc.mc_snum++;
mc.mc_ki[mc.mc_top] = 0;
mc.mc_pg[mc.mc_top] = mp;
}
/* Whenever we advance to a sibling branch page,
* we must proceed all the way down to its first leaf.
*/
while (IS_BRANCH(mp));
continue;
}
if (IS_LEAF2(mp)) {
/* LEAF2 pages have no mp_ptrs[] or node headers */
payload_size += mp->mp_ksize;
continue;
}
if (! mc.mc_top)
break;
node = NODEPTR(mp, i);
payload_size += NODESIZE + node->mn_ksize;
mdb_cursor_pop(&mc);
if (IS_BRANCH(mp)) {
rc = mdb_env_walk(ctx, dbi, NODEPGNO(node), flags, deep);
if (rc)
return rc;
continue;
}
assert(IS_LEAF(mp));
if (node->mn_ksize < 1)
return MDB_CORRUPTED;
if (node->mn_flags & F_BIGDATA) {
MDB_page *omp;
pgno_t *opg;
size_t over_header, over_payload, over_unused;
payload_size += sizeof(pgno_t);
opg = NODEDATA(node);
rc = mdb_page_get(ctx->mw_txn, *opg, &omp, NULL);
if (rc)
return rc;
if (*opg != omp->mp_p.p_pgno)
return MDB_CORRUPTED;
/* LY: Don't use mask here, e.g bitwise (P_BRANCH|P_LEAF|P_LEAF2|P_META|P_OVERFLOW|P_SUBP).
* Pages should not me marked dirty/loose or otherwise. */
if (P_OVERFLOW != omp->mp_flags)
return MDB_CORRUPTED;
over_header = PAGEHDRSZ;
over_payload = NODEDSZ(node);
over_unused = omp->mp_pages * ctx->mw_txn->mt_env->me_psize
- over_payload - over_header;
rc = ctx->mw_visitor(*opg, omp->mp_pages, ctx->mw_user, dbi, "overflow-data",
over_payload, over_header, over_unused);
if (rc)
return rc;
continue;
}
payload_size += NODEDSZ(node);
if (node->mn_flags & F_SUBDATA) {
MDB_db *db = NODEDATA(node);
char* name = NULL;
if (NODEDSZ(node) < 1)
return MDB_CORRUPTED;
if (! (node->mn_flags & F_DUPDATA)) {
name = NODEKEY(node);
int namelen = (char*) db - name;
name = memcpy(alloca(namelen + 1), name, namelen);
name[namelen] = 0;
}
rc = mdb_env_walk(ctx, (name && name[0]) ? name : dbi,
db->md_root, node->mn_flags & F_DUPDATA, deep + 1);
if (rc)
return rc;
}
}
return rc;
return ctx->mw_visitor(mp->mp_p.p_pgno, 1, ctx->mw_user, dbi,
type, payload_size, header_size, unused_size + align_bytes);
}
int ESECT
mdb_env_pgwalk(MDB_txn *txn, MDB_pgwalk_func* visitor, void* user)
mdb_env_pgwalk(MDB_txn *txn, MDB_pgvisitor_func* visitor, void* user)
{
mdb_walk_ctx_t ctx;
int rc;
@@ -9950,13 +9936,14 @@ mdb_env_pgwalk(MDB_txn *txn, MDB_pgwalk_func* visitor, void* user)
ctx.mw_user = user;
ctx.mw_visitor = visitor;
rc = visitor(0, 2, user, "meta", 'M', sizeof(MDB_meta), PAGEHDRSZ);
if (! rc)
rc = visitor(0, 2, user, "lmdb", "meta", sizeof(MDB_meta)*2, PAGEHDRSZ*2,
(txn->mt_env->me_psize - sizeof(MDB_meta) - PAGEHDRSZ) *2);
if (! rc && txn->mt_dbs[FREE_DBI].md_root != P_INVALID)
rc = mdb_env_walk(&ctx, "free", txn->mt_dbs[FREE_DBI].md_root, 0, 0);
if (! rc)
if (! rc && txn->mt_dbs[MAIN_DBI].md_root != P_INVALID)
rc = mdb_env_walk(&ctx, "main", txn->mt_dbs[MAIN_DBI].md_root, 0, 0);
if (! rc)
rc = visitor(P_INVALID, 0, user, NULL, 0, -1, 0);
rc = visitor(P_INVALID, 0, user, NULL, NULL, -1, 0, 0);
return rc;
}