mdbx: alter mdbx_page_merge().

Change-Id: I33a3523e0efc914f6d85dfbbf69b97dd70d6c721
This commit is contained in:
Leo Yuriev 2018-09-09 16:54:26 +03:00
parent 36fe81edad
commit ff738f1512

View File

@ -9775,6 +9775,8 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
MDBX_val key; MDBX_val key;
int rc; int rc;
mdbx_cassert(csrc, csrc != cdst);
/* Mark dst as dirty. */ /* Mark dst as dirty. */
if (unlikely(rc = mdbx_page_touch(cdst))) if (unlikely(rc = mdbx_page_touch(cdst)))
return rc; return rc;
@ -9786,13 +9788,20 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
pdst->mp_pgno); pdst->mp_pgno);
mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst)); mdbx_cassert(csrc, PAGETYPE(psrc) == PAGETYPE(pdst));
mdbx_cassert(csrc,
csrc->mc_dbi == cdst->mc_dbi && csrc->mc_db == cdst->mc_db);
mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */ mdbx_cassert(csrc, csrc->mc_snum > 1); /* can't merge root page */
mdbx_cassert(cdst, cdst->mc_snum > 1); mdbx_cassert(cdst, cdst->mc_snum > 1);
mdbx_cassert(cdst, cdst->mc_snum < cdst->mc_db->md_depth ||
IS_LEAF(cdst->mc_pg[cdst->mc_db->md_depth - 1]));
mdbx_cassert(csrc, csrc->mc_snum < csrc->mc_db->md_depth ||
IS_LEAF(csrc->mc_pg[csrc->mc_db->md_depth - 1]));
const int pagetype = PAGETYPE(psrc);
/* Move all nodes from src to dst */ /* Move all nodes from src to dst */
const unsigned nkeys = NUMKEYS(pdst); const unsigned nkeys = NUMKEYS(pdst);
unsigned j = nkeys; unsigned j = nkeys;
if (IS_LEAF2(psrc)) { if (unlikely(pagetype & P_LEAF2)) {
key.iov_len = csrc->mc_db->md_xsize; key.iov_len = csrc->mc_db->md_xsize;
key.iov_base = PAGEDATA(psrc); key.iov_base = PAGEDATA(psrc);
for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) { for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) {
@ -9804,7 +9813,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
} else { } else {
for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) { for (unsigned i = 0; i < NUMKEYS(psrc); i++, j++) {
srcnode = NODEPTR(psrc, i); srcnode = NODEPTR(psrc, i);
if (i == 0 && IS_BRANCH(psrc)) { if (i == 0 && (pagetype & P_BRANCH)) {
MDBX_cursor mn; MDBX_cursor mn;
mdbx_cursor_copy(csrc, &mn); mdbx_cursor_copy(csrc, &mn);
mn.mc_xcursor = NULL; mn.mc_xcursor = NULL;
@ -9825,7 +9834,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
key.iov_base = NODEKEY(srcnode); key.iov_base = NODEKEY(srcnode);
} }
if (IS_LEAF(psrc)) { if (pagetype & P_LEAF) {
MDBX_val data; MDBX_val data;
data.iov_len = NODEDSZ(srcnode); data.iov_len = NODEDSZ(srcnode);
data.iov_base = NODEDATA(srcnode); data.iov_base = NODEDATA(srcnode);
@ -9841,7 +9850,7 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
mdbx_debug("dst page %" PRIaPGNO " now has %u keys (%.1f%% filled)", mdbx_debug("dst page %" PRIaPGNO " now has %u keys (%.1f%% filled)",
pdst->mp_pgno, NUMKEYS(pdst), pdst->mp_pgno, NUMKEYS(pdst),
(float)PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10); PAGEFILL(cdst->mc_txn->mt_env, pdst) / 10.24);
mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]); mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]);
mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]); mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]);
@ -9859,26 +9868,18 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
} }
csrc->mc_top++; csrc->mc_top++;
/* If not operating on FreeDB, allow this page to be reused mdbx_cassert(csrc, psrc == csrc->mc_pg[csrc->mc_top]);
* in this txn. Otherwise just add to free list. */ mdbx_cassert(cdst, pdst == cdst->mc_pg[cdst->mc_top]);
rc = mdbx_page_loose(csrc, psrc);
if (unlikely(rc))
return rc;
{ {
/* Adjust other cursors pointing to mp */ /* Adjust other cursors pointing to mp */
MDBX_cursor *m2, *m3; MDBX_cursor *m2, *m3;
const MDBX_dbi dbi = csrc->mc_dbi; const MDBX_dbi dbi = csrc->mc_dbi;
unsigned top = csrc->mc_top; const unsigned top = csrc->mc_top;
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) { for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) {
if (csrc->mc_flags & C_SUB) m3 = (csrc->mc_flags & C_SUB) ? &m2->mc_xcursor->mx_cursor : m2;
m3 = &m2->mc_xcursor->mx_cursor; if (m3 == csrc || top >= m3->mc_snum)
else
m3 = m2;
if (m3 == csrc)
continue;
if (m3->mc_snum < csrc->mc_snum)
continue; continue;
if (m3->mc_pg[top] == psrc) { if (m3->mc_pg[top] == psrc) {
m3->mc_pg[top] = pdst; m3->mc_pg[top] = pdst;
@ -9893,23 +9894,94 @@ static int mdbx_page_merge(MDBX_cursor *csrc, MDBX_cursor *cdst) {
XCURSOR_REFRESH(m3, m3->mc_pg[top], m3->mc_ki[top]); XCURSOR_REFRESH(m3, m3->mc_pg[top], m3->mc_ki[top]);
} }
} }
{
unsigned snum = cdst->mc_snum; /* If not operating on FreeDB, allow this page to be reused
uint16_t depth = cdst->mc_db->md_depth; * in this txn. Otherwise just add to free list. */
rc = mdbx_page_loose(csrc, psrc);
if (unlikely(rc))
return rc;
mdbx_cassert(cdst, cdst->mc_db->md_entries > 0);
mdbx_cassert(cdst, cdst->mc_snum <= cdst->mc_db->md_depth);
mdbx_cassert(cdst, cdst->mc_top > 0);
mdbx_cassert(cdst, cdst->mc_snum == cdst->mc_top + 1);
MDBX_page *const top_page = cdst->mc_pg[cdst->mc_top];
const indx_t top_indx = cdst->mc_ki[cdst->mc_top];
const uint16_t save_snum = cdst->mc_snum;
const uint16_t save_depth = cdst->mc_db->md_depth;
mdbx_cursor_pop(cdst); mdbx_cursor_pop(cdst);
rc = mdbx_rebalance(cdst); rc = mdbx_rebalance(cdst);
if (unlikely(rc)) if (unlikely(rc))
return rc; return rc;
mdbx_cassert(cdst, cdst->mc_db->md_entries > 0);
/* Did the tree height change? */ mdbx_cassert(cdst, cdst->mc_db->md_entries > 0);
if (depth != cdst->mc_db->md_depth) mdbx_cassert(cdst, cdst->mc_snum <= cdst->mc_db->md_depth);
snum += cdst->mc_db->md_depth - depth; mdbx_cassert(cdst, cdst->mc_top >= 0);
mdbx_cassert(cdst, snum >= 1 && snum <= UINT16_MAX); mdbx_cassert(cdst, cdst->mc_snum == cdst->mc_top + 1);
cdst->mc_snum = (uint16_t)snum;
cdst->mc_top = (uint16_t)(snum - 1); if (IS_LEAF(cdst->mc_pg[cdst->mc_top])) {
} /* LY: don't touch cursor if top-page is a LEAF */
mdbx_cassert(cdst, IS_LEAF(cdst->mc_pg[cdst->mc_top]) ||
PAGETYPE(cdst->mc_pg[cdst->mc_top]) == pagetype);
return MDBX_SUCCESS; return MDBX_SUCCESS;
}
if (pagetype != PAGETYPE(top_page)) {
/* LY: LEAF-page becomes BRANCH, unable restore cursor's stack */
goto bailout;
}
if (top_page == cdst->mc_pg[cdst->mc_top]) {
/* LY: don't touch cursor if prev top-page already on the top */
mdbx_cassert(cdst, cdst->mc_ki[cdst->mc_top] == top_indx);
mdbx_cassert(cdst, IS_LEAF(cdst->mc_pg[cdst->mc_top]) ||
PAGETYPE(cdst->mc_pg[cdst->mc_top]) == pagetype);
return MDBX_SUCCESS;
}
const int new_snum = save_snum - save_depth + cdst->mc_db->md_depth;
if (unlikely(new_snum < 1 || new_snum > cdst->mc_db->md_depth)) {
/* LY: out of range, unable restore cursor's stack */
goto bailout;
}
if (top_page == cdst->mc_pg[new_snum - 1]) {
mdbx_cassert(cdst, cdst->mc_ki[new_snum - 1] == top_indx);
/* LY: restore cursor stack */
cdst->mc_snum = (uint16_t)new_snum;
cdst->mc_top = (uint16_t)new_snum - 1;
mdbx_cassert(cdst, cdst->mc_snum < cdst->mc_db->md_depth ||
IS_LEAF(cdst->mc_pg[cdst->mc_db->md_depth - 1]));
mdbx_cassert(cdst, IS_LEAF(cdst->mc_pg[cdst->mc_top]) ||
PAGETYPE(cdst->mc_pg[cdst->mc_top]) == pagetype);
return MDBX_SUCCESS;
}
MDBX_page *const stub_page = (MDBX_page *)(~(uintptr_t)top_page);
const indx_t stub_indx = top_indx;
if (save_depth > cdst->mc_db->md_depth &&
((cdst->mc_pg[save_snum - 1] == top_page &&
cdst->mc_ki[save_snum - 1] == top_indx) ||
(cdst->mc_pg[save_snum - 1] == stub_page &&
cdst->mc_ki[save_snum - 1] == stub_indx))) {
/* LY: restore cursor stack */
cdst->mc_pg[new_snum - 1] = top_page;
cdst->mc_ki[new_snum - 1] = top_indx;
cdst->mc_pg[new_snum] = (MDBX_page *)(~(uintptr_t)cdst->mc_pg[new_snum]);
cdst->mc_ki[new_snum] = ~cdst->mc_ki[new_snum];
cdst->mc_snum = (uint16_t)new_snum;
cdst->mc_top = (uint16_t)new_snum - 1;
mdbx_cassert(cdst, cdst->mc_snum < cdst->mc_db->md_depth ||
IS_LEAF(cdst->mc_pg[cdst->mc_db->md_depth - 1]));
mdbx_cassert(cdst, IS_LEAF(cdst->mc_pg[cdst->mc_top]) ||
PAGETYPE(cdst->mc_pg[cdst->mc_top]) == pagetype);
return MDBX_SUCCESS;
}
bailout:
/* LY: unable restore cursor's stack */
cdst->mc_flags &= ~C_INITIALIZED;
return MDBX_CURSOR_FULL;
} }
/* Copy the contents of a cursor. /* Copy the contents of a cursor.