diff --git a/lmdb.h b/lmdb.h index 1237fdcd..6a74660f 100644 --- a/lmdb.h +++ b/lmdb.h @@ -66,6 +66,11 @@ * This does not use actual memory or disk space, but users may need * to understand the difference so they won't be scared off. * + * - An LMDB configuration will often reserve considerable \b unused + * memory address space and maybe file size for future growth. + * This does not use actual memory or disk space, but users may need + * to understand the difference so they won't be scared off. + * * - By default, in versions before 0.9.10, unused portions of the data * file might receive garbage data from memory freed by other code. * (This does not happen when using the #MDB_WRITEMAP flag.) As of diff --git a/mdb.c b/mdb.c index 16a24bbd..447a618d 100644 --- a/mdb.c +++ b/mdb.c @@ -863,11 +863,12 @@ struct MDB_txn { * @ingroup internal * @{ */ -#define DB_DIRTY 0x01 /**< DB was modified or is DUPSORT data */ +#define DB_DIRTY 0x01 /**< DB was written in this txn */ #define DB_STALE 0x02 /**< Named-DB record is older than txnID */ #define DB_NEW 0x04 /**< Named-DB handle opened in this txn */ #define DB_VALID 0x08 /**< DB handle is valid, see also #MDB_VALID */ #define DB_USRVALID 0x10 /**< As #DB_VALID, but not set for #FREE_DBI */ +#define DB_DUPDATA 0x20 /**< DB is #MDB_DUPSORT data */ /** @} */ /** In write txns, array of cursors for each DB */ MDB_cursor **mt_cursors; @@ -1099,10 +1100,10 @@ typedef struct MDB_ntxn { #define METAPAGE_2(env) \ (&((MDB_metabuf*) ((env)->me_map + env->me_psize))->mb_metabuf.mm_meta) -static int mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp, int flags); -static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp); -static int mdb_page_touch(MDB_cursor *mc); -static int mdb_cursor_touch(MDB_cursor *mc); +static int mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp, int flags); +static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp); +static int mdb_page_touch(MDB_cursor *mc); +static int mdb_cursor_touch(MDB_cursor *mc); #define MDB_END_NAMES {"committed", "empty-commit", "abort", "reset", \ "reset-tmp", "fail-begin", "fail-beginchild"} @@ -1115,16 +1116,16 @@ enum { #define MDB_END_UPDATE 0x10 /**< update env state (DBIs) */ #define MDB_END_FREE 0x20 /**< free txn unless it is #MDB_env.%me_txn0 */ #define MDB_END_SLOT MDB_NOTLS /**< release any reader slot if #MDB_NOTLS */ -static int mdb_txn_end(MDB_txn *txn, unsigned mode); +static int mdb_txn_end(MDB_txn *txn, unsigned mode); -static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **mp, int *lvl); -static int mdb_page_search_root(MDB_cursor *mc, +static int mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **mp, int *lvl); +static int mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify); #define MDB_PS_MODIFY 1 #define MDB_PS_ROOTONLY 2 #define MDB_PS_FIRST 4 #define MDB_PS_LAST 8 -static int mdb_page_search(MDB_cursor *mc, +static int mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags); static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); @@ -1132,17 +1133,17 @@ static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst); static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno, unsigned nflags); -static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); -static int mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending); -static void mdb_env_close0(MDB_env *env); +static int mdb_env_read_header(MDB_env *env, MDB_meta *meta); +static int mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending); +static void mdb_env_close0(MDB_env *env); -static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp); -static int mdb_node_add(MDB_cursor *mc, indx_t indx, +static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp); +static int mdb_node_add(MDB_cursor *mc, indx_t indx, MDB_val *key, MDB_val *data, pgno_t pgno, unsigned flags); -static void mdb_node_del(MDB_cursor *mc, int ksize); -static void mdb_node_shrink(MDB_page *mp, indx_t indx); +static void mdb_node_del(MDB_cursor *mc, int ksize); +static void mdb_node_shrink(MDB_page *mp, indx_t indx); static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft); -static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data); +static int mdb_node_read(MDB_cursor *mc, MDB_node *leaf, MDB_val *data); static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data); static size_t mdb_branch_size(MDB_env *env, MDB_val *key); @@ -1168,8 +1169,8 @@ static void mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node); static void mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int force); static int mdb_drop0(MDB_cursor *mc, int subs); -static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi); -static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead); +static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi); +static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead); /** @cond */ static MDB_cmp_func mdb_cmp_memn, mdb_cmp_memnr, mdb_cmp_int_ai, mdb_cmp_int_a2, mdb_cmp_int_ua; @@ -1550,6 +1551,7 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) /** Allocate memory for a page. * Re-use old malloc'd pages first for singletons, otherwise just malloc. + * Set #MDB_TXN_ERROR on failure. */ static MDB_page * mdb_page_malloc(MDB_txn *txn, unsigned num) @@ -1722,20 +1724,16 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) { enum { Mask = P_SUBP|P_DIRTY|P_LOOSE|P_KEEP }; MDB_txn *txn = mc->mc_txn; - MDB_cursor *m3; + MDB_cursor *m3, *m0 = mc; MDB_xcursor *mx; MDB_page *dp, *mp; MDB_node *leaf; unsigned i, j; int rc = MDB_SUCCESS, level; - /* Mark pages seen by cursors */ - if (mc->mc_flags & C_UNTRACK) - mc = NULL; /* will find mc in mt_cursors */ - for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { - for (; mc; mc=mc->mc_next) { - if (!(mc->mc_flags & C_INITIALIZED)) - continue; + /* Mark pages seen by cursors: First m0, then tracked cursors */ + for (i = txn->mt_numdbs;; ) { + if (mc->mc_flags & C_INITIALIZED) { for (m3 = mc;; m3 = &mx->mx_cursor) { mp = NULL; for (j=0; jmc_snum; j++) { @@ -1754,10 +1752,13 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) break; } } - if (i == 0) - break; + mc = mc->mc_next; + for (; !mc || mc == m0; mc = txn->mt_cursors[--i]) + if (i == 0) + goto mark_done; } +mark_done: if (all) { /* Mark dirty root pages */ for (i=0; imt_numdbs; i++) { @@ -1765,7 +1766,7 @@ mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) pgno_t pgno = txn->mt_dbs[i].md_root; if (pgno == P_INVALID) continue; - if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) + if (unlikely((rc = mdb_page_get(m0, pgno, &dp, &level)) != MDB_SUCCESS)) break; if ((dp->mp_flags & Mask) == pflags && level <= 1) dp->mp_flags ^= P_KEEP; @@ -2053,7 +2054,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp) } /** Allocate page numbers and memory for writing. Maintain me_pglast, - * me_pghead and mt_next_pgno. + * me_pghead and mt_next_pgno. Set #MDB_TXN_ERROR on failure. * * If there are free pages available from older transactions, they * are re-used first. Otherwise allocate a new page at mt_next_pgno. @@ -2221,7 +2222,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp, int flags) np = m2.mc_pg[m2.mc_top]; leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]); - if (unlikely((rc = mdb_node_read(txn, leaf, &data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(&m2, leaf, &data)) != MDB_SUCCESS)) goto fail; if ((flags & MDBX_LIFORECLAIM) && !txn->mt_lifo_reclaimed) { @@ -2480,6 +2481,7 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret) } /** Touch a page: make it dirty and re-insert into tree with updated pgno. + * Set #MDB_TXN_ERROR on failure. * @param[in] mc cursor pointing to the page to be touched * @return 0 on success, non-zero on failure. */ @@ -5375,7 +5377,9 @@ mdb_cursor_pop(MDB_cursor *mc) } } -/** Push a page onto the top of the cursor's stack. */ +/** Push a page onto the top of the cursor's stack. + * Set #MDB_TXN_ERROR on failure. + */ static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) { @@ -5395,15 +5399,17 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) } /** Find the address of the page corresponding to a given page number. - * @param[in] txn the transaction for this access. + * Set #MDB_TXN_ERROR on failure. + * @param[in] mc the cursor accessing the page. * @param[in] pgno the page number for the page to retrieve. * @param[out] ret address of a pointer where the page's address will be stored. * @param[out] lvl dirty_list inheritance level of found page. 1=current txn, 0=mapped page. * @return 0 on success, non-zero on failure. */ static int -mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) +mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl) { + MDB_txn *txn = mc->mc_txn; MDB_env *env = txn->mt_env; MDB_page *p = NULL; int level; @@ -5497,7 +5503,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int flags) mdb_cassert(mc, i < NUMKEYS(mp)); node = NODEPTR(mp, i); - if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)) + if (unlikely((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0)) return rc; mc->mc_ki[mc->mc_top] = i; @@ -5539,7 +5545,7 @@ mdb_page_search_lowest(MDB_cursor *mc) MDB_node *node = NODEPTR(mp, 0); int rc; - if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(node), &mp, NULL)) != 0)) + if (unlikely((rc = mdb_page_get(mc, NODEPGNO(node), &mp, NULL)) != 0)) return rc; mc->mc_ki[mc->mc_top] = 0; @@ -5591,7 +5597,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) return MDB_NOTFOUND; if (unlikely((leaf->mn_flags & (F_DUPDATA|F_SUBDATA)) != F_SUBDATA)) return MDB_INCOMPATIBLE; /* not a named DB */ - rc = mdb_node_read(mc->mc_txn, leaf, &data); + rc = mdb_node_read(&mc2, leaf, &data); if (rc) return rc; memcpy(&flags, ((char *) data.mv_data + offsetof(MDB_db, md_flags)), @@ -5615,7 +5621,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) mdb_cassert(mc, root > 1); if (!mc->mc_pg[0] || mc->mc_pg[0]->mp_pgno != root) - if (unlikely((rc = mdb_page_get(mc->mc_txn, root, &mc->mc_pg[0], NULL)) != 0)) + if (unlikely((rc = mdb_page_get(mc, root, &mc->mc_pg[0], NULL)) != 0)) return rc; mc->mc_snum = 1; @@ -5712,13 +5718,13 @@ release: } /** Return the data associated with a given node. - * @param[in] txn The transaction for this operation. + * @param[in] mc The cursor for this operation. * @param[in] leaf The node being read. * @param[out] data Updated to point to the node's data. * @return 0 on success, non-zero on failure. */ static MDBX_INLINE int -mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data) +mdb_node_read(MDB_cursor *mc, MDB_node *leaf, MDB_val *data) { MDB_page *omp; /* overflow page */ pgno_t pgno; @@ -5734,7 +5740,7 @@ mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data) */ data->mv_size = NODEDSZ(leaf); memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); - if (unlikely((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0)) { + if (unlikely((rc = mdb_page_get(mc, pgno, &omp, NULL)) != 0)) { mdb_debug("read overflow page %zu failed", pgno); return rc; } @@ -5815,7 +5821,7 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) mdb_cassert(mc, IS_BRANCH(mc->mc_pg[mc->mc_top])); indx = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - if (unlikely((rc = mdb_page_get(mc->mc_txn, NODEPGNO(indx), &mp, NULL)) != 0)) { + if (unlikely((rc = mdb_page_get(mc, NODEPGNO(indx), &mp, NULL)) != 0)) { /* mc will be inconsistent if caller does mc_snum++ as above */ mc->mc_flags &= ~(C_INITIALIZED|C_EOF); return rc; @@ -5898,7 +5904,7 @@ skip: mdb_xcursor_init1(mc, leaf); } if (data) { - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS)) return rc; if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { @@ -5981,7 +5987,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) mdb_xcursor_init1(mc, leaf); } if (data) { - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS)) return rc; if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { @@ -6170,7 +6176,7 @@ set1: } } else if (op == MDB_GET_BOTH || op == MDB_GET_BOTH_RANGE) { MDB_val olddata; - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, &olddata)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, &olddata)) != MDB_SUCCESS)) return rc; rc = mc->mc_dbx->md_dcmp(data, &olddata); if (rc) { @@ -6183,7 +6189,7 @@ set1: } else { if (mc->mc_xcursor) mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS)) return rc; } } @@ -6232,7 +6238,7 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data) if (unlikely(rc)) return rc; } else { - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS)) return rc; } } @@ -6277,7 +6283,7 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data) if (unlikely(rc)) return rc; } else { - if (unlikely((rc = mdb_node_read(mc->mc_txn, leaf, data)) != MDB_SUCCESS)) + if (unlikely((rc = mdb_node_read(mc, leaf, data)) != MDB_SUCCESS)) return rc; } } @@ -6326,7 +6332,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); } else { - rc = mdb_node_read(mc->mc_txn, leaf, data); + rc = mdb_node_read(mc, leaf, data); } } } @@ -6443,7 +6449,7 @@ fetchm: MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { MDB_GET_KEY(leaf, key); - rc = mdb_node_read(mc->mc_txn, leaf, data); + rc = mdb_node_read(mc, leaf, data); break; } } @@ -6480,7 +6486,8 @@ mdb_cursor_touch(MDB_cursor *mc) { int rc = MDB_SUCCESS; - if (mc->mc_dbi >= CORE_DBS && !(*mc->mc_dbflag & DB_DIRTY)) { + if (mc->mc_dbi >= CORE_DBS && !(*mc->mc_dbflag & (DB_DIRTY|DB_DUPDATA))) { + /* Touch DB record of named DB */ MDB_cursor mc2; MDB_xcursor mcx; if (TXN_DBI_CHANGED(mc->mc_txn, mc->mc_dbi)) @@ -6843,7 +6850,7 @@ current: int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); memcpy(&pg, olddata.mv_data, sizeof(pg)); - if (unlikely((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)) + if (unlikely((rc2 = mdb_page_get(mc, pg, &omp, &level)) != 0)) return rc2; ovpages = omp->mp_pages; @@ -6877,13 +6884,8 @@ current: * parent txn, in case the user peeks at MDB_RESERVEd * or unused parts. Some users treat ovpages specially. */ -#if MDBX_MODE_ENABLED - /* LY: New page will contain only header from origin, - * but no any payload */ - memcpy(np, omp, PAGEHDRSZ); -#else size_t sz = (size_t) env->me_psize * ovpages, off; - if (!(flags & MDB_RESERVE)) { + if (MDBX_MODE_ENABLED || !(flags & MDB_RESERVE)) { /* Skip the part where LMDB will put *data. * Copy end of page, adjusting alignment so * compiler may copy words instead of bytes. @@ -6894,7 +6896,6 @@ current: sz = PAGEHDRSZ; } memcpy(np, omp, sz); /* Copy whole or header of page */ -#endif /* MDBX_MODE_ENABLED */ omp = np; } SETDSZ(leaf, data->mv_size); @@ -7154,7 +7155,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned flags) pgno_t pg; memcpy(&pg, NODEDATA(leaf), sizeof(pg)); - if (unlikely((rc = mdb_page_get(mc->mc_txn, pg, &omp, NULL)) || + if (unlikely((rc = mdb_page_get(mc, pg, &omp, NULL)) || (rc = mdb_ovpage_free(mc, omp)))) goto fail; } @@ -7168,6 +7169,7 @@ fail: } /** Allocate and initialize new pages for a database. + * Set #MDB_TXN_ERROR on failure. * @param[in] mc a cursor on the database being added to. * @param[in] flags flags defining what type of page is being allocated. * @param[in] num the number of pages to allocate. This is usually 1, @@ -7253,6 +7255,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key) } /** Add a node to the page pointed to by the cursor. + * Set #MDB_TXN_ERROR on failure. * @param[in] mc The cursor for this operation. * @param[in] indx The index on the page where the new node should be added. * @param[in] key The key for the new node. @@ -7563,7 +7566,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) } mdb_debug("Sub-db -%u root page %zu", mx->mx_cursor.mc_dbi, mx->mx_db.md_root); - mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ + mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DUPDATA; /* #if UINT_MAX < SIZE_MAX if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t)) mx->mx_dbx.md_cmp = mdb_cmp_clong; @@ -7589,7 +7592,7 @@ mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int new_dupdata) mx->mx_cursor.mc_top = 0; mx->mx_cursor.mc_flags |= C_INITIALIZED; mx->mx_cursor.mc_ki[0] = 0; - mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DIRTY; /* DB_DIRTY guides mdb_cursor_touch */ + mx->mx_dbflag = DB_VALID|DB_USRVALID|DB_DUPDATA; #if UINT_MAX < SIZE_MAX mx->mx_dbx.md_cmp = src_mx->mx_dbx.md_cmp; #endif @@ -7737,7 +7740,10 @@ mdb_cursor_close(MDB_cursor *mc) if (mc) { mdb_ensure(NULL, mc->mc_signature == MDBX_MC_SIGNATURE); if (!mc->mc_backup) { - /* remove from txn, if tracked */ + /* Remove from txn, if tracked. + * A read-only txn (!C_UNTRACK) may have been freed already, + * so do not peek inside it. Only write txns track cursors. + */ if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) { MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi]; while (*prev && *prev != mc) prev = &(*prev)->mc_next; @@ -7767,6 +7773,7 @@ mdb_cursor_dbi(MDB_cursor *mc) } /** Replace the key for a branch node with a new key. + * Set #MDB_TXN_ERROR on failure. * @param[in] mc Cursor pointing to the node to operate on. * @param[in] key The new key to use. * @return 0 on success, non-zero on failure. @@ -8323,7 +8330,7 @@ mdb_rebalance(MDB_cursor *mc) if (unlikely(rc)) return rc; mc->mc_db->md_root = NODEPGNO(NODEPTR(mp, 0)); - rc = mdb_page_get(mc->mc_txn,mc->mc_db->md_root,&mc->mc_pg[0],NULL); + rc = mdb_page_get(mc, mc->mc_db->md_root, &mc->mc_pg[0], NULL); if (unlikely(rc)) return rc; mc->mc_db->md_depth--; @@ -8384,7 +8391,7 @@ mdb_rebalance(MDB_cursor *mc) mdb_debug("reading right neighbor"); mn.mc_ki[ptop]++; node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); - rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL); + rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL); if (unlikely(rc)) return rc; mn.mc_ki[mn.mc_top] = 0; @@ -8396,7 +8403,7 @@ mdb_rebalance(MDB_cursor *mc) mdb_debug("reading left neighbor"); mn.mc_ki[ptop]--; node = NODEPTR(mc->mc_pg[ptop], mn.mc_ki[ptop]); - rc = mdb_page_get(mc->mc_txn,NODEPGNO(node),&mn.mc_pg[mn.mc_top],NULL); + rc = mdb_page_get(mc, NODEPGNO(node), &mn.mc_pg[mn.mc_top], NULL); if (unlikely(rc)) return rc; mn.mc_ki[mn.mc_top] = NUMKEYS(mn.mc_pg[mn.mc_top]) - 1; @@ -8587,7 +8594,6 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, * run out of space, triggering a split. We need this * cursor to be consistent until the end of the rebalance. */ - mc.mc_flags |= C_UNTRACK; mc.mc_next = txn->mt_cursors[dbi]; txn->mt_cursors[dbi] = &mc; rc = mdb_cursor_del(&mc, flags); @@ -8597,6 +8603,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi, } /** Split a page and insert a new node. + * Set #MDB_TXN_ERROR on failure. * @param[in,out] mc Cursor pointing to the page and desired insertion index. * The cursor will be updated to point to the actual page and index where * the node got inserted after the split. @@ -8685,7 +8692,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno split_indx = newindx; nkeys = 0; } else { - split_indx = (nkeys+1) / 2; if (IS_LEAF2(rp)) { @@ -8845,7 +8851,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno } else { /* find right page's left sibling */ mc->mc_ki[ptop] = mn.mc_ki[ptop]; - mdb_cursor_sibling(mc, 0); + rc = mdb_cursor_sibling(mc, 0); } } } else { @@ -8853,8 +8859,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0); mn.mc_top++; } - if (unlikely(rc != MDB_SUCCESS)) + if (unlikely(rc != MDB_SUCCESS)) { + if (rc == MDB_NOTFOUND) /* improper mdb_cursor_sibling() result */ + rc = MDB_PROBLEM; goto done; + } if (nflags & MDB_APPEND) { mc->mc_pg[mc->mc_top] = rp; mc->mc_ki[mc->mc_top] = 0; @@ -9086,7 +9095,10 @@ typedef struct mdb_copy { HANDLE mc_fd; int mc_toggle; /**< Buffer number in provider */ int mc_new; /**< (0-2 buffers to write) | (#MDB_EOF at end) */ - volatile int mc_error; /**< Error code, never cleared if set */ + /** Error code. Never cleared if set. Both threads can set nonzero + * to fail the copy. Not mutex-protected, LMDB expects atomic int. + */ + volatile int mc_error; } mdb_copy; /** Dedicated writer thread for compacting copy. */ @@ -9164,12 +9176,15 @@ mdb_env_cthr_toggle(mdb_copy *my, int adjust) return my->mc_error; } - /** Depth-first tree traversal for compacting copy. */ + /** Depth-first tree traversal for compacting copy. + * @param[in] my control structure. + * @param[in,out] pg database root. + * @param[in] flags includes #F_DUPDATA if it is a sorted-duplicate sub-DB. + */ static int __cold mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) { MDB_cursor mc; - MDB_txn *txn = my->mc_txn; MDB_node *ni; MDB_page *mo, *mp, *leaf; char *buf, *ptr; @@ -9182,9 +9197,9 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) memset(&mc, 0, sizeof(mc)); mc.mc_snum = 1; - mc.mc_txn = txn; + mc.mc_txn = my->mc_txn; - rc = mdb_page_get(txn, *pg, &mc.mc_pg[0], NULL); + rc = mdb_page_get(&mc, *pg, &mc.mc_pg[0], NULL); if (rc) return rc; rc = mdb_page_search_root(&mc, NULL, MDB_PS_FIRST); @@ -9229,7 +9244,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) memcpy(&pg, NODEDATA(ni), sizeof(pg)); memcpy(NODEDATA(ni), &my->mc_next_pgno, sizeof(pgno_t)); - rc = mdb_page_get(txn, pg, &omp, NULL); + rc = mdb_page_get(&mc, pg, &omp, NULL); if (rc) goto done; if (my->mc_wlen[toggle] >= MDB_WBUF) { @@ -9279,7 +9294,7 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags) again: ni = NODEPTR(mp, mc.mc_ki[mc.mc_top]); pg = NODEPGNO(ni); - rc = mdb_page_get(txn, pg, &mp, NULL); + rc = mdb_page_get(&mc, pg, &mp, NULL); if (rc) goto done; mc.mc_top++; @@ -9894,7 +9909,8 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned flags, MDB_dbi *dbi) memset(&dummy, 0, sizeof(dummy)); dummy.md_root = P_INVALID; dummy.md_flags = flags & PERSISTENT_FLAGS; - rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA); + WITH_CURSOR_TRACKING(mc, + rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA)); dbflag |= DB_DIRTY; } @@ -10025,7 +10041,7 @@ mdb_drop0(MDB_cursor *mc, int subs) MDB_page *omp; pgno_t pg; memcpy(&pg, NODEDATA(ni), sizeof(pg)); - rc = mdb_page_get(txn, pg, &omp, NULL); + rc = mdb_page_get(mc, pg, &omp, NULL); if (unlikely(rc)) goto done; mdb_cassert(mc, IS_OVERFLOW(omp)); @@ -10305,7 +10321,7 @@ mdb_reader_check(MDB_env *env, int *dead) return mdb_reader_check0(env, 0, dead); } -/** As #mdb_reader_check(). rlocked = . */ +/** As #mdb_reader_check(). \b rlocked is set if caller locked #me_rmutex. */ static int __cold mdb_reader_check0(MDB_env *env, int rlocked, int *dead) { diff --git a/mdbx.c b/mdbx.c index 64c57683..ce930336 100644 --- a/mdbx.c +++ b/mdbx.c @@ -182,7 +182,12 @@ mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int dee if (pg == P_INVALID) return MDB_SUCCESS; /* empty db */ - rc = mdb_page_get(ctx->mw_txn, pg, &mp, NULL); + MDB_cursor mc; + memset(&mc, 0, sizeof(mc)); + mc.mc_snum = 1; + mc.mc_txn = ctx->mw_txn; + + rc = mdb_page_get(&mc, pg, &mp, NULL); if (rc) return rc; if (pg != mp->mp_p.p_pgno) @@ -220,7 +225,7 @@ mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int dee } for (align_bytes = i = 0; i < nkeys; - align_bytes += ((payload_size + align_bytes) & 1), i++) { + align_bytes += ((payload_size + align_bytes) & 1), i++) { MDB_node *node; if (IS_LEAF2(mp)) { @@ -249,7 +254,7 @@ mdb_env_walk(mdb_walk_ctx_t *ctx, const char* dbi, pgno_t pg, int flags, int dee payload_size += sizeof(pgno_t); opg = NODEDATA(node); - rc = mdb_page_get(ctx->mw_txn, *opg, &omp, NULL); + rc = mdb_page_get(&mc, *opg, &omp, NULL); if (rc) return rc; if (*opg != omp->mp_p.p_pgno)