mdbx: Merge branch 'devel' into 'pt' branch.

This commit is contained in:
Leo Yuriev 2016-12-15 22:05:45 +03:00
commit b4dc91d276
3 changed files with 110 additions and 84 deletions

5
lmdb.h
View File

@ -66,6 +66,11 @@
* This does not use actual memory or disk space, but users may need * This does not use actual memory or disk space, but users may need
* to understand the difference so they won't be scared off. * to understand the difference so they won't be scared off.
* *
* - An LMDB configuration will often reserve considerable \b unused
* memory address space and maybe file size for future growth.
* This does not use actual memory or disk space, but users may need
* to understand the difference so they won't be scared off.
*
* - By default, in versions before 0.9.10, unused portions of the data * - By default, in versions before 0.9.10, unused portions of the data
* file might receive garbage data from memory freed by other code. * file might receive garbage data from memory freed by other code.
* (This does not happen when using the #MDB_WRITEMAP flag.) As of * (This does not happen when using the #MDB_WRITEMAP flag.) As of

144
mdb.c
View File

@ -863,11 +863,12 @@ struct MDB_txn {
* @ingroup internal * @ingroup internal
* @{ * @{
*/ */
#define DB_DIRTY 0x01 /**< DB was modified or is DUPSORT data */ #define DB_DIRTY 0x01 /**< DB was written in this txn */
#define DB_STALE 0x02 /**< Named-DB record is older than txnID */ #define DB_STALE 0x02 /**< Named-DB record is older than txnID */
#define DB_NEW 0x04 /**< Named-DB handle opened in this txn */ #define DB_NEW 0x04 /**< Named-DB handle opened in this txn */
#define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */ #define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */
#define DB_USRVALID 0x10 /**< As #DB_VALID, but not set for #FREE_DBI */ #define DB_USRVALID 0x10 /**< As #DB_VALID, but not set for #FREE_DBI */
#define DB_DUPDATA 0x20 /**< DB is #MDB_DUPSORT data */
/** @} */ /** @} */
/** In write txns, array of cursors for each DB */ /** In write txns, array of cursors for each DB */
MDB_cursor **mt_cursors; MDB_cursor **mt_cursors;
@ -1117,7 +1118,7 @@ enum {
#define MDB_END_SLOT MDB_NOTLS /**< release any reader slot if #MDB_NOTLS */ #define MDB_END_SLOT MDB_NOTLS /**< release any reader slot if #MDB_NOTLS */
static int mdb_txn_end(MDB_txn *txn, unsigned mode); static int mdb_txn_end(MDB_txn *txn, unsigned mode);
static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp, int *lvl); static int mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **mp, int *lvl);
static int mdb_page_search_root(MDB_cursor *mc, static int mdb_page_search_root(MDB_cursor *mc,
MDB_val *key, int modify); MDB_val *key, int modify);
#define MDB_PS_MODIFY 1 #define MDB_PS_MODIFY 1
@ -1142,7 +1143,7 @@ static int mdb_node_add(MDB_cursor *mc, indx_t indx,
static void mdb_node_del(MDB_cursor *mc, int ksize); static void mdb_node_del(MDB_cursor *mc, int ksize);
static void mdb_node_shrink(MDB_page *mp, indx_t indx); static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft); static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft);
static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data); static int mdb_node_read(MDB_cursor *mc, MDB_node *leaf, MDB_val *data);
static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
static size_t mdb_branch_size(MDB_env *env, MDB_val *key); static size_t mdb_branch_size(MDB_env *env, MDB_val *key);
@ -1550,6 +1551,7 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
/** Allocate memory for a page. /** Allocate memory for a page.
* Re-use old malloc'd pages first for singletons, otherwise just malloc. * Re-use old malloc'd pages first for singletons, otherwise just malloc.
* Set #MDB_TXN_ERROR on failure.
*/ */
static MDB_page * static MDB_page *
mdb_page_malloc(MDB_txn *txn, unsigned num) mdb_page_malloc(MDB_txn *txn, unsigned num)
@ -1722,20 +1724,16 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all)
{ {
enum { Mask = P_SUBP|P_DIRTY|P_LOOSE|P_KEEP }; enum { Mask = P_SUBP|P_DIRTY|P_LOOSE|P_KEEP };
MDB_txn *txn = mc->mc_txn; MDB_txn *txn = mc->mc_txn;
MDB_cursor *m3; MDB_cursor *m3, *m0 = mc;
MDB_xcursor *mx; MDB_xcursor *mx;
MDB_page *dp, *mp; MDB_page *dp, *mp;
MDB_node *leaf; MDB_node *leaf;
unsigned i, j; unsigned i, j;
int rc = MDB_SUCCESS, level; int rc = MDB_SUCCESS, level;
/* Mark pages seen by cursors */ /* Mark pages seen by cursors: First m0, then tracked cursors */
if (mc->mc_flags & C_UNTRACK) for (i = txn->mt_numdbs;; ) {
mc = NULL; /* will find mc in mt_cursors */ if (mc->mc_flags & C_INITIALIZED) {
for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) {
for (; mc; mc=mc->mc_next) {
if (!(mc->mc_flags & C_INITIALIZED))
continue;
for (m3 = mc;; m3 = &mx->mx_cursor) { for (m3 = mc;; m3 = &mx->mx_cursor) {
mp = NULL; mp = NULL;
for (j=0; j<m3->mc_snum; j++) { for (j=0; j<m3->mc_snum; j++) {
@ -1754,10 +1752,13 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all)
break; break;
} }
} }
mc = mc->mc_next;
for (; !mc || mc == m0; mc = txn->mt_cursors[--i])
if (i == 0) if (i == 0)
break; goto mark_done;
} }
mark_done:
if (all) { if (all) {
/* Mark dirty root pages */ /* Mark dirty root pages */
for (i=0; i<txn->mt_numdbs; i++) { for (i=0; i<txn->mt_numdbs; i++) {
@ -1765,7 +1766,7 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all)
pgno_t pgno = txn->mt_dbs[i].md_root; pgno_t pgno = txn->mt_dbs[i].md_root;
if (pgno == P_INVALID) if (pgno == P_INVALID)
continue; continue;
if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) if (unlikely((rc = mdb_page_get(m0, pgno, &dp, &level)) != MDB_SUCCESS))
break; break;
if ((dp->mp_flags & Mask) == pflags && level <= 1) if ((dp->mp_flags & Mask) == pflags && level <= 1)
dp->mp_flags ^= P_KEEP; dp->mp_flags ^= P_KEEP;
@ -2053,7 +2054,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
} }
/** Allocate page numbers and memory for writing. Maintain me_pglast, /** Allocate page numbers and memory for writing. Maintain me_pglast,
* me_pghead and mt_next_pgno. * me_pghead and mt_next_pgno. Set #MDB_TXN_ERROR on failure.
* *
* If there are free pages available from older transactions, they * If there are free pages available from older transactions, they
* are re-used first. Otherwise allocate a new page at mt_next_pgno. * are re-used first. Otherwise allocate a new page at mt_next_pgno.
@ -2221,7 +2222,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp, int flags)
np = m2.mc_pg[m2.mc_top]; np = m2.mc_pg[m2.mc_top];
leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]); leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]);
if (unlikely((rc = mdb_node_read(txn, leaf, &data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(&m2, leaf, &data)) != MDB_SUCCESS))
goto fail; goto fail;
if ((flags & MDBX_LIFORECLAIM) && !txn->mt_lifo_reclaimed) { if ((flags & MDBX_LIFORECLAIM) && !txn->mt_lifo_reclaimed) {
@ -2480,6 +2481,7 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret)
} }
/** Touch a page: make it dirty and re-insert into tree with updated pgno. /** Touch a page: make it dirty and re-insert into tree with updated pgno.
* Set #MDB_TXN_ERROR on failure.
* @param[in] mc cursor pointing to the page to be touched * @param[in] mc cursor pointing to the page to be touched
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
@ -5375,7 +5377,9 @@ mdb_cursor_pop(MDB_cursor *mc)
} }
} }
/** Push a page onto the top of the cursor's stack. */ /** Push a page onto the top of the cursor's stack.
* Set #MDB_TXN_ERROR on failure.
*/
static int static int
mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) mdb_cursor_push(MDB_cursor *mc, MDB_page *mp)
{ {
@ -5395,15 +5399,17 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp)
} }
/** Find the address of the page corresponding to a given page number. /** Find the address of the page corresponding to a given page number.
* @param[in] txn the transaction for this access. * Set #MDB_TXN_ERROR on failure.
* @param[in] mc the cursor accessing the page.
* @param[in] pgno the page number for the page to retrieve. * @param[in] pgno the page number for the page to retrieve.
* @param[out] ret address of a pointer where the page's address will be stored. * @param[out] ret address of a pointer where the page's address will be stored.
* @param[out] lvl dirty_list inheritance level of found page. 1=current txn, 0=mapped page. * @param[out] lvl dirty_list inheritance level of found page. 1=current txn, 0=mapped page.
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static int static int
mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl)
{ {
MDB_txn *txn = mc->mc_txn;
MDB_env *env = txn->mt_env; MDB_env *env = txn->mt_env;
MDB_page *p = NULL; MDB_page *p = NULL;
int level; int level;
@ -5497,7 +5503,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags)
mdb_cassert(mc, i < NUMKEYS(mp)); mdb_cassert(mc, i < NUMKEYS(mp));
node = NODEPTR(mp, i); node = NODEPTR(mp, i);
if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)) if (unlikely((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0))
return rc; return rc;
mc->mc_ki[mc->mc_top] = i; mc->mc_ki[mc->mc_top] = i;
@ -5539,7 +5545,7 @@ mdb_page_search_lowest(MDB_cursor *mc)
MDB_node *node = NODEPTR(mp, 0); MDB_node *node = NODEPTR(mp, 0);
int rc; int rc;
if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)) if (unlikely((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0))
return rc; return rc;
mc->mc_ki[mc->mc_top] = 0; mc->mc_ki[mc->mc_top] = 0;
@ -5591,7 +5597,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
return MDB_NOTFOUND; return MDB_NOTFOUND;
if (unlikely((leaf->mn_flags & (F_DUPDATA|F_SUBDATA)) != F_SUBDATA)) if (unlikely((leaf->mn_flags & (F_DUPDATA|F_SUBDATA)) != F_SUBDATA))
return MDB_INCOMPATIBLE; /* not a named DB */ return MDB_INCOMPATIBLE; /* not a named DB */
rc = mdb_node_read(mc->mc_txn, leaf, &data); rc = mdb_node_read(&mc2, leaf, &data);
if (rc) if (rc)
return rc; return rc;
memcpy(&flags, ((char *) data.mv_data + offsetof(MDB_db, md_flags)), memcpy(&flags, ((char *) data.mv_data + offsetof(MDB_db, md_flags)),
@ -5615,7 +5621,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags)
mdb_cassert(mc, root > 1); mdb_cassert(mc, root > 1);
if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root) if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root)
if (unlikely((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0], NULL)) != 0)) if (unlikely((rc = mdb_page_get(mc, root, &mc->mc_pg[0], NULL)) != 0))
return rc; return rc;
mc->mc_snum = 1; mc->mc_snum = 1;
@ -5712,13 +5718,13 @@ release:
} }
/** Return the data associated with a given node. /** Return the data associated with a given node.
* @param[in] txn The transaction for this operation. * @param[in] mc The cursor for this operation.
* @param[in] leaf The node being read. * @param[in] leaf The node being read.
* @param[out] data Updated to point to the node's data. * @param[out] data Updated to point to the node's data.
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
*/ */
static MDBX_INLINE int static MDBX_INLINE int
mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data) mdb_node_read(MDB_cursor *mc, MDB_node *leaf, MDB_val *data)
{ {
MDB_page *omp; /* overflow page */ MDB_page *omp; /* overflow page */
pgno_t pgno; pgno_t pgno;
@ -5734,7 +5740,7 @@ mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
*/ */
data->mv_size = NODEDSZ(leaf); data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
if (unlikely((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0)) { if (unlikely((rc = mdb_page_get(mc, pgno, &omp, NULL)) != 0)) {
mdb_debug("read overflow page %zu failed", pgno); mdb_debug("read overflow page %zu failed", pgno);
return rc; return rc;
} }
@ -5815,7 +5821,7 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right)
mdb_cassert(mc, IS_BRANCH(mc->mc_pg[mc->mc_top])); mdb_cassert(mc, IS_BRANCH(mc->mc_pg[mc->mc_top]));
indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0)) { if (unlikely((rc = mdb_page_get(mc, NODEPGNO(indx), &mp, NULL)) != 0)) {
/* mc will be inconsistent if caller does mc_snum++ as above */ /* mc will be inconsistent if caller does mc_snum++ as above */
mc->mc_flags &= ~(C_INITIALIZED|C_EOF); mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
return rc; return rc;
@ -5898,7 +5904,7 @@ skip:
mdb_xcursor_init1(mc, leaf); mdb_xcursor_init1(mc, leaf);
} }
if (data) { if (data) {
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS))
return rc; return rc;
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
@ -5981,7 +5987,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
mdb_xcursor_init1(mc, leaf); mdb_xcursor_init1(mc, leaf);
} }
if (data) { if (data) {
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS))
return rc; return rc;
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
@ -6170,7 +6176,7 @@ set1:
} }
} else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) { } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) {
MDB_val olddata; MDB_val olddata;
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, &olddata)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, &olddata)) != MDB_SUCCESS))
return rc; return rc;
rc = mc->mc_dbx->md_dcmp(data, &olddata); rc = mc->mc_dbx->md_dcmp(data, &olddata);
if (rc) { if (rc) {
@ -6183,7 +6189,7 @@ set1:
} else { } else {
if (mc->mc_xcursor) if (mc->mc_xcursor)
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS))
return rc; return rc;
} }
} }
@ -6232,7 +6238,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
} else { } else {
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS))
return rc; return rc;
} }
} }
@ -6277,7 +6283,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
} else { } else {
if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS))
return rc; return rc;
} }
} }
@ -6326,7 +6332,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT);
} else { } else {
rc = mdb_node_read(mc->mc_txn, leaf, data); rc = mdb_node_read(mc, leaf, data);
} }
} }
} }
@ -6443,7 +6449,7 @@ fetchm:
MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
MDB_GET_KEY(leaf, key); MDB_GET_KEY(leaf, key);
rc = mdb_node_read(mc->mc_txn, leaf, data); rc = mdb_node_read(mc, leaf, data);
break; break;
} }
} }
@ -6480,7 +6486,8 @@ mdb_cursor_touch(MDB_cursor *mc)
{ {
int rc = MDB_SUCCESS; int rc = MDB_SUCCESS;
if (mc->mc_dbi >= CORE_DBS && !(*mc->mc_dbflag & DB_DIRTY)) { if (mc->mc_dbi >= CORE_DBS && !(*mc->mc_dbflag & (DB_DIRTY|DB_DUPDATA))) {
/* Touch DB record of named DB */
MDB_cursor mc2; MDB_cursor mc2;
MDB_xcursor mcx; MDB_xcursor mcx;
if (TXN_DBI_CHANGED(mc->mc_txn, mc->mc_dbi)) if (TXN_DBI_CHANGED(mc->mc_txn, mc->mc_dbi))
@ -6843,7 +6850,7 @@ current:
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&pg, olddata.mv_data, sizeof(pg)); memcpy(&pg, olddata.mv_data, sizeof(pg));
if (unlikely((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)) if (unlikely((rc2 = mdb_page_get(mc, pg, &omp, &level)) != 0))
return rc2; return rc2;
ovpages = omp->mp_pages; ovpages = omp->mp_pages;
@ -6877,13 +6884,8 @@ current:
* parent txn, in case the user peeks at MDB_RESERVEd * parent txn, in case the user peeks at MDB_RESERVEd
* or unused parts. Some users treat ovpages specially. * or unused parts. Some users treat ovpages specially.
*/ */
#if MDBX_MODE_ENABLED
/* LY: New page will contain only header from origin,
* but no any payload */
memcpy(np, omp, PAGEHDRSZ);
#else
size_t sz = (size_t) env->me_psize * ovpages, off; size_t sz = (size_t) env->me_psize * ovpages, off;
if (!(flags & MDB_RESERVE)) { if (MDBX_MODE_ENABLED || !(flags & MDB_RESERVE)) {
/* Skip the part where LMDB will put *data. /* Skip the part where LMDB will put *data.
* Copy end of page, adjusting alignment so * Copy end of page, adjusting alignment so
* compiler may copy words instead of bytes. * compiler may copy words instead of bytes.
@ -6894,7 +6896,6 @@ current:
sz = PAGEHDRSZ; sz = PAGEHDRSZ;
} }
memcpy(np, omp, sz); /* Copy whole or header of page */ memcpy(np, omp, sz); /* Copy whole or header of page */
#endif /* MDBX_MODE_ENABLED */
omp = np; omp = np;
} }
SETDSZ(leaf, data->mv_size); SETDSZ(leaf, data->mv_size);
@ -7154,7 +7155,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned flags)
pgno_t pg; pgno_t pg;
memcpy(&pg, NODEDATA(leaf), sizeof(pg)); memcpy(&pg, NODEDATA(leaf), sizeof(pg));
if (unlikely((rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) || if (unlikely((rc = mdb_page_get(mc, pg, &omp, NULL)) ||
(rc = mdb_ovpage_free(mc, omp)))) (rc = mdb_ovpage_free(mc, omp))))
goto fail; goto fail;
} }
@ -7168,6 +7169,7 @@ fail:
} }
/** Allocate and initialize new pages for a database. /** Allocate and initialize new pages for a database.
* Set #MDB_TXN_ERROR on failure.
* @param[in] mc a cursor on the database being added to. * @param[in] mc a cursor on the database being added to.
* @param[in] flags flags defining what type of page is being allocated. * @param[in] flags flags defining what type of page is being allocated.
* @param[in] num the number of pages to allocate. This is usually 1, * @param[in] num the number of pages to allocate. This is usually 1,
@ -7253,6 +7255,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
} }
/** Add a node to the page pointed to by the cursor. /** Add a node to the page pointed to by the cursor.
* Set #MDB_TXN_ERROR on failure.
* @param[in] mc The cursor for this operation. * @param[in] mc The cursor for this operation.
* @param[in] indx The index on the page where the new node should be added. * @param[in] indx The index on the page where the new node should be added.
* @param[in] key The key for the new node. * @param[in] key The key for the new node.
@ -7563,7 +7566,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
} }
mdb_debug("Sub-db -%u root page %zu", mx->mx_cursor.mc_dbi, mdb_debug("Sub-db -%u root page %zu", mx->mx_cursor.mc_dbi,
mx->mx_db.md_root); mx->mx_db.md_root);
mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DUPDATA;
/* #if UINT_MAX < SIZE_MAX /* #if UINT_MAX < SIZE_MAX
if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
mx->mx_dbx.md_cmp = mdb_cmp_clong; mx->mx_dbx.md_cmp = mdb_cmp_clong;
@ -7589,7 +7592,7 @@ mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int new_dupdata)
mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_top = 0;
mx->mx_cursor.mc_flags |= C_INITIALIZED; mx->mx_cursor.mc_flags |= C_INITIALIZED;
mx->mx_cursor.mc_ki[0] = 0; mx->mx_cursor.mc_ki[0] = 0;
mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DUPDATA;
#if UINT_MAX < SIZE_MAX #if UINT_MAX < SIZE_MAX
mx->mx_dbx.md_cmp = src_mx->mx_dbx.md_cmp; mx->mx_dbx.md_cmp = src_mx->mx_dbx.md_cmp;
#endif #endif
@ -7737,7 +7740,10 @@ mdb_cursor_close(MDB_cursor *mc)
if (mc) { if (mc) {
mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE); mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE);
if (!mc->mc_backup) { if (!mc->mc_backup) {
/* remove from txn, if tracked */ /* Remove from txn, if tracked.
* A read-only txn (!C_UNTRACK) may have been freed already,
* so do not peek inside it. Only write txns track cursors.
*/
if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) { if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi]; MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];
while (*prev && *prev != mc) prev = &(*prev)->mc_next; while (*prev && *prev != mc) prev = &(*prev)->mc_next;
@ -7767,6 +7773,7 @@ mdb_cursor_dbi(MDB_cursor *mc)
} }
/** Replace the key for a branch node with a new key. /** Replace the key for a branch node with a new key.
* Set #MDB_TXN_ERROR on failure.
* @param[in] mc Cursor pointing to the node to operate on. * @param[in] mc Cursor pointing to the node to operate on.
* @param[in] key The new key to use. * @param[in] key The new key to use.
* @return 0 on success, non-zero on failure. * @return 0 on success, non-zero on failure.
@ -8323,7 +8330,7 @@ mdb_rebalance(MDB_cursor *mc)
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0)); mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0));
rc = mdb_page_get(mc->mc_txn,mc->mc_db->md_root,&mc->mc_pg[0],NULL); rc = mdb_page_get(mc, mc->mc_db->md_root, &mc->mc_pg[0], NULL);
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
mc->mc_db->md_depth--; mc->mc_db->md_depth--;
@ -8384,7 +8391,7 @@ mdb_rebalance(MDB_cursor *mc)
mdb_debug("reading right neighbor"); mdb_debug("reading right neighbor");
mn.mc_ki[ptop]++; mn.mc_ki[ptop]++;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL); rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
mn.mc_ki[mn.mc_top] = 0; mn.mc_ki[mn.mc_top] = 0;
@ -8396,7 +8403,7 @@ mdb_rebalance(MDB_cursor *mc)
mdb_debug("reading left neighbor"); mdb_debug("reading left neighbor");
mn.mc_ki[ptop]--; mn.mc_ki[ptop]--;
node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]);
rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL); rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL);
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1; mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1;
@ -8587,7 +8594,6 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
* run out of space, triggering a split. We need this * run out of space, triggering a split. We need this
* cursor to be consistent until the end of the rebalance. * cursor to be consistent until the end of the rebalance.
*/ */
mc.mc_flags |= C_UNTRACK;
mc.mc_next = txn->mt_cursors[dbi]; mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc; txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_del(&mc, flags); rc = mdb_cursor_del(&mc, flags);
@ -8597,6 +8603,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
} }
/** Split a page and insert a new node. /** Split a page and insert a new node.
* Set #MDB_TXN_ERROR on failure.
* @param[in,out] mc Cursor pointing to the page and desired insertion index. * @param[in,out] mc Cursor pointing to the page and desired insertion index.
* The cursor will be updated to point to the actual page and index where * The cursor will be updated to point to the actual page and index where
* the node got inserted after the split. * the node got inserted after the split.
@ -8685,7 +8692,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
split_indx = newindx; split_indx = newindx;
nkeys = 0; nkeys = 0;
} else { } else {
split_indx = (nkeys+1) / 2; split_indx = (nkeys+1) / 2;
if (IS_LEAF2(rp)) { if (IS_LEAF2(rp)) {
@ -8845,7 +8851,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
} else { } else {
/* find right page's left sibling */ /* find right page's left sibling */
mc->mc_ki[ptop] = mn.mc_ki[ptop]; mc->mc_ki[ptop] = mn.mc_ki[ptop];
mdb_cursor_sibling(mc, 0); rc = mdb_cursor_sibling(mc, 0);
} }
} }
} else { } else {
@ -8853,8 +8859,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0); rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++; mn.mc_top++;
} }
if (unlikely(rc != MDB_SUCCESS)) if (unlikely(rc != MDB_SUCCESS)) {
if (rc == MDB_NOTFOUND) /* improper mdb_cursor_sibling() result */
rc = MDB_PROBLEM;
goto done; goto done;
}
if (nflags & MDB_APPEND) { if (nflags & MDB_APPEND) {
mc->mc_pg[mc->mc_top] = rp; mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[mc->mc_top] = 0; mc->mc_ki[mc->mc_top] = 0;
@ -9086,7 +9095,10 @@ typedef struct mdb_copy {
HANDLE mc_fd; HANDLE mc_fd;
int mc_toggle; /**< Buffer number in provider */ int mc_toggle; /**< Buffer number in provider */
int mc_new; /**< (0-2 buffers to write) | (#MDB_EOF at end) */ int mc_new; /**< (0-2 buffers to write) | (#MDB_EOF at end) */
volatile int mc_error; /**< Error code, never cleared if set */ /** Error code. Never cleared if set. Both threads can set nonzero
* to fail the copy. Not mutex-protected, LMDB expects atomic int.
*/
volatile int mc_error;
} mdb_copy; } mdb_copy;
/** Dedicated writer thread for compacting copy. */ /** Dedicated writer thread for compacting copy. */
@ -9164,12 +9176,15 @@ mdb_env_cthr_toggle(mdb_copy *my, int adjust)
return my->mc_error; return my->mc_error;
} }
/** Depth-first tree traversal for compacting copy. */ /** Depth-first tree traversal for compacting copy.
* @param[in] my control structure.
* @param[in,out] pg database root.
* @param[in] flags includes #F_DUPDATA if it is a sorted-duplicate sub-DB.
*/
static int __cold static int __cold
mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
{ {
MDB_cursor mc; MDB_cursor mc;
MDB_txn *txn = my->mc_txn;
MDB_node *ni; MDB_node *ni;
MDB_page *mo, *mp, *leaf; MDB_page *mo, *mp, *leaf;
char *buf, *ptr; char *buf, *ptr;
@ -9182,9 +9197,9 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
memset(&mc, 0, sizeof(mc)); memset(&mc, 0, sizeof(mc));
mc.mc_snum = 1; mc.mc_snum = 1;
mc.mc_txn = txn; mc.mc_txn = my->mc_txn;
rc = mdb_page_get(txn, *pg, &mc.mc_pg[0], NULL); rc = mdb_page_get(&mc, *pg, &mc.mc_pg[0], NULL);
if (rc) if (rc)
return rc; return rc;
rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST); rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST);
@ -9229,7 +9244,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(&pg, NODEDATA(ni), sizeof(pg));
memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t)); memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t));
rc = mdb_page_get(txn, pg, &omp, NULL); rc = mdb_page_get(&mc, pg, &omp, NULL);
if (rc) if (rc)
goto done; goto done;
if (my->mc_wlen[toggle] >= MDB_WBUF) { if (my->mc_wlen[toggle] >= MDB_WBUF) {
@ -9279,7 +9294,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
again: again:
ni = NODEPTR(mp, mc.mc_ki[mc.mc_top]); ni = NODEPTR(mp, mc.mc_ki[mc.mc_top]);
pg = NODEPGNO(ni); pg = NODEPGNO(ni);
rc = mdb_page_get(txn, pg, &mp, NULL); rc = mdb_page_get(&mc, pg, &mp, NULL);
if (rc) if (rc)
goto done; goto done;
mc.mc_top++; mc.mc_top++;
@ -9894,7 +9909,8 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned flags, MDB_dbi *dbi)
memset(&dummy, 0, sizeof(dummy)); memset(&dummy, 0, sizeof(dummy));
dummy.md_root = P_INVALID; dummy.md_root = P_INVALID;
dummy.md_flags = flags & PERSISTENT_FLAGS; dummy.md_flags = flags & PERSISTENT_FLAGS;
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA); WITH_CURSOR_TRACKING(mc,
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA));
dbflag |= DB_DIRTY; dbflag |= DB_DIRTY;
} }
@ -10025,7 +10041,7 @@ mdb_drop0(MDB_cursor *mc, int subs)
MDB_page *omp; MDB_page *omp;
pgno_t pg; pgno_t pg;
memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(&pg, NODEDATA(ni), sizeof(pg));
rc = mdb_page_get(txn, pg, &omp, NULL); rc = mdb_page_get(mc, pg, &omp, NULL);
if (unlikely(rc)) if (unlikely(rc))
goto done; goto done;
mdb_cassert(mc, IS_OVERFLOW(omp)); mdb_cassert(mc, IS_OVERFLOW(omp));
@ -10305,7 +10321,7 @@ mdb_reader_check(MDB_env *env, int *dead)
return mdb_reader_check0(env, 0, dead); return mdb_reader_check0(env, 0, dead);
} }
/** As #mdb_reader_check(). rlocked = <caller locked the reader mutex>. */ /** As #mdb_reader_check(). \b rlocked is set if caller locked #me_rmutex. */
static int __cold static int __cold
mdb_reader_check0(MDB_env *env, int rlocked, int *dead) mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
{ {

9
mdbx.c
View File

@ -182,7 +182,12 @@ mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int dee
if (pg == P_INVALID) if (pg == P_INVALID)
return MDB_SUCCESS; /* empty db */ return MDB_SUCCESS; /* empty db */
rc = mdb_page_get(ctx->mw_txn, pg, &mp, NULL); MDB_cursor mc;
memset(&mc, 0, sizeof(mc));
mc.mc_snum = 1;
mc.mc_txn = ctx->mw_txn;
rc = mdb_page_get(&mc, pg, &mp, NULL);
if (rc) if (rc)
return rc; return rc;
if (pg != mp->mp_p.p_pgno) if (pg != mp->mp_p.p_pgno)
@ -249,7 +254,7 @@ mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int dee
payload_size += sizeof(pgno_t); payload_size += sizeof(pgno_t);
opg = NODEDATA(node); opg = NODEDATA(node);
rc = mdb_page_get(ctx->mw_txn, *opg, &omp, NULL); rc = mdb_page_get(&mc, *opg, &omp, NULL);
if (rc) if (rc)
return rc; return rc;
if (*opg != omp->mp_p.p_pgno) if (*opg != omp->mp_p.p_pgno)