mdbx: backport - ITS#8321 a lot for cursor tracking.

Multiple bugs were fixed in the cursor fixups which
adjust other open cursors in response to various write ops.

Includes:

 - ITS#8321 Fix del/dupsort.
   When deleting a dupsort key, if other cursors pointed at that key,
   set them to uninit'd, not EOF. They no longer have anything to
   point at.

 - ITS#8321 don't skip fixups on splitting cursors.
   Adjustments can't be skipped, in recursive calls each level must
   fixup their own level.

 - ITS#8321 fix mdb_cursor_chk().
   It was reporting spurious errors due to uninit'd cursors

 - ITS#8321 fix mdb_cursor_shadow().
   Set a valid txn so that cursor fixup code works on the shadows

 - ITS#8321 fix mdb_cursor_put.
   Ignore sub-cursors that shouldn't be fixed up

 - ITS#8321 track temporary cursors.
   In rebalance/split operations, temporary cursors need to be visible
   to propagate fixups

 - ITS#8321 simplify page_split fixups.

 - ITS#8321 reorganize page_split fixups.
   DUPFIXED fixups needed to occur after separator update.
   MDB_RESERVE handling moved after split fixup.

Change-Id: I0c04acf54ebf6e84f32996b5723ec6fafb983ad9
This commit is contained in:
Leo Yuriev 2015-11-23 09:29:01 +03:00
parent 0a97fbcbab
commit aeea7ebb08
2 changed files with 79 additions and 45 deletions

View File

@ -21,6 +21,7 @@ LMDB 0.9.17 Release Engineering
Fix ITS#8313 mdb_rebalance dummy cursor
Fix ITS#8315 dirty_room in nested txn
Fix ITS#8316 page_merge cursor tracking
Fix ITS#8321 cursor tracking
Added mdb_txn_id() (ITS#7994)
Added robust mutex support
Miscellaneous cleanup/simplification

115
mdb.c
View File

@ -889,7 +889,6 @@ struct MDB_cursor {
#define C_EOF 0x02 /**< No more data */
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
#define C_DEL 0x08 /**< last op was a cursor_del */
#define C_SPLITTING 0x20 /**< Cursor is in page_split */
#define C_UNTRACK 0x40 /**< Un-track cursor when closing */
#define C_RECLAIMING 0x80 /**< FreeDB lookup is prohibited */
/** @} */
@ -1401,7 +1400,7 @@ mdb_cursor_chk(MDB_cursor *mc)
MDB_node *node;
MDB_page *mp;
if (!mc->mc_snum && !(mc->mc_flags & C_INITIALIZED)) return;
if (!mc->mc_snum || !(mc->mc_flags & C_INITIALIZED)) return;
for (i=0; i<mc->mc_top; i++) {
mp = mc->mc_pg[i];
node = NODEPTR(mp, mc->mc_ki[i]);
@ -2604,14 +2603,15 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
*bk = *mc;
mc->mc_backup = bk;
mc->mc_db = &dst->mt_dbs[i];
/* Kill pointers into src - and dst to reduce abuse: The
* user may not use mc until dst ends. Otherwise we'd...
/* Kill pointers into src to reduce abuse: The
* user may not use mc until dst ends. But we need a valid
* txn pointer here for cursor fixups to keep working.
*/
mc->mc_txn = NULL; /* ...set this to dst */
mc->mc_dbflag = NULL; /* ...and &dst->mt_dbflags[i] */
mc->mc_txn = dst;
mc->mc_dbflag = &dst->mt_dbflags[i];
if ((mx = mc->mc_xcursor) != NULL) {
*(MDB_xcursor *)(bk+1) = *mx;
mx->mx_cursor.mc_txn = NULL; /* ...and dst. */
mx->mx_cursor.mc_txn = dst;
}
mc->mc_next = dst->mt_cursors[i];
dst->mt_cursors[i] = mc;
@ -6835,6 +6835,7 @@ put_sub:
MDB_xcursor *mx = mc->mc_xcursor;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
int nkeys = NUMKEYS(mp);
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
@ -6842,9 +6843,9 @@ put_sub:
if (m2->mc_pg[i] == mp) {
if (m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init2(m2, mx, new_dupdata);
} else if (!insert_key) {
} else if (!insert_key && m2->mc_ki[i] < nkeys) {
MDB_node *n2 = NODEPTR(mp, m2->mc_ki[i]);
if (!(n2->mn_flags & F_SUBDATA))
if ((n2->mn_flags & (F_SUBDATA|F_DUPDATA)) == F_DUPDATA)
m2->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(n2);
}
}
@ -7678,6 +7679,22 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
static void
mdb_cursor_copy(const MDB_cursor *csrc, MDB_cursor *cdst);
/** Track a temporary cursor */
#define CURSOR_TMP_TRACK(mc, mn, dummy, tracked) \
if (mc->mc_flags & C_SUB) { \
dummy.mc_flags = C_INITIALIZED; \
dummy.mc_xcursor = (MDB_xcursor *)&mn; \
tracked = &dummy; \
} else { \
tracked = &mn; \
} \
tracked->mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi]; \
mc->mc_txn->mt_cursors[mc->mc_dbi] = tracked
/** Stop tracking a temporary cursor */
#define CURSOR_TMP_UNTRACK(mc, tracked) \
mc->mc_txn->mt_cursors[mc->mc_dbi] = tracked->mc_next
/** Move a node from csrc to cdst.
*/
static int
@ -7833,6 +7850,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
*/
if (csrc->mc_ki[csrc->mc_top] == 0) {
if (csrc->mc_ki[csrc->mc_top-1] != 0) {
MDB_cursor dummy, *tracked;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], 0, key.mv_size);
} else {
@ -7845,7 +7863,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
mdb_cursor_copy(csrc, &mn);
mn.mc_snum--;
mn.mc_top--;
if (unlikely((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS))
/* We want mdb_rebalance to find mn when doing fixups */
CURSOR_TMP_TRACK(csrc, mn, dummy, tracked);
rc = mdb_update_key(&mn, &key);
CURSOR_TMP_UNTRACK(csrc, tracked);
if (unlikely(rc != MDB_SUCCESS))
return rc;
}
if (IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
@ -7861,6 +7883,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
if (cdst->mc_ki[cdst->mc_top] == 0) {
if (cdst->mc_ki[cdst->mc_top-1] != 0) {
MDB_cursor dummy, *tracked;
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
key.mv_data = LEAF2KEY(cdst->mc_pg[cdst->mc_top], 0, key.mv_size);
} else {
@ -7873,7 +7896,11 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
mdb_cursor_copy(cdst, &mn);
mn.mc_snum--;
mn.mc_top--;
if (unlikely((rc = mdb_update_key(&mn, &key)) != MDB_SUCCESS))
/* We want mdb_rebalance to find mn when doing fixups */
CURSOR_TMP_TRACK(cdst, mn, dummy, tracked);
rc = mdb_update_key(&mn, &key);
CURSOR_TMP_UNTRACK(cdst, tracked);
if (unlikely(rc != MDB_SUCCESS))
return rc;
}
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top])) {
@ -8231,24 +8258,13 @@ mdb_rebalance(MDB_cursor *mc)
if (!fromleft) {
rc = mdb_page_merge(&mn, mc);
} else {
MDB_cursor dummy;
MDB_cursor dummy, *tracked;
oldki += NUMKEYS(mn.mc_pg[mn.mc_top]);
mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
/* We want mdb_rebalance to find mn when doing fixups */
if (mc->mc_flags & C_SUB) {
dummy.mc_flags = C_INITIALIZED;
dummy.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
mc->mc_txn->mt_cursors[mc->mc_dbi] = &dummy;
dummy.mc_xcursor = (MDB_xcursor *)&mn;
} else {
mn.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
mc->mc_txn->mt_cursors[mc->mc_dbi] = &mn;
}
CURSOR_TMP_TRACK(mc, mn, dummy, tracked);
rc = mdb_page_merge(mc, &mn);
if (mc->mc_flags & C_SUB)
mc->mc_txn->mt_cursors[mc->mc_dbi] = dummy.mc_next;
else
mc->mc_txn->mt_cursors[mc->mc_dbi] = mn.mc_next;
CURSOR_TMP_UNTRACK(mc, tracked);
mdb_cursor_copy(&mn, mc);
}
mc->mc_flags &= ~C_EOF;
@ -8286,7 +8302,7 @@ mdb_cursor_del0(MDB_cursor *mc)
if (m3->mc_ki[mc->mc_top] > ki)
m3->mc_ki[mc->mc_top]--;
else if (mc->mc_db->md_flags & MDB_DUPSORT)
m3->mc_xcursor->mx_cursor.mc_flags |= C_EOF;
m3->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
}
}
}
@ -8475,7 +8491,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mdb_debug("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
}
mc->mc_flags |= C_SPLITTING;
mdb_cursor_copy(mc, &mn);
mn.mc_pg[mn.mc_top] = rp;
mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
@ -8526,8 +8541,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
rp->mp_lower += sizeof(indx_t);
rp->mp_upper -= ksize - sizeof(indx_t);
mc->mc_ki[mc->mc_top] = x;
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[ptop]++;
}
} else {
int psize, nsize, k;
@ -8621,11 +8634,15 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
*/
if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) {
int snum = mc->mc_snum;
MDB_cursor dummy, *tracked;
mn.mc_snum--;
mn.mc_top--;
did_split = 1;
/* We want other splits to find mn when doing fixups */
CURSOR_TMP_TRACK(mc, mn, dummy, tracked);
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0);
if (unlikely(rc))
CURSOR_TMP_UNTRACK(mc, tracked);
if (unlikely(rc != MDB_SUCCESS))
goto done;
/* root split? */
@ -8655,10 +8672,8 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++;
}
mc->mc_flags ^= C_SPLITTING;
if (unlikely(rc != MDB_SUCCESS)) {
if (unlikely(rc != MDB_SUCCESS))
goto done;
}
if (nflags & MDB_APPEND) {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[mc->mc_top] = 0;
@ -8725,12 +8740,26 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
/* reset back to original page */
if (newindx < split_indx) {
mc->mc_pg[mc->mc_top] = mp;
} else {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[ptop]++;
/* Make sure mc_ki is still valid.
*/
if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
for (i=0; i<=ptop; i++) {
mc->mc_pg[i] = mn.mc_pg[i];
mc->mc_ki[i] = mn.mc_ki[i];
}
}
}
if (nflags & MDB_RESERVE) {
node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
node = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!(node->mn_flags & F_BIGDATA))
newdata->mv_data = NODEDATA(node);
}
} else {
if (newindx >= split_indx) {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[ptop]++;
/* Make sure mc_ki is still valid.
@ -8749,7 +8778,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
int fixup = NUMKEYS(mp);
nkeys = NUMKEYS(mp);
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
@ -8762,12 +8791,15 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
continue;
if (new_root) {
int k;
/* sub cursors may be on different DB */
if (m3->mc_pg[0] != mp)
continue;
/* root split */
for (k=new_root; k>=0; k--) {
m3->mc_ki[k+1] = m3->mc_ki[k];
m3->mc_pg[k+1] = m3->mc_pg[k];
}
if (m3->mc_ki[0] >= split_indx) {
if (m3->mc_ki[0] > nkeys) {
m3->mc_ki[0] = 1;
} else {
m3->mc_ki[0] = 0;
@ -8776,15 +8808,16 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_flags & C_SPLITTING)
continue;
if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) {
if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
m3->mc_ki[mc->mc_top]++;
if (m3->mc_ki[mc->mc_top] >= fixup) {
if (m3->mc_ki[mc->mc_top] >= nkeys) {
m3->mc_pg[mc->mc_top] = rp;
m3->mc_ki[mc->mc_top] -= fixup;
m3->mc_ki[ptop] = mn.mc_ki[ptop];
m3->mc_ki[mc->mc_top] -= nkeys;
for (i=0; i<mc->mc_top; i++) {
m3->mc_ki[i] = mn.mc_ki[i];
m3->mc_pg[i] = mn.mc_pg[i];
}
}
} else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {