lmdb: MDB_LIFORECLAIM & MDB_COALESCE modes.

Reclaim FreeDB in LIFO order - this is the main feature.
Also aim to coalesce small FreeDB records.

Change-Id: I76aa062ef59359616e5b697233ec47cbd14f43cd
This commit is contained in:
Leo Yuriev 2015-01-01 16:03:34 +03:00
parent 9eedc88441
commit f00d2cdef6
2 changed files with 339 additions and 78 deletions

10
lmdb.h
View File

@ -292,6 +292,10 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
#define MDB_NORDAHEAD 0x800000
/** don't initialize malloc'd memory before writing to datafile */
#define MDB_NOMEMINIT 0x1000000
/** aim to coalesce FreeDB records */
#define MDB_COALESCE 0x2000000
/** LIFO policy for reclaiming FreeDB records */
#define MDB_LIFORECLAIM 0x4000000
/** @} */
/** @defgroup mdb_dbi_open Database Flags
@ -593,6 +597,12 @@ int mdb_env_create(MDB_env **env);
* caller is expected to overwrite all of the memory that was
* reserved in that case.
* This flag may be changed at any time using #mdb_env_set_flags().
* <li>#MDB_COALESCE
* Aim to coalesce records while reclaiming FreeDB.
* This flag may be changed at any time using #mdb_env_set_flags().
 * <li>#MDB_LIFORECLAIM
 * LIFO policy for reclaiming FreeDB records. This significantly reduces
 * write IOPS in case of MDB_NOSYNC with periodic checkpoints.
* </ul>
* @param[in] mode The UNIX permissions to set on created files. This parameter
* is ignored on Windows.

335
mdb.c
View File

@ -992,6 +992,8 @@ struct MDB_txn {
*/
txnid_t mt_txnid;
MDB_env *mt_env; /**< the DB environment */
/** The list of reclaimed txns from freeDB */
MDB_IDL mt_lifo_reclaimed;
/** The list of pages that became unused during this transaction.
*/
MDB_IDL mt_free_pgs;
@ -1100,6 +1102,7 @@ struct MDB_cursor {
#define C_DEL 0x08 /**< last op was a cursor_del */
#define C_SPLITTING 0x20 /**< Cursor is in page_split */
#define C_UNTRACK 0x40 /**< Un-track cursor when closing */
#define C_RECLAIMING 0x80 /**< FreeDB lookup is prohibited */
/** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
@ -2056,31 +2059,25 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
* will always be satisfied by a single contiguous chunk of memory.
* @return 0 on success, non-zero on failure.
*/
static int
mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
{
#ifdef MDB_PARANOID /* Seems like we can ignore this now */
/* Get at most <Max_retries> more freeDB records once me_pghead
* has enough pages. If not enough, use new pages from the map.
* If <Paranoid> and mc is updating the freeDB, only get new
* records if me_pghead is empty. Then the freelist cannot play
* catch-up with itself by growing while trying to save it.
*/
enum { Paranoid = 1, Max_retries = 500 };
#else
enum { Paranoid = 0, Max_retries = INT_MAX /*infinite*/ };
#endif
int rc, retry = num * 60;
MDB_txn *txn = mc->mc_txn;
MDB_env *env = txn->mt_env;
pgno_t pgno, *mop = env->me_pghead;
unsigned i, j, mop_len = mop ? mop[0] : 0, n2 = num-1;
unsigned i = 0, j, mop_len = mop ? mop[0] : 0, n2 = num-1;
MDB_page *np;
txnid_t oldest = 0, last;
txnid_t oldest = 0, last = 0;
MDB_cursor_op op;
MDB_cursor m2;
int found_old = 0;
unsigned enought = env->me_maxfree_1pg / 2;
/* mp == NULL when mdb_freelist_save() forces reclaim to
 * get one more id for saving the list of pages. */
if (mp) {
/* If there are any loose pages, just use them */
if (num == 1 && txn->mt_loose_pgs) {
np = txn->mt_loose_pgs;
@ -2093,6 +2090,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
}
*mp = NULL;
}
/* If our dirty list is already full, we can't do anything */
if (txn->mt_dirty_room == 0) {
@ -2100,8 +2098,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
goto fail;
}
int coalesce = (env->me_flags & MDB_COALESCE) ? 1 : 0;
if (coalesce && env->me_pgoldest == 0)
coalesce = 2;
const int lifo = (env->me_flags & MDB_LIFORECLAIM) != 0;
oomkick_retry:;
for (op = MDB_FIRST;; op = MDB_NEXT) {
for (op = MDB_FIRST;; op = lifo ? MDB_PREV : MDB_NEXT) {
MDB_val key, data;
MDB_node *leaf;
pgno_t *idl;
@ -2109,7 +2112,7 @@ oomkick_retry:;
/* Seek a big enough contiguous page range. Prefer
* pages at the tail, just truncating the list.
*/
if (mop_len > n2) {
if (mp && mop_len > n2 && (! coalesce || op == MDB_FIRST)) {
i = mop_len;
do {
pgno = mop[i];
@ -2122,23 +2125,42 @@ oomkick_retry:;
if (op == MDB_FIRST) { /* 1st iteration */
/* Prepare to fetch more and coalesce */
last = env->me_pglast;
if (mc->mc_flags & C_RECLAIMING) {
/* If mc is updating the freeDB, then the freelist cannot play
* catch-up with itself by growing while trying to save it.
*/
break;
}
oldest = env->me_pgoldest;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
if (last) {
if (lifo) {
if (env->me_pglast > 1) {
/* Continue lookup from env->me_pglast */
last = env->me_pglast - 1;
op = MDB_SET_RANGE;
key.mv_data = &last; /* will look up last+1 */
} else {
oldest = mdb_find_oldest(txn);
env->me_pgoldest = oldest;
found_old = 1;
/* Begin from oldest reader if any */
if (oldest > 2) {
last = oldest - 1;
op = MDB_SET_RANGE;
}
}
} else if (env->me_pglast) {
/* Continue lookup from env->me_pglast */
last = env->me_pglast;
op = MDB_SET_RANGE;
}
key.mv_data = &last;
key.mv_size = sizeof(last);
}
if (Paranoid && mc->mc_dbi == FREE_DBI)
retry = -1;
}
if (Paranoid && retry < 0 && mop_len)
break;
last++;
if (! lifo) {
/* Do not fetch more if the record will be too recent */
if (oldest <= last) {
if (op != MDB_FIRST && ++last >= oldest) {
if (!found_old) {
oldest = mdb_find_oldest(txn);
env->me_pgoldest = oldest;
@ -2147,12 +2169,29 @@ oomkick_retry:;
if (oldest <= last)
break;
}
}
rc = mdb_cursor_get(&m2, &key, NULL, op);
if (rc == MDB_NOTFOUND && lifo) {
if (op == MDB_SET_RANGE)
continue;
env->me_pgoldest = mdb_find_oldest(txn);
found_old = 1;
if (oldest < env->me_pgoldest) {
oldest = env->me_pgoldest;
last = oldest - 1;
key.mv_data = &last;
key.mv_size = sizeof(last);
op = MDB_SET_RANGE;
rc = mdb_cursor_get(&m2, &key, NULL, op);
}
}
if (rc) {
if (rc == MDB_NOTFOUND)
break;
goto fail;
}
last = *(txnid_t*)key.mv_data;
if (oldest <= last) {
if (!found_old) {
@ -2160,15 +2199,38 @@ oomkick_retry:;
env->me_pgoldest = oldest;
found_old = 1;
}
if (oldest <= last)
if (oldest <= last) {
if (lifo)
continue;
break;
}
}
if (lifo) {
if (txn->mt_lifo_reclaimed) {
for(i = txn->mt_lifo_reclaimed[0]; i > 0; --i)
if (txn->mt_lifo_reclaimed[i] == last)
break;
if (i)
continue;
}
}
np = m2.mc_pg[m2.mc_top];
leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]);
if ((rc = mdb_node_read(txn, leaf, &data)) != MDB_SUCCESS)
return rc;
if (lifo && !txn->mt_lifo_reclaimed) {
txn->mt_lifo_reclaimed = mdb_midl_alloc(env->me_maxfree_1pg);
if (!txn->mt_lifo_reclaimed) {
rc = ENOMEM;
goto fail;
}
}
idl = (MDB_ID *) data.mv_data;
mdb_tassert(txn, idl[0] == 0 || data.mv_size == (idl[0] + 1) * sizeof(MDB_ID));
i = idl[0];
if (!mop) {
if (!(env->me_pghead = mop = mdb_midl_alloc(i))) {
@ -2180,6 +2242,10 @@ oomkick_retry:;
goto fail;
mop = env->me_pghead;
}
if (lifo) {
if ((rc = mdb_midl_append(&txn->mt_lifo_reclaimed, last)) != 0)
goto fail;
}
env->me_pglast = last;
#if (MDB_DEBUG) > 1
DPRINTF(("IDL read txn %"Z"u root %"Z"u num %u",
@ -2190,6 +2256,31 @@ oomkick_retry:;
/* Merge in descending sorted order */
mdb_midl_xmerge(mop, idl);
mop_len = mop[0];
if (! mp) {
/* force reclaim mode */
return 0;
}
/* Don't try to coalesce too much. */
if (mop_len > MDB_IDL_UM_SIZE / 2)
break;
if (coalesce == 1 && (mop_len >= enought || i >= enought / 2))
coalesce = 0;
}
if (! mp) {
/* force reclaim mode */
return MDB_NOTFOUND;
}
if (mop_len > n2 && coalesce) {
i = mop_len;
do {
pgno = mop[i];
if (mop[i-n2] == pgno+n2)
goto search_done;
} while (--i > n2);
}
/* Use new pages from the map when nothing suitable in the freeDB */
@ -2197,7 +2288,7 @@ oomkick_retry:;
pgno = txn->mt_next_pgno;
if (pgno + num > env->me_maxpg) {
DPUTS("DB size maxed out");
if (mdb_oomkick_laggard(env))
if ((mc->mc_flags & C_RECLAIMING) == 0 && mdb_oomkick_laggard(env))
goto oomkick_retry;
rc = MDB_MAP_FULL;
goto fail;
@ -2766,6 +2857,8 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_free_pgs = env->me_free_pgs;
txn->mt_free_pgs[0] = 0;
txn->mt_spill_pgs = NULL;
if (txn->mt_lifo_reclaimed)
txn->mt_lifo_reclaimed[0] = 0;
env->me_txn = txn;
memcpy(txn->mt_dbiseqs, env->me_dbiseqs, env->me_maxdbs * sizeof(unsigned int));
}
@ -2870,6 +2963,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_dbiseqs = (unsigned int *)(txn->mt_cursors + env->me_maxdbs);
txn->mt_dbflags = (unsigned char *)(txn->mt_dbiseqs + env->me_maxdbs);
}
txn->mt_dbxs = env->me_dbxs;
}
txn->mt_env = env;
@ -3025,6 +3119,14 @@ mdb_txn_reset0(MDB_txn *txn, const char *act)
mdb_dlist_free(txn);
}
if (txn->mt_lifo_reclaimed) {
txn->mt_lifo_reclaimed[0] = 0;
if (txn != env->me_txn0) {
mdb_midl_free(txn->mt_lifo_reclaimed);
txn->mt_lifo_reclaimed = NULL;
}
}
if (!txn->mt_parent) {
if (mdb_midl_shrink(&txn->mt_free_pgs))
env->me_free_pgs = txn->mt_free_pgs;
@ -3095,10 +3197,12 @@ mdb_freelist_save(MDB_txn *txn)
txnid_t pglast = 0, head_id = 0;
pgno_t freecnt = 0, *free_pgs, *mop;
ssize_t head_room = 0, total_room = 0, mop_len, clean_limit;
unsigned cleanup_idx = 0, refill_idx = 0;
const int lifo = (env->me_flags & MDB_LIFORECLAIM) != 0;
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
if (env->me_pghead) {
if (! lifo && env->me_pghead) {
/* Make sure first page of freeDB is touched and on freelist */
rc = mdb_page_search(&mc, NULL, MDB_PS_FIRST|MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND)
@ -3128,20 +3232,59 @@ mdb_freelist_save(MDB_txn *txn)
pgno_t *pgs;
ssize_t j;
if (! lifo) {
/* If using records from freeDB which we have not yet
* deleted, delete them and any we reserved for me_pghead.
*/
while (pglast < env->me_pglast) {
/* The great answer is 42, and seems to be enough to prevent a search in
 * mdb_page_alloc() during a delete, when the freeDB tree is unbalanced. */
while (!env->me_pghead || env->me_pghead[0] < 42) {
if (mdb_page_alloc(&mc, 0, NULL))
break;
}
rc = mdb_cursor_first(&mc, &key, NULL);
if (rc)
return rc;
pglast = head_id = *(txnid_t *)key.mv_data;
total_room = head_room = 0;
more = 1;
mdb_tassert(txn, pglast <= env->me_pglast);
mc.mc_flags |= C_RECLAIMING;
rc = mdb_cursor_del(&mc, 0);
mc.mc_flags &= ~C_RECLAIMING;
if (rc)
return rc;
}
} else if (txn->mt_lifo_reclaimed) {
again:
/* LY: cleanup reclaimed records. */
while(cleanup_idx < txn->mt_lifo_reclaimed[0]) {
pglast = txn->mt_lifo_reclaimed[++cleanup_idx];
key.mv_data = &pglast;
key.mv_size = sizeof(pglast);
/* The great answer is 42, and seems to be enough to prevent a search in
 * mdb_page_alloc() during a delete, when the freeDB tree is unbalanced. */
while (!env->me_pghead || env->me_pghead[0] < 42) {
if (mdb_page_alloc(&mc, 0, NULL)) {
rc = mdb_page_search(&mc, &key, MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND)
goto ballout;
break;
}
}
rc = mdb_cursor_get(&mc, &key, NULL, MDB_SET);
if (rc != MDB_NOTFOUND) {
if (rc)
goto ballout;
mc.mc_flags |= C_RECLAIMING;
rc = mdb_cursor_del(&mc, 0);
mc.mc_flags &= ~C_RECLAIMING;
if (rc)
goto ballout;
}
}
}
/* Save the IDL of pages freed by this txn, to a single record */
if (freecnt < txn->mt_free_pgs[0]) {
@ -3149,7 +3292,7 @@ mdb_freelist_save(MDB_txn *txn)
/* Make sure last page of freeDB is touched and on freelist */
rc = mdb_page_search(&mc, NULL, MDB_PS_LAST|MDB_PS_MODIFY);
if (rc && rc != MDB_NOTFOUND)
return rc;
goto ballout;
}
free_pgs = txn->mt_free_pgs;
/* Write to last page of freeDB */
@ -3160,7 +3303,7 @@ mdb_freelist_save(MDB_txn *txn)
data.mv_size = MDB_IDL_SIZEOF(free_pgs);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
return rc;
goto ballout;
/* Retry if mt_free_pgs[] grew during the Put() */
free_pgs = txn->mt_free_pgs;
} while (freecnt < free_pgs[0]);
@ -3181,6 +3324,9 @@ mdb_freelist_save(MDB_txn *txn)
mop = env->me_pghead;
mop_len = (mop ? mop[0] : 0) + txn->mt_loose_count;
if (mop_len && refill_idx == 0)
refill_idx = 1;
/* Reserve records for me_pghead[]. Split it if multi-page,
* to avoid searching freeDB for a page range. Use keys in
* range [1,me_pglast]: Smaller than txnid of oldest reader.
@ -3191,8 +3337,46 @@ mdb_freelist_save(MDB_txn *txn)
} else if (head_room >= maxfree_1pg && head_id > 1) {
/* Keep current record (overflow page), add a new one */
head_id--;
refill_idx++;
head_room = 0;
}
if (lifo) {
if (refill_idx > (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0)) {
/* LY: need one more txn-id to save the page list. */
rc = mdb_page_alloc(&mc, 0, NULL);
if (rc == 0)
/* LY: ok, reclaimed from freeDB. */
continue;
if (rc != MDB_NOTFOUND)
/* LY: other troubles... */
goto ballout;
/* LY: freeDB is empty, will look for any free txn-id in high-to-low order. */
if (env->me_pglast < 1) {
/* LY: no txn available in the past of freeDB. */
rc = MDB_MAP_FULL;
goto ballout;
}
if (! txn->mt_lifo_reclaimed) {
txn->mt_lifo_reclaimed = mdb_midl_alloc(env->me_maxfree_1pg);
if (! txn->mt_lifo_reclaimed) {
rc = ENOMEM;
goto ballout;
}
}
/* LY: append the list. */
rc = mdb_midl_append(&txn->mt_lifo_reclaimed, env->me_pglast - 1);
if (rc)
goto ballout;
--env->me_pglast;
/* LY: note that freeDB cleanup is not needed. */
++cleanup_idx;
}
head_id = txn->mt_lifo_reclaimed[refill_idx];
}
/* (Re)write {key = head_id, IDL length = head_room} */
total_room -= head_room;
head_room = mop_len - total_room;
@ -3203,13 +3387,14 @@ mdb_freelist_save(MDB_txn *txn)
} else if (head_room < 0) {
/* Rare case, not bothering to delete this record */
head_room = 0;
continue;
}
key.mv_size = sizeof(head_id);
key.mv_data = &head_id;
data.mv_size = (head_room + 1) * sizeof(pgno_t);
rc = mdb_cursor_put(&mc, &key, &data, MDB_RESERVE);
if (rc)
return rc;
goto ballout;
/* IDL is initially empty, zero out at least the length */
pgs = (pgno_t *)data.mv_data;
j = head_room > clean_limit ? head_room : 0;
@ -3219,6 +3404,8 @@ mdb_freelist_save(MDB_txn *txn)
total_room += head_room;
}
mdb_tassert(txn, cleanup_idx == (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0));
/* Return loose page numbers to me_pghead, though usually none are
* left at this point. The pages themselves remain in dirty_list.
*/
@ -3228,7 +3415,7 @@ mdb_freelist_save(MDB_txn *txn)
MDB_IDL loose;
/* Room for loose pages + temp IDL with same */
if ((rc = mdb_midl_need(&env->me_pghead, 2*count+1)) != 0)
return rc;
goto ballout;
mop = env->me_pghead;
loose = mop + MDB_IDL_ALLOCLEN(mop) - count;
for (count = 0; mp; mp = NEXT_LOOSE_PAGE(mp))
@ -3247,27 +3434,75 @@ mdb_freelist_save(MDB_txn *txn)
MDB_val key, data;
mop += mop_len;
if (! lifo) {
rc = mdb_cursor_first(&mc, &key, &data);
for (; !rc; rc = mdb_cursor_next(&mc, &key, &data, MDB_NEXT)) {
txnid_t id = *(txnid_t *)key.mv_data;
ssize_t len = (ssize_t)(data.mv_size / sizeof(MDB_ID)) - 1;
if (rc)
goto ballout;
}
for(;;) {
txnid_t id;
ssize_t len;
MDB_ID save;
mdb_tassert(txn, len >= 0 && id <= env->me_pglast);
if (! lifo) {
id = *(txnid_t *)key.mv_data;
mdb_tassert(txn, id <= env->me_pglast);
} else {
mdb_tassert(txn, refill_idx > 0 && refill_idx <= txn->mt_lifo_reclaimed[0]);
id = txn->mt_lifo_reclaimed[refill_idx--];
key.mv_data = &id;
if (len > mop_len) {
key.mv_size = sizeof(id);
rc = mdb_cursor_get(&mc, &key, &data, MDB_SET);
if (rc)
goto ballout;
}
mdb_tassert(txn, cleanup_idx == (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0));
len = (ssize_t)(data.mv_size / sizeof(MDB_ID)) - 1;
mdb_tassert(txn, len >= 0);
if (len > mop_len)
len = mop_len;
data.mv_size = (len + 1) * sizeof(MDB_ID);
}
key.mv_data = &id;
key.mv_size = sizeof(id);
data.mv_data = mop -= len;
save = mop[0];
mop[0] = len;
rc = mdb_cursor_put(&mc, &key, &data, MDB_CURRENT);
mdb_tassert(txn, cleanup_idx == (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0));
mop[0] = save;
if (rc || !(mop_len -= len))
break;
if (rc || (mop_len -= len) == 0)
goto ballout;
if (! lifo) {
rc = mdb_cursor_next(&mc, &key, &data, MDB_NEXT);
if (rc)
goto ballout;
}
}
}
ballout:
if (txn->mt_lifo_reclaimed) {
mdb_tassert(txn, rc || cleanup_idx == txn->mt_lifo_reclaimed[0]);
if (rc == 0 && cleanup_idx != txn->mt_lifo_reclaimed[0]) {
mdb_tassert(txn, cleanup_idx < txn->mt_lifo_reclaimed[0]);
/* LY: zero cleanup_idx to force cleanup & refill of the created freeDB records. */
cleanup_idx = 0;
/* LY: restart filling */
refill_idx = total_room = head_room = 0;
more = 1;
goto again;
}
txn->mt_lifo_reclaimed[0] = 0;
if (txn != env->me_txn0) {
mdb_midl_free(txn->mt_lifo_reclaimed);
txn->mt_lifo_reclaimed = NULL;
}
}
return rc;
}
@ -3460,6 +3695,18 @@ mdb_txn_commit(MDB_txn *txn)
MDB_IDL pspill;
unsigned x, y, len, ps_len;
/* Append our reclaim list to parent's */
if (txn->mt_lifo_reclaimed) {
if (parent->mt_lifo_reclaimed) {
rc = mdb_midl_append_list(&parent->mt_lifo_reclaimed, txn->mt_lifo_reclaimed);
if (rc)
goto fail;
mdb_midl_free(txn->mt_lifo_reclaimed);
} else
parent->mt_lifo_reclaimed = txn->mt_lifo_reclaimed;
txn->mt_lifo_reclaimed = NULL;
}
/* Append our free list to parent's */
rc = mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs);
if (rc)
@ -4707,9 +4954,10 @@ fail:
* at runtime. Changing other flags requires closing the
* environment and re-opening it with the new flags.
*/
#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC|MDB_NOMEMINIT)
#define CHANGEABLE (MDB_NOSYNC|MDB_NOMETASYNC|MDB_MAPASYNC| \
MDB_NOMEMINIT|MDB_COALESCE)
#define CHANGELESS (MDB_FIXEDMAP|MDB_NOSUBDIR|MDB_RDONLY| \
MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD)
MDB_WRITEMAP|MDB_NOTLS|MDB_NOLOCK|MDB_NORDAHEAD|MDB_LIFORECLAIM)
#if VALID_FLAGS & PERSISTENT_FLAGS & (CHANGEABLE|CHANGELESS)
# error "Persistent DB flags & env flags overlap, but both go in mm_flags"
@ -4879,6 +5127,8 @@ mdb_env_close0(MDB_env *env, int excl)
free(env->me_dbxs);
free(env->me_path);
free(env->me_dirty_list);
if (env->me_txn0)
mdb_midl_free(env->me_txn0->mt_lifo_reclaimed);
free(env->me_txn0);
mdb_midl_free(env->me_free_pgs);
@ -7301,6 +7551,7 @@ mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
mc->mc_top = 0;
mc->mc_pg[0] = 0;
mc->mc_flags = 0;
mc->mc_ki[0] = 0;
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
mdb_tassert(txn, mx != NULL);
mc->mc_xcursor = mx;